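At eBay, we process more than 60 PB of data every day. While using Spark for this processing we ran into many performance problems, and solving them usually required extensive manual configuration and tuning, which greatly increased the workload. We therefore introduced Adaptive Execution (dynamically optimizing execution) and Indexed Bucket (optimizing data layout), along with a series of smaller performance optimizations. This talk introduces the basic principles behind this work and the results achieved, and includes links to the related discussions on the Apache Spark Jira.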


2.• Spark / Hadoop Contributor • eBay Data Services and Solutions team • Responsible for enterprise data warehouse / data lake development and optimization DATA SERVICES AND SOLUTIONS


4. Spark as DW Processing Engine
[Figure: layered architecture. DS (Data Science): Model, RT Data Service, Batch Service, Metadata Service. DW (Data Warehouse): Knowledge Graph, Metadata, Integrated Data Layer, ODS Layer. DI (Data Infrastructure): ZETA, Compute/Storage.]

5. Memory Capacity Challenge
• 5K target tables
• 20K intermediate/working tables
• 30PB compressed data
• 60PB relational data processed every day

6. Optimization Objectives
• Increase overall throughput: MB-Seconds (Memory x Duration)
• Reduce response time for critical jobs: Duration
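
As a rough illustration of the MB-Seconds metric above, the sketch below multiplies allocated memory by duration and compares two runs. The function names and the sample job are hypothetical and not part of any Spark API.

```python
def mb_seconds(memory_mb: float, duration_s: float) -> float:
    """Resource cost of a job: allocated memory (MB) times duration (s)."""
    return memory_mb * duration_s


def improvement(before: float, after: float) -> float:
    """How many times cheaper the 'after' run is, measured in MB-Seconds."""
    return before / after


# Hypothetical job: 100 executors x 20 GB each, running for 10 minutes.
cost = mb_seconds(memory_mb=100 * 20 * 1024, duration_s=10 * 60)
print(f"{cost:,.0f} MB-Seconds")
```

A lower MB-Seconds figure means the same work was done with less memory, less time, or both, which is why it serves as the throughput objective.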

7. Spark Core Optimization
• Adaptive Execution – Dynamically optimize execution plan
• Indexed Bucket – Optimize data layout

8. Adaptive Execution Background
• Initiated by Databricks in 2015 (SPARK-9850)
• New design and implementation in 2017 (SPARK-23128)
• Main contributors from Intel and eBay:
  – Carson Wang
  – Yuming Wang
  – Yucai Yu
  – JkSelf
  – Chenzhao Guo
  – Windpiger
  – Cheng Hao
  – Chenxiao Mao
  – Yuanjian Li

9. What’s Adaptive Execution?
• Original Spark: SQL -> Catalyst -> Execution Plan -> Execution -> Results. Even with CBO, it is hard to get optimal execution plans.
• Spark with Adaptive Execution: SQL -> Catalyst -> Execution Plan -> execute stage and planning -> AE Optimized Plan 1 -> execute stage and planning -> AE Optimized Plan 2 -> … -> Results
• Oracle already supports AE in its latest products.

10. Major Cases
• Simplify and optimize configuration
• Optimize join strategy
• Handle skewed join

11. Dive Into Top Memory Intensive Queries
Improper user configuration actually leads to many memory issues:
• The migration timeline is very tight
• Tuning is very time consuming

12. Query 1: shuffle.partition is too big
• shuffle.partition = 5000, so each task processes only about 20MB of data.
• High scheduling overhead, I/O overhead, etc.
• 113.5 GB / 5000 = 23.2 MB
• 85.6 GB / 5000 = 17.5 MB

13. Query 2: shuffle.partition is too small
• shuffle.partition = 600
• Heavy GC overhead, so the tasks run slowly
• Duration: 7.0 min, GC: 43s

14. Query 2: memory per core is too big
• CPU usage is low: each core has 20GB memory (40 GB / 2 = 20 GB)
• spark.executor.memory 40g
• spark.executor.cores 2
• spark.sql.shuffle.partitions 600

15. shuffle.partition Conf with AE (SPARK-23128)
• In shuffle write: use a big shuffle partition number to split data into small blocks.
• In shuffle read: AE packs small blocks into read tasks of about 100MB.
• Shuffled data: Partition 0 (90 MB), Partition 1 (30 MB), Partition 2 (20 MB), Partition 3 (40 MB), Partition 4 (70 MB)
• Spark with AE: Reduce Task 1 reads Partition 0 (90 MB); Reduce Task 2 reads Partitions 1, 2 and 3 (30 MB + 20 MB + 40 MB); Reduce Task 3 reads Partition 4 (70 MB)
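
The packing step on the read side can be sketched as a greedy pass over contiguous partitions. This is a minimal illustration under stated assumptions, not the Spark implementation: the function name `coalesce_partitions` is hypothetical, and the sizes are the ones shown on the slide.

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target: int) -> List[List[int]]:
    """Greedily pack contiguous shuffle partitions into read tasks.

    A new task starts whenever adding the next partition would push the
    current task past `target` bytes. Only neighbouring partition IDs are
    merged, so each task reads one contiguous range.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for pid, size in enumerate(sizes):
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(pid)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Partition sizes from the slide: 90, 30, 20, 40 and 70 MB.
sizes = [90 * MB, 30 * MB, 20 * MB, 40 * MB, 70 * MB]
print(coalesce_partitions(sizes, target=100 * MB))  # [[0], [1, 2, 3], [4]]
```

Five shuffle partitions become three read tasks, which matches the grouping in the slide's figure. A partition larger than the target still gets a task of its own, so this step reduces task count but does not address skew.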

16. But… No Free Lunch!
• SPARK-9853: Optimize shuffle fetch of contiguous partition IDs (1.3x)
• SPARK-22537: Aggregation of map output statistics on the driver is a single-point bottleneck, causing a 5s delay when shuffle.partition > 10000

17. Cluster-wide Configuration
• Best memory per core is 5GB
  – eBay has separate storage nodes and compute nodes
  – Each compute node has 384GB memory and 64 cores
  – CPU is the most expensive resource, so maximize CPU utilization: 384 GB / 64 cores = 6 GB of raw memory per core, with about 5 GB per core taken as the working figure

18. Cluster-wide Configuration
• Roughly estimated data size per core (shuffle stage) is 100MB
  – Spark UMM (unified memory manager): 5 GB * 60% = 3 GB = 3072 MB
  – Shuffle data’s compression ratio is 5x – 30x; assume 15x
  – Temporary space for algorithms (e.g. radix sort); assume 2x
  – 5 GB * 60% / 15 / 2 ≈ 102.4 MB: less spill, less GC, etc.
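
The estimate above is plain arithmetic, written out below as a small helper. The function and parameter names are hypothetical; the default values are the ones assumed on the slide.

```python
def target_input_mb(memory_per_core_gb: float,
                    unified_fraction: float = 0.6,
                    compression_ratio: float = 15.0,
                    algorithm_overhead: float = 2.0) -> float:
    """Compressed shuffle input (MB) one core can take without heavy spill."""
    unified_mb = memory_per_core_gb * 1024 * unified_fraction
    return unified_mb / compression_ratio / algorithm_overhead


print(round(target_input_mb(5), 1))  # 102.4
```

Because the compression ratio ranges from 5x to 30x, the same formula gives anywhere from roughly 51 MB to 307 MB per core, so 100 MB is a middle-of-the-range choice rather than a hard limit.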

19. Unified Configuration with AE
• spark.executor.memory=20GB
• spark.executor.cores=4 // memory per core 5GB
• spark.sql.adaptive.maxNumPostShufflePartitions=10000
• spark.sql.adaptive.shuffle.targetPostShuffleInputSize=100MB
Result: less spill, less GC, less scheduling overhead, and less manual configuration
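
One way to carry these settings around is as a dictionary rendered into `spark-submit --conf` arguments. The sketch below is illustrative only: the helper names are hypothetical, `spark.sql.adaptive.enabled` is added on the assumption that AE must be switched on explicitly, and the target size is written in bytes on the assumption that the setting takes a byte count.

```python
from typing import Dict, List

# Values from the slide, plus spark.sql.adaptive.enabled (assumption).
UNIFIED_CONF: Dict[str, str] = {
    "spark.executor.memory": "20g",
    "spark.executor.cores": "4",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.maxNumPostShufflePartitions": "10000",
    # 100 MB written in bytes (assumption about the accepted format).
    "spark.sql.adaptive.shuffle.targetPostShuffleInputSize": str(100 * 1024 * 1024),
}


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render settings as `--conf key=value` arguments for spark-submit."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


def memory_per_core_gb(conf: Dict[str, str]) -> float:
    """Executor memory divided by executor cores (expects an 'Ng' value)."""
    memory_gb = float(conf["spark.executor.memory"].rstrip("g"))
    return memory_gb / int(conf["spark.executor.cores"])


print(" ".join(to_submit_args(UNIFIED_CONF)))
print(memory_per_core_gb(UNIFIED_CONF))
```

The second helper checks that the executor shape keeps the 5 GB per core ratio, which is the property the unified configuration is meant to preserve.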

20. Query 1: 1.7x MB-Seconds improvement
• Apache Spark: 7,661,564,130 MB-Seconds
• Spark with AE: 4,540,665,342 MB-Seconds
[Figure: stage diagrams; surviving labels 5000, 2500, 1112]

21. Query 2: 6x MB-Seconds improvement
• Enabling AE removes the GC issue: 1.4x MB-Seconds improvement
• The new configuration uses less memory
• Duration: 17s, GC: 0.2s

22. Major Cases
• Simplify and optimize configuration
• Optimize join strategy
• Handle skewed join

23. Optimize Join Strategy – Overview
• Based on the runtime size of intermediate tables, AE builds the best join plan.
• Apache Spark: Sort Merge Join, with both Intermediate Table A and Intermediate Table B shuffled
• Spark with AE: Broadcast Hash Join, with Intermediate Table B (small table) broadcast to Intermediate Table A (big table)

24. Optimize Join Strategy – How it works
• Enable stats estimation for the physical plan
• Allow shuffle readers to request data from just one mapper
• Optimize SortMergeJoin to BroadcastHashJoin at runtime
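
The runtime decision can be sketched as a size check made after the upstream stages have finished. This is a simplified illustration, not the AE planner: `StageStats`, `choose_join`, the table names and the 20 MB threshold are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StageStats:
    """Size of a finished shuffle stage's output, measured at runtime."""
    name: str
    size_bytes: int


def choose_join(left: StageStats, right: StageStats,
                broadcast_threshold: int) -> str:
    """Pick a join operator from runtime sizes rather than static estimates."""
    smaller = min(left, right, key=lambda s: s.size_bytes)
    if smaller.size_bytes <= broadcast_threshold:
        return f"BroadcastHashJoin(broadcast={smaller.name})"
    return "SortMergeJoin"


MB = 1024 * 1024
big = StageStats("intermediate_a", 50_000 * MB)
small = StageStats("intermediate_b", 13 * MB)
print(choose_join(big, small, broadcast_threshold=20 * MB))
```

The point of deciding at runtime is that the size of an intermediate table after filters and aggregations is often far from what static statistics predict, so a plan fixed up front may pick a sort-merge join for a side that turns out to be small enough to broadcast.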

25. Optimize Join Strategy – Case
• Apache Spark (SortMergeJoin): 75,203,212,678 MB-Seconds
• Spark with AE (BroadcastJoin): 17,926,197,353 MB-Seconds
[Figure: query plans; surviving size label 13.4 MB]

26. Optimize Join Strategy – Case
• Apache Spark vs. Spark with AE: 4.2x MB-Seconds improvement

27. Major Cases
• Simplify and optimize shuffle partitions
• Optimize join strategy
• Handle skewed join

28. Handle Skewed Join – Challenge
• A common issue: some partitions hold far more data than others.
• Time: avg 3 s, max 4.0 min
• Size: avg 5 MB, max 172 MB

29. Handle Skewed Join – How it works
• AE dynamically detects skewed tasks and increases parallelism.
• Apache Spark: Task 1 joins Table A Partition x with Table B Partition x
• Spark with AE: Table A Partition x is split into N parts. Task 1 joins Table A Partition x-1 with Table B Partition x, Task 2 joins Table A Partition x-2 with Table B Partition x, …, Task N joins Table A Partition x-N with Table B Partition x. The task outputs are then combined with a Union.
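
The split-and-union idea can be sketched on in-memory lists. This is a toy model under stated assumptions, not the AE implementation: all function names are hypothetical, skew is detected with an assumed "10x the median size" rule, and the skewed side is split round-robin for simplicity.

```python
from statistics import median
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (join key, payload)
Joined = Tuple[int, str, str]


def skewed_partitions(sizes: Dict[int, int], factor: float = 10.0) -> List[int]:
    """Partition IDs whose size exceeds `factor` times the median size."""
    mid = median(sizes.values())
    return [pid for pid, size in sizes.items() if size > factor * mid]


def join(left: List[Row], right: List[Row]) -> List[Joined]:
    """Plain inner join of two partitions on the key (one task's work)."""
    by_key: Dict[int, List[str]] = {}
    for key, value in right:
        by_key.setdefault(key, []).append(value)
    return [(k, a, b) for k, a in left for b in by_key.get(k, [])]


def skew_join(left: List[Row], right: List[Row], n: int) -> List[Joined]:
    """Split the skewed left partition into n chunks, join each chunk with
    the whole right partition, then union the results."""
    chunks = [left[i::n] for i in range(n)]
    out: List[Joined] = []
    for chunk in chunks:  # each chunk would run as its own task
        out.extend(join(chunk, right))
    return out


sizes_mb = {0: 5, 1: 4, 2: 172, 3: 6}
print(skewed_partitions(sizes_mb))  # [2]

table_a = [(1, f"a{i}") for i in range(6)]
table_b = [(1, "b0"), (2, "b1")]
assert sorted(skew_join(table_a, table_b, n=3)) == sorted(join(table_a, table_b))
```

Every chunk of the skewed side sees the full matching partition of the other table, so the union of the chunk joins equals the original join. The cost is that Table B's partition is read N times, which is the trade made for the extra parallelism.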