At eBay we process more than 60 PB of data every day. While using Spark for this processing we ran into quite a few performance problems, and solving them usually required heavy manual configuration and tuning, which greatly increased the workload. We therefore introduced Adaptive Execution (Dynamically Optimize Execution) and Indexed Bucket (Optimize Data Layout), along with a series of smaller performance optimizations. This article explains the basic principles behind this work and the results achieved, with links to the related discussions on the Apache Spark JIRA.


1.

2.
• Spark / Hadoop Contributor
• eBay Data Services and Solutions team
• Responsible for enterprise data warehouse / data lake development and optimization

3. Spark as Core ETL Engine

4. Spark as DW Processing Engine
[Architecture diagram — DS (Data Science): model; services: RT Data Service, Batch Service, Metadata Service; DW (Data Warehouse): knowledge graph, metadata, integrated data layer, ODS layer; DI (Data Infrastructure): ZETA, compute/storage]

5. Memory Capacity Challenge
• 5K target tables
• 20K intermediate/working tables
• 30 PB compressed data
• 60 PB of relational data processed every day

6. Optimization Objectives
• Increase overall throughput, measured in MB-Seconds (memory × duration); a sketch of the metric follows below
• Reduce response time for critical jobs: duration
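For concreteness, a minimal sketch of how the MB-Seconds metric is computed (the helper and numbers are illustrative, not from the slides; YARN reports the same aggregate as "memory-seconds"):

// Illustrative helper for the MB-Seconds throughput metric
// (allocated memory x elapsed time). Names and numbers are examples.
object MbSeconds {
  def mbSeconds(executorMemoryMb: Long, numExecutors: Int, durationSec: Long): Long =
    executorMemoryMb * numExecutors * durationSec

  def main(args: Array[String]): Unit = {
    // e.g. 50 executors x 20 GB each, running for 10 minutes:
    println(mbSeconds(20L * 1024, 50, 10 * 60)) // 614400000 MB-Seconds
  }
}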

7. Spark Core Optimization
• Adaptive Execution – dynamically optimize the execution plan
• Indexed Bucket – optimize the data layout

8. Adaptive Execution Background
• Initiated by Databricks in 2015 @SPARK-9850
• In 2017, new design & implementation @SPARK-23128
• Main contributors from Intel and eBay: Carson Wang, Yuming Wang, Yucai Yu, JkSelf, Chenzhao Guo, Windpiger, Cheng Hao, Chenxiao Mao, Yuanjian Li

9. What’s Adaptive Execution?
• Original Spark: SQL → Catalyst → execution plan → execution → results. Even with CBO, it is hard to get optimal execution plans.
• Spark with Adaptive Execution: SQL → Catalyst → execution plan, then repeatedly execute a stage and re-plan (AE-optimized plan 1, plan 2, …) before producing results.
• Oracle already supports adaptive execution in its latest products.

10. Major Cases
• Simplify and optimize configuration
• Optimize join strategy
• Handle skewed joins

11. Dive Into the Top Memory-Intensive Queries
Improper user configuration actually leads to many memory issues:
• The migration timeline is very tight
• Tuning is very time consuming

12. Query 1: shuffle.partition is too big
• shuffle.partition = 5000, so each task processes only ~20 MB of data
• Lots of scheduling overhead, IO overhead, etc.
• 113.5 GB / 5000 = 23.2 MB; 85.6 GB / 5000 = 17.5 MB

13. Query 2: shuffle.partition is too small
• shuffle.partition = 600
• Lots of GC overhead; the tasks run slowly
• Duration: 7.0 min, GC: 43 s

14. Query 2: memory per core is too big
• CPU usage is low: each core has 20 GB of memory
  spark.executor.memory 40g
  spark.executor.cores 2        // 40 GB / 2 = 20 GB per core
  spark.sql.shuffle.partitions 600

15. shuffle.partition Conf with AE (SPARK-23128)
• In shuffle write: use a big shuffle partition number to split data into small blocks.
• In shuffle read: AE packs small blocks into ~100 MB reduce tasks.
Example: with shuffle partitions of 90, 30, 20, 40 and 70 MB, plain Apache Spark runs five reduce tasks, one per partition; Spark with AE runs three: task 1 = partition 0 (90 MB), task 2 = partitions 1–3 (30 + 20 + 40 = 90 MB), task 3 = partition 4 (70 MB). A code sketch of the packing follows.
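A minimal sketch of the packing idea (illustrative only, not the SPARK-23128 implementation): greedily merge consecutive post-shuffle partitions until each reduce task reads roughly the target size:

import scala.collection.mutable.ArrayBuffer

// Sketch of AE's post-shuffle coalescing: merge consecutive shuffle
// partitions so that each reduce task reads about targetSize bytes.
def coalescePartitions(sizes: Array[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = ArrayBuffer(ArrayBuffer.empty[Int])
  var current = 0L
  for ((size, idx) <- sizes.zipWithIndex) {
    if (current + size > targetSize && groups.last.nonEmpty) {
      groups += ArrayBuffer.empty[Int]   // start a new reduce task
      current = 0L
    }
    groups.last += idx
    current += size
  }
  groups.map(_.toSeq).toSeq
}

// Slide example: partitions of 90, 30, 20, 40, 70 MB with a 100 MB
// target become three reduce tasks: [0], [1, 2, 3], [4].
val mb = 1024L * 1024
coalescePartitions(Array(90 * mb, 30 * mb, 20 * mb, 40 * mb, 70 * mb), 100 * mb)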

16. But… No Free Lunch!
• SPARK-9853: Optimize shuffle fetch of contiguous partition IDs (1.3x improvement)
• SPARK-22537: Aggregation of map output statistics on the driver faces a single-point bottleneck; a 5 s delay when shuffle.partition > 10000

17. Cluster-wide Configuration
• Best memory per core is 5 GB
  – eBay has separate storage nodes and compute nodes
  – Each compute node has 384 GB of memory and 64 cores
  – CPU is the most expensive resource, so maximize CPU utilization: 384 GB / 64 cores = 6 GB, leaving roughly 5 GB per core after system overhead

18. Cluster-wide Configuration
• Roughly estimated data size per core (shuffle stage) is 100 MB
  – Spark unified memory (UMM): 5 GB × 60% = 3 GB = 3072 MB
  – Shuffle data’s compression ratio is 5x–30x; suppose 15x
  – Temporary space for algorithms (e.g. radix sort); suppose 2x
  – 5 GB × 60% / 15 / 2 ≈ 102.4 MB: less spill, less GC, etc.

19. Unified Configuration with AE
• spark.executor.memory=20GB
• spark.executor.cores=4 // memory per core: 5 GB
• spark.sql.adaptive.maxNumPostShufflePartitions=10000
• spark.sql.adaptive.shuffle.targetPostShuffleInputSize=100MB
Less spill, less GC, less scheduling overhead; less human configuration. A sketch of applying these settings follows.
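As a sketch, the same settings applied when building the session (property names are taken from the slide and assume the SPARK-23128 AE build, where spark.sql.adaptive.enabled turns the feature on):

import org.apache.spark.sql.SparkSession

// Unified configuration: fixed executor sizing plus AE's partition
// bounds; AE then picks the actual reduce parallelism at runtime.
val spark = SparkSession.builder()
  .appName("unified-ae-config")
  .config("spark.executor.memory", "20g")
  .config("spark.executor.cores", "4")   // 5 GB memory per core
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.maxNumPostShufflePartitions", "10000")
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "100MB")
  .getOrCreate()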

20. Query 1: 1.7x MB-Seconds improvement
• Apache Spark: 7,661,564,130 MB-Seconds
• Spark with AE: 4,540,665,342 MB-Seconds
[UI screenshots: stage task counts 5000 without AE vs 2500 and 1112 with AE]

21. Query 2: 6x MB-Seconds improvement
• Enabling AE alone: no GC issue, 1.4x MB-Seconds improvement
• Less memory with the new configuration solution brings the total to 6x
• Duration: 17 s, GC: 0.2 s (vs 7.0 min / 43 s before)

22. Major Cases
• Simplify and optimize configuration
• Optimize join strategy
• Handle skewed joins

23. Optimize Join Strategy – Overview
• Based on the runtime intermediate table size, AE builds the best join plan.
• Apache Spark: SortMergeJoin, shuffling both intermediate table A and intermediate table B.
• Spark with AE: BroadcastHashJoin, broadcasting the small table to the big table with no shuffle.

24. Optimize Join Strategy – How it works
• Enable statistics estimation for the physical plan
• Allow shuffle readers to request data from just one mapper
• Optimize SortMergeJoin to BroadcastHashJoin at runtime (see the sketch below)
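A simplified sketch of that last step (illustrative, not the actual AE planner code): once the shuffle stages of a join have run, the exact output sizes are known and the join strategy can be re-chosen:

// Sketch: re-choose the join strategy from runtime stage output sizes.
sealed trait JoinStrategy
case object SortMergeJoin extends JoinStrategy
case object BroadcastHashJoin extends JoinStrategy

def chooseJoin(leftBytes: Long, rightBytes: Long, threshold: Long): JoinStrategy =
  if (math.min(leftBytes, rightBytes) <= threshold) BroadcastHashJoin
  else SortMergeJoin

// e.g. a 100 GB table joined with a 13.4 MB runtime intermediate
// result, under a 100 MB broadcast threshold:
chooseJoin(100L << 30, (13.4 * (1 << 20)).toLong, 100L << 20) // BroadcastHashJoin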

25. Optimize Join Strategy – Case
• Apache Spark: SortMergeJoin, 75,203,212,678 MB-Seconds
• Spark with AE: the 13.4 MB runtime intermediate table is broadcast (BroadcastJoin), 17,926,197,353 MB-Seconds

26. Optimize Join Strategy – Case
• 4.2x MB-Seconds improvement (Apache Spark vs Spark with AE)

27. Major Cases
• Simplify and optimize shuffle partitions
• Optimize join strategy
• Handle skewed joins

28. Handle Skewed Join – Challenge
• A common issue: some partitions’ data is extremely larger than the others.
• Task time: avg 3 s, max 4.0 min; shuffle read size: avg 5 MB, max 172 MB

29. Handle Skewed Join – How it works
• AE dynamically detects the skewed task and increases parallelism; a sketch of the arithmetic follows.
• Apache Spark: one task joins partition x of table A with partition x of table B.
• Spark with AE: partition x of table A is split into N sub-partitions (x-1 … x-N); tasks 1…N each join one sub-partition with the whole partition x of table B, and the results are unioned.
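A minimal sketch of the detection and splitting arithmetic (illustrative; the factor and target size are assumptions, not the exact heuristics in the AE build):

// Sketch: flag a partition as skewed when it is far larger than the
// median, then decide how many sub-tasks to split it into.
def isSkewed(partitionBytes: Long, medianBytes: Long, factor: Int = 10): Boolean =
  partitionBytes > medianBytes * factor

def splitCount(partitionBytes: Long, targetBytes: Long): Int =
  math.max(1, math.ceil(partitionBytes.toDouble / targetBytes).toInt)

// Slide-30 numbers (in MB): a 1024 MB partition against a 27 MB typical
// size is skewed; a 100 MB target splits it into ~11 parallel joins.
isSkewed(1024, 27)    // true
splitCount(1024, 100) // 11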

30. Handle Skewed Join – Case
• Skewed partition: 1024 MB vs a typical 27 MB
• Run time reduced 1.6x: 35 min (Apache Spark) → 22 min (Spark with AE)

31. Bucket Support in AE
• Bucket table read support: avoid shuffling the bucketed table when shuffle.partition > bucket number
• Bucket table write support: AE should respect the bucket number when writing to a bucketed table

32. Production Benchmark
In 40 production queries:
• 3.09x average MB-Seconds improvement
• 61% of queries improved by more than 200%
• 83% improved by more than 50%

33. Spark Core Optimization
• Adaptive Execution – dynamically optimize the execution plan
• Indexed Bucket – optimize the data layout

34. Why Indexed Bucket
• Challenge: how to optimize the query path and the merge path at the same time
• Merge: working table → big table; query: big table → downstream users
• Bucketing (for merge) plus partitioning (for query) leads to too many small files

35. Indexed Bucket – How it works

CREATE TABLE dw_lstg_item (
  item_id DECIMAL(18,0),
  ......
  auct_end_dt DATE,
  ......
)
USING parquet
CLUSTERED BY (item_id)
SORTED BY (auct_end_dt)
INTO 10000 BUCKETS;

INSERT INTO TABLE dw_lstg_item
SELECT * FROM xxx DISTRIBUTE BY item_id;

[Diagram: buckets 0, 1, …, 9999; within each bucket the rows are sorted by auct_end_dt from 2000-01-01 up to 2018-04-02]

– Bucket by item_id + sort by auct_end_dt (reduces the number of files)
– Data is sorted, so Parquet’s min-max index can be used for filtering performance

36. Indexed Bucket – How it works
• Index: efficient data skipping based on min/max statistics (see the read-time sketch below)
• Pushdown: [SPARK-25419] Parquet predicate pushdown improvement
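A sketch of how this pays off at read time (using the dw_lstg_item table from the previous slide; spark.sql.parquet.filterPushdown is a standard switch that is on by default, set here only for emphasis):

// With rows sorted by auct_end_dt inside every bucket, a pushed-down
// date filter lets the Parquet reader skip whole row groups using the
// min/max statistics instead of scanning the full table.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")

spark.table("dw_lstg_item")
  .where("auct_end_dt = CAST('2018-04-19' AS DATE)") // Date pushdown: SPARK-23727
  .select("item_id", "auct_end_dt")
  .show()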

37. Query Improvements – Predicate Pushdown
[SPARK-25419] Improve Parquet predicate pushdown:
• [SPARK-23727] Support Date type
• [SPARK-24549] Support Decimal type
• [SPARK-24718] Support Timestamp type
• [SPARK-24706] Support Byte and Short types
• [SPARK-24638] Support the StringStartsWith predicate
• [SPARK-17091] Support the IN predicate

38. Query Improvements – Predicate Pushdown
Bucket table vs indexed bucket table:

SELECT lstg.item_site_id, ...
FROM dw_lstg_item lstg
WHERE lstg.auct_end_dt = CAST('2018-04-19' AS DATE);

24.7x improvement for the indexed bucket table

39. Query Improvements – Enhance Bucket Join

40. Query Improvements – Enhance Bucket Join
• Apache Spark: unnecessary shuffle

41. Query Improvements – Enhance Bucket Join
• Spark with IB: [SPARK-24087] Avoid shuffle when join keys are a super-set of bucket keys (see the sketch below)
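For illustration, a sketch of the SPARK-24087 case (assuming lstg_item_vrtn is bucketed by item_id into the same number of buckets as dw_lstg_item; that assumption is not stated on the slide): the join keys (item_id, auct_end_dt) are a super-set of the bucket key item_id, so the bucketed layout already co-locates matching rows:

// Join keys are a super-set of the bucket key, so with SPARK-24087 the
// plan contains no Exchange (shuffle) under the SortMergeJoin.
val joined = spark.sql("""
  SELECT lstg.item_id, v.auct_end_dt
  FROM dw_lstg_item lstg
  JOIN lstg_item_vrtn v
    ON lstg.item_id = v.item_id
   AND lstg.auct_end_dt = v.auct_end_dt
""")
joined.explain() // check: no Exchange operators on either join side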

42. Query Improvements – Comprehensive Case

SELECT lstg.item_site_id, ...
FROM dw_lstg_item lstg,
     dw_category_hierarchy h_categ,
     lstg_item_vrtn v
WHERE lstg.auct_end_dt = CAST(SUBSTR('2018-04-19 00:00:00',1,10) AS DATE)
  AND lstg.item_id = v.item_id
  AND lstg.auct_end_dt = v.auct_end_dt
  AND lstg.categ_key = h_categ.categ_key
  AND lstg.item_site_id IN (SELECT site_id FROM dw_dlp_site_lkp)
ORDER BY 1,2,3;

• Apache Spark: 2,892,165,122 MB-Seconds
• Spark with IB: 944,326,656 MB-Seconds
• 3.06x MB-Seconds improvement

43. Indexed Bucket Summary
• Brings significant benefits:
  – Reduced execution time
  – Improved memory efficiency
• Especially useful for downstream consumers, avoiding full table scans on huge, hot tables.

44. More Than AE/IB …
Make the optimizations production ready
• TPC-DS 100TB, eBay cases

45. ~50 issues reported to the community during migration

Case-insensitive field resolution
• SPARK-25132 Case-insensitive field resolution when reading from Parquet
• SPARK-25175 Field resolution should fail if there's ambiguity for ORC native reader
• SPARK-25207 Case-insensitive field resolution for filter pushdown when reading Parquet

Parquet filter pushdown
• SPARK-23727 Support DATE predicate push down in parquet
• SPARK-24716 Refactor ParquetFilters
• SPARK-24706 Support ByteType and ShortType pushdown to parquet
• SPARK-24549 Support DecimalType push down to the parquet data sources
• SPARK-24718 Timestamp support pushdown to parquet data source
• SPARK-24638 StringStartsWith support push down
• SPARK-17091 Convert IN predicate to equivalent Parquet filter

UDF improvement
• SPARK-23900 format_number udf should take user specified format as argument
• SPARK-23903 Add support for date extract

Bugs
• SPARK-24076 Very bad performance when shuffle.partition = 8192
• SPARK-25084 "distribute by" on multiple columns may lead to codegen issue
• SPARK-25368 Incorrect constraint inference returns wrong result
• SPARK-24556 ReusedExchange should rewrite output partitioning

46. THANKS
