Spark tuning in Apache Kylin

This article is based on a talk given by a Kyligence architect at the Apache Kylin meetup in Shanghai in July 2018; it covers how to tune Kylin's Spark cubing engine.

1. Spark performance tuning for Apache Kylin (Shaofeng Shi, 2018/7)

2. Background • Kylin 2.0 started to use Spark as the Cube build engine • Proven to improve build performance by 2x to 3x • Requires Spark tuning experience • Kylin 2.5 will move more jobs onto Spark • Convert to HFile (KYLIN-3427) • Merge segments (KYLIN-3441) • Merge dictionaries on YARN (KYLIN-3471) • Fact distinct columns in Spark (KYLIN-3442) • In the future, the Spark engine will replace MR

3. Cubing in Spark

4. Agenda • Why Spark • Spark on YARN Model • Spark Executor Memory Model • Executor/Driver memory/core configuration • Dynamic Resource Allocation • RDD Partitioning • Shuffle • Compression • DFS Replication • Deploy Modes • Other Tips

5. Why Apache Spark • Fast, memory-centric distributed computing framework • Flexible API • Spark Core • DataFrames, Datasets and SparkSQL • Spark Streaming • MLlib/SparkR • Language support • Java, Scala, Python, R • Deployment options: • Standalone/YARN/Mesos/Kubernetes (Spark 2.3+)

6. Spark on YARN memory model • Overhead memory • The JVM needs extra memory to run • By default: executor memory * 0.1, minimum 384 MB • Executor memory

7. Spark on YARN memory model (cont.) • If you allocate 4 GB to an executor, Spark will request: • 4 * 0.1 + 4 = 4.4 GB as the container memory from YARN • From our observation, the default factor (0.1) is a little small for Kylin; executors are very likely to be killed • Give 1 GB or more as overhead memory • spark.yarn.executor.memoryOverhead=1024 • From Kylin 2.5, 1 GB of overhead is requested by default.
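The container-size arithmetic above can be sketched as a small helper; the 0.1 factor and 384 MB floor are Spark 2.x's defaults for `spark.yarn.executor.memoryOverhead`, and the function name here is only illustrative:

```python
def yarn_container_mb(executor_mb, overhead_mb=None):
    """Total memory YARN reserves for one executor container.

    With no explicit overhead, Spark uses max(executor * 0.1, 384 MB).
    """
    if overhead_mb is None:
        overhead_mb = max(int(executor_mb * 0.1), 384)
    return executor_mb + overhead_mb

# Default overhead: a 4 GB executor becomes a ~4.4 GB container request.
print(yarn_container_mb(4096))        # 4505 MB
# Kylin's recommended explicit 1 GB overhead:
print(yarn_container_mb(4096, 1024))  # 5120 MB
```

If the container exceeds what YARN granted, YARN kills the executor, which is why Kylin raises the overhead from the default.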

8. Spark executor memory model • Reserved memory • 300 MB, just for avoiding OOM • Spark memory • spark.memory.fraction=0.6 • For both storage/cache and execution (shuffle, sort) • spark.memory.storageFraction=0.5: cache and execution split half and half • User memory • The remainder is for user code execution

9. Spark executor memory model (cont.) • An example: • Given an executor with 4 GB of memory, its max storage/execution memory is: • (4096 – 300) * 0.6 = 2.27 GB • If the executor needs to run computation (sorting/shuffling), the space for RDD cache can shrink to: • 2.27 GB * 0.5 = 1.13 GB • User memory: • (4096 – 300) * 0.4 = 1.52 GB • When you have big dictionaries, consider allocating more to user memory
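The slide's arithmetic, reproduced in MB as a sketch of Spark's unified memory model (default `spark.memory.fraction=0.6`, `spark.memory.storageFraction=0.5`, 300 MB reserved; the function is illustrative, not Spark's actual code):

```python
RESERVED_MB = 300  # fixed reservation to avoid OOM

def memory_regions(executor_mb, fraction=0.6, storage_fraction=0.5):
    """Split executor heap into Spark memory, storage share, and user memory."""
    usable = executor_mb - RESERVED_MB
    spark_mb = usable * fraction               # storage/cache + execution
    storage_mb = spark_mb * storage_fraction   # cache share when execution is busy
    user_mb = usable * (1 - fraction)          # user code, e.g. big dictionaries
    return spark_mb, storage_mb, user_mb

spark_mb, storage_mb, user_mb = memory_regions(4096)
# spark_mb   ~ 2277.6 MB (the slide's "2.27 GB")
# storage_mb ~ 1138.8 MB (the slide's "1.13 GB")
# user_mb    ~ 1518.4 MB (the slide's "1.52 GB")
```

Note the storage half is a soft boundary: execution can borrow from it, which is exactly why the cache can shrink to 1.13 GB under shuffle/sort pressure.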

10. Executor memory/core configuration • Check how much memory and how many cores are available in your Hadoop cluster • To maximize resource utilization, use a similar ratio for Spark • For example, for a cluster with 100 cores and 500 GB memory, you can allocate 1 core and 5 GB (1 GB for overhead, 4 GB for the executor) to each executor instance • If you use multiple cores in one executor, increase the memory accordingly • e.g., 2 cores + 10 GB per instance • No more than 40 GB of memory per instance
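A hypothetical sizing helper for the example above: match the executor's core-to-memory ratio to the cluster's, then see how many containers fit before either resource runs out (names and logic are illustrative only):

```python
def executors_that_fit(cluster_cores, cluster_mem_gb, cores_per_exec, container_gb):
    """Number of executor containers the cluster can hold,
    limited by whichever resource (cores or memory) runs out first."""
    return min(cluster_cores // cores_per_exec, cluster_mem_gb // container_gb)

# 100-core / 500 GB cluster; 1 core + 5 GB container (4 GB heap + 1 GB overhead):
print(executors_that_fit(100, 500, 1, 5))   # 100 executors, full utilization
# 2 cores + 10 GB per instance keeps the same ratio:
print(executors_that_fit(100, 500, 2, 10))  # 50 executors, same utilization
```

Both configurations saturate the cluster; a mismatched ratio (say 1 core + 10 GB) would strand half the cores.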

11. Driver memory configuration • Kylin does not collect data to the driver, so you can configure fewer resources for it • spark.driver.memory=2g • spark.driver.cores=1

12. More instances with fewer cores, or fewer instances with more cores? • Spark active task number = instances * (cores per instance) • Both can get similar parallelism • If you use more cores in one executor, tasks can share references in the same JVM • Share big objects like dictionaries • With Spark dynamic resource allocation, use 1 core per instance.

13. Dynamic resource allocation • Dynamic allocation can improve resource utilization • Not enabled by default

14. Dynamic resource allocation • Static allocation does not fit Kylin well • Cubing is done layer by layer, and each layer's size is different • The workload is unbalanced: small -> medium -> big -> extremely big -> small -> tiny • DRA is highly recommended • With DRA enabled, give each executor 1 core.
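The DRA settings used for Kylin (these are the standard Spark properties with Kylin's `kylin.engine.spark-conf.` pass-through prefix, using the values from the recommended-configuration slide later in this deck; DRA on YARN also needs the external shuffle service running on each NodeManager):

```properties
# Dynamic resource allocation for the Kylin Spark build engine
kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true
kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=1
kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000
kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300
# Required so shuffle files survive executor removal
kylin.engine.spark-conf.spark.shuffle.service.enabled=true
```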

15. RDD partitioning • An RDD partition is similar to a file split in MapReduce • Spark prefers many small partitions over a few big ones • Kylin splits partitions by estimated file size (after aggregation), by default 1 partition per 10 MB: • kylin.engine.spark.rdd-partition-cut-mb=10 • The real size may vary, as the estimation might be inaccurate • This can affect performance greatly! • Min/max partition caps: • kylin.engine.spark.min-partition=1 • kylin.engine.spark.max-partition=5000
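Kylin's partitioning rule as described above can be sketched as one partition per `rdd-partition-cut-mb` of estimated output, clamped to the min/max caps (a simplified sketch; Kylin's actual code may round differently):

```python
import math

def rdd_partitions(estimated_mb, cut_mb=10, min_part=1, max_part=5000):
    """Partition count: estimated size / cut size, clamped to [min, max]."""
    return max(min_part, min(max_part, math.ceil(estimated_mb / cut_mb)))

print(rdd_partitions(250))      # 25 partitions for a 250 MB layer
print(rdd_partitions(200_000))  # 5000: capped by max-partition
print(rdd_partitions(1))        # 1: floored by min-partition
```

Since the input is an *estimate*, a badly overestimated cuboid size (see the TopN/Count Distinct slide below) inflates the partition count proportionally.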

16. Partition number is important • When the partition number is lower than normal • Less parallelism, low resource utilization • Executor OOM (especially when using "mapPartitions") • When the partition number is much higher than normal • Shuffle is slow • Many small files are generated • Pay attention if you observe a job with > 1000 partitions

17. Partition number can go wild in certain cases • If your cube has Count Distinct or TopN measures, the estimated size may be far bigger than the actual size, causing too many partitions • Tune the parameter manually, at the Cube level, according to the actual cuboid file size: • kylin.engine.spark.rdd-partition-cut-mb=100 • Or, reduce the max partition number: • kylin.engine.spark.max-partition=500 • KYLIN-3453 Make the size estimation more accurate • KYLIN-3472 TopN in Spark is slow

18. Shuffle • Spark shuffle is similar to MapReduce's • Partition the mapper's output and send each partition only to its reducer • The reducer buffers data in memory, then sorts, aggregates and reduces • But with differences • Spark sorts the data on the map side, but doesn't merge it on the reduce side • If the user needs the data sorted, they call "sortByKey" or similar, and Spark will re-sort the data; the re-sort is not aware that the map output is already sorted • The sorting is in memory, spilling to disk if memory is full

19. Shuffle (cont.) • Shuffle spill • Spill memory = (executor memory – 300 MB) * spark.memory.fraction * (1 – spark.memory.storageFraction) • Spilled files won't be merged until the data is requested; merging happens on the fly • If you need the data sorted, Spark is slower than MR • SPARK-2926 tries to introduce MR-style merge sort • Kylin's "Convert to HFile" step needs the values sorted; Spark may spend 2x the time of MR on this step.

20. Compression • Compression can significantly reduce IO • By default Kylin enables compression for MR in `conf/kylin_job_conf.xml`, but not for Spark • If your Hadoop cluster does not enable compression, you may see 2x-sized files generated when switching from MR to the Spark engine • Manually enable compression by adding: • kylin.engine.spark-conf.spark.hadoop.mapreduce.output.fileoutputformat.compress=true • kylin.engine.spark-conf.spark.hadoop.mapreduce.output.fileoutputformat.compress.codec=org.apa • Kylin 2.5 will enable compression by default.
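Spelled out as a config fragment (the codec class name is truncated on the slide; `DefaultCodec` below is only a placeholder, so substitute whichever codec your cluster supports, e.g. Snappy):

```properties
# Route Hadoop output compression settings through the Spark engine
kylin.engine.spark-conf.spark.hadoop.mapreduce.output.fileoutputformat.compress=true
# Placeholder codec; use the codec available on your cluster
kylin.engine.spark-conf.spark.hadoop.mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
```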

21. Compression (cont.) • 40% performance improvement + 50% disk savings (no compression vs. compression, merging segments on Spark)

22. DFS replication • Kylin keeps 2 replicas for intermediate files, configured in `kylin_job_conf.xml` and `kylin_hive_conf.xml` • But this does not work for Spark • Manually add: • kylin.engine.spark-conf.spark.hadoop.dfs.replication=2 • Saves 1/3 of disk space • Kylin 2.5 will enable this by default.

23. Deployment modes • Spark on YARN has two deploy modes • Cluster: the driver runs inside the application master • Client: the driver runs in the client process • When developing/debugging, use `client` mode • Starts fast, with detailed log messages printed on the console • Occupies client node memory • In production, use `cluster` mode • Kylin 2.5 will use `cluster` mode by default

24. Other tips • Pre-upload the YARN archive • Avoids uploading big files repeatedly • Accelerates job startup • Run the Spark history server for troubleshooting • Makes it much easier to identify bottlenecks

25. Recommended configurations (Kylin 2.2-2.4, Spark 2.1) • kylin.engine.spark-conf.spark.submit.deployMode=cluster • kylin.engine.spark-conf.spark.dynamicAllocation.enabled=true • kylin.engine.spark-conf.spark.dynamicAllocation.minExecutors=1 • kylin.engine.spark-conf.spark.dynamicAllocation.maxExecutors=1000 • kylin.engine.spark-conf.spark.dynamicAllocation.executorIdleTimeout=300 • kylin.engine.spark-conf.spark.driver.memory=2G • kylin.engine.spark-conf.spark.executor.memory=4G • kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024 • kylin.engine.spark-conf.spark.executor.cores=1 • kylin.engine.spark-conf.spark.yarn.archive=hdfs://nameservice/kylin/spark/spark-libs.jar • kylin.engine.spark-conf.spark.shuffle.service.enabled=true • kylin.engine.spark-conf.spark.hadoop.dfs.replication=2 • kylin.engine.spark-conf.spark.hadoop.mapreduce.output.fileoutputformat.compress=true

26. Key takeaways • Kylin will move more jobs to Spark • Mastering Spark tuning will help you run Kylin better • Kylin aims to provide an out-of-the-box Spark experience, as with MR.

27.We are hiring Apache Kylin dev@kylin.apach Kyligence Inc

Kyligence (Shanghai Kyligence Information Technology Co., Ltd.) was founded by the core team behind Apache Kylin, the first top-level Apache Software Foundation open-source project originating from China. It is a data technology company focused on big data analytics, with the mission of accelerating users' critical business decisions through insight gained from cutting-edge data technology.