Spark SQL Adaptive Execution Unleashes the Power of Large-Scale Clusters

In this article, we explore the joint effort by Intel and Baidu to address the challenges of large scale, and outline the adaptive execution mode we implemented for Baidu's BigSQL platform, which is built on Spark SQL. At runtime, adaptive execution can change the execution plan to use a better join strategy and automatically handle skewed joins. It can also change the number of reducers to better fit the data scale. Overall, adaptive execution reduces the effort needed to tune SQL query parameters, and improves execution performance by choosing a better execution plan and parallelism at runtime.

1. Spark SQL Adaptive Execution Unleashes The Power of Cluster in Large Scale — Carson Wang (Intel), Yuanjian Li (Baidu) #Exp5SAIS

2. Agenda
• Challenges in Using Spark SQL
• Adaptive Execution Introduction
• Adaptive Execution in Baidu

3. Tuning Shuffle Partition Number
• Too small: spill and OOM
• Too large: scheduling overhead, more I/O requests, too many small output files
• The same shuffle partition number doesn't fit all stages

4. Spark SQL Join Selection
• A join may take intermediate results as inputs. Spark SQL may choose an inefficient join strategy if it doesn't know the exact input sizes at planning time.

5. Data Skew in Join
• Data in some partitions is much larger than in others. Data skew is a common source of slowness for shuffle joins.
• Common ways to mitigate data skew:
– Increase the shuffle partition number
– Increase the BroadcastJoin threshold
– Add a prefix to the skewed keys
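The last mitigation above, key salting, can be sketched in plain Python (a hypothetical illustration of the idea, not Spark code): the skewed side prefixes each key with a random salt in [0, n), while the other side is replicated once per salt value, so a single hot key spreads across up to n reduce partitions.

```python
import random

def salt_left(rows, n):
    """Skewed side: prefix each key with a random salt in [0, n)."""
    return [(f"{random.randrange(n)}_{k}", v) for k, v in rows]

def explode_right(rows, n):
    """Other side: replicate each row once per possible salt value."""
    return [(f"{i}_{k}", v) for k, v in rows for i in range(n)]

# Joining the salted sides yields exactly the pairs of the original join,
# but the hot key "hot" is now spread over up to n partitions.
left = [("hot", 1), ("hot", 2), ("cold", 3)]
right = [("hot", 10), ("cold", 20)]
pairs = sorted((v1, v2)
               for k1, v1 in salt_left(left, 3)
               for k2, v2 in explode_right(right, 3)
               if k1 == k2)
```

Whatever salts are drawn, `pairs` equals the result of the unsalted join, since the right side carries every salt value.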

6. Spark SQL Execution Mode

7. Spark SQL Adaptive Execution Mode

8. Auto Setting the Reducer Number
• Enable the feature
– spark.sql.adaptive.enabled -> true
• Configure the behavior
– Target input size for a reduce task
– Min/max shuffle partition number
[Diagram: the best shuffle partition number lies between the min and max shuffle partition numbers]
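As a concrete example, the settings above might be combined as follows. The property names follow the Intel/Baidu adaptive-execution implementation described in this talk and may differ in other Spark versions; the values are illustrative only.

```properties
spark.sql.adaptive.enabled                             true
# target ~64 MB of input per reduce task
spark.sql.adaptive.shuffle.targetPostShuffleInputSize  67108864
spark.sql.adaptive.minNumPostShufflePartitions         1
spark.sql.adaptive.maxNumPostShufflePartitions         500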

9. Auto Setting the Reducer Number
• Target size per reducer = 64 MB
• Min/max shuffle partition number = 1 to 5
• Map output partitions: Partition 0 (70 MB), Partition 1 (30 MB), Partition 2 (20 MB), Partition 3 (10 MB), Partition 4 (50 MB)
• Adaptive execution uses 3 reducers at runtime
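The example on this slide can be reproduced with a small pure-Python sketch of the greedy packing rule (an illustration of the idea, not the actual Spark implementation): consecutive map-output partitions are packed into one reducer until the next partition would push its input past the target size.

```python
def coalesce_partitions(sizes_mb, target_mb):
    """Pack consecutive shuffle partitions into reducers of ~target_mb each."""
    groups, current, current_size = [], [], 0
    for size in sizes_mb:
        if current and current_size + size > target_mb:
            groups.append(current)        # close the current reducer
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Slide example: 64 MB target, partitions of 70/30/20/10/50 MB.
groups = coalesce_partitions([70, 30, 20, 10, 50], 64)
```

This yields `[[70], [30, 20, 10], [50]]`, i.e. 3 reducers at runtime, matching the slide; the min/max shuffle partition bounds would then clamp this count.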

10. Optimize Join Strategy at Runtime

11. Optimize Join Strategy at Runtime
• After optimizing a SortMergeJoin into a BroadcastJoin, each reduce task reads the whole map output file locally on its executor and joins it with the broadcasted table.
[Diagram: map task writes a map output file; reduce task performs an executor-local read]
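The runtime decision can be sketched as follows (a hypothetical Python illustration; the function and parameter names are ours, not Spark's). Once a query stage finishes, the observed output size of the build side, rather than the planner's estimate, decides the strategy.

```python
def choose_join_strategy(build_side_bytes, broadcast_threshold_bytes):
    """Pick a join strategy from the observed size of a finished stage."""
    if build_side_bytes <= broadcast_threshold_bytes:
        return "BroadcastJoin"   # ship the small side to every executor
    return "SortMergeJoin"       # shuffle and merge sorted runs otherwise

MB = 1 << 20
small = choose_join_strategy(5 * MB, 10 * MB)    # small build side
large = choose_join_strategy(500 * MB, 10 * MB)  # too big to broadcast
```

A subquery the planner could not size in advance may thus still be broadcast once its actual output turns out to be small.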

12. Handle Skewed Join at Runtime
• spark.sql.adaptive.skewedJoin.enabled -> true
• A partition is considered skewed if its data size or row count is N times larger than the median, and also larger than a pre-defined threshold.
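The rule on this slide can be written down directly (a sketch; `factor_n` and `threshold` stand for the configurable N and the pre-defined threshold, and the names are ours):

```python
from statistics import median

def is_skewed(sizes, index, factor_n, threshold):
    """Flag a partition as skewed if it is factor_n times larger than the
    median partition size AND larger than an absolute threshold."""
    return sizes[index] > factor_n * median(sizes) and sizes[index] > threshold

sizes = [10, 12, 11, 300, 9]           # partition sizes, e.g. in MB
skewed = is_skewed(sizes, 3, 5, 100)   # 300 > 5 * 11 and 300 > 100
normal = is_skewed(sizes, 0, 5, 100)   # 10 is close to the median
```

The absolute threshold prevents tiny stages, where every partition is cheap anyway, from triggering the skew handling.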

13. Handle Skewed Join at Runtime
[Diagram: the skewed Partition 0 of Table 1 is split into parts (part0, part1, part2, …); each part is sort-merge-joined with the whole Partition 0 of Table 2, and the sub-results are unioned with the normal sort-merge join of Partitions 1–N]
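The diagram's splitting scheme can be illustrated in plain Python (a sketch of the idea, not Spark code): the skewed partition of Table 1 is cut into parts, each part is joined against the whole matching partition of Table 2, and the per-part results are unioned, which yields the same rows as the unsplit join.

```python
def join(left, right):
    """Naive key-equality join returning (left_value, right_value) pairs."""
    return [(lv, rv) for lk, lv in left for rk, rv in right if lk == rk]

def skewed_join(left_part, right_part, num_splits):
    """Split the skewed left partition, join each part with the whole right
    partition, and union the sub-results."""
    out = []
    for i in range(num_splits):
        part = left_part[i::num_splits]       # one slice of the skewed data
        out.extend(join(part, right_part))    # sub-join, then union
    return out

left = [("a", i) for i in range(6)] + [("b", 99)]   # "a" is the hot key
right = [("a", 1), ("b", 2)]
```

Here `sorted(skewed_join(left, right, 3))` equals `sorted(join(left, right))`: splitting changes only the parallelism, not the result.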

14. Spark in Baidu
[Chart: cluster growth from 2014 to 2018, with nodes and jobs/day rising from tens in 2014 to roughly 10,000 nodes and 18,000 jobs/day in 2018]
• 2014: build standalone cluster; import Spark (SQL \ Graph \ Streaming) to Baidu; version 0.8
• 2015: build cluster over YARN; integrate with in-house FS \ Pub-Sub \ DW; version 1.4
• 2016: integrate with in-house resource scheduler system; migrate Hadoop jobs to Spark; version 1.6
• 2017: Structured Streaming service over Spark; version 2.1
• 2018: Spark Adaptive Execution; OAP; version 2.2

15. AE Boosting Scenarios in Baidu
• Specific user scenario (SortMergeJoin -> BroadcastJoin)
• Long-running applications, or using Spark as a service
• Graph & ML

16. SortMergeJoin -> BroadcastJoin
• Common features in this scenario:
– A small table is joined with a big table in a subquery
– The small table is generated by a subquery
• Key point:
– Identify and determine the 'small' table
• Acceleration ratio:
– 50%~200%

17. SortMergeJoin -> BroadcastJoin

    SELECT t.c1, t.c2, t.c3, t.c4, sum(t.num1), sum(t.num2), sum(t.num3)
    FROM (
        SELECT c1 as id, c2, c3, c4, sum(num1) as num1, sum(num2) as num2, sum(num3) as num3
        FROM huge_table1 t1 INNER JOIN user_list t2 ON (… = …)
        WHERE (event_day=20171107) and flag != 'true'
        GROUP BY c1, c2, c3, c4
        UNION ALL
        SELECT c1 as id, c2, c3, c4, sum(num1) as num1, sum(num2) as num2, sum(num3) as num3
        FROM huge_table2 t1 INNER JOIN user_list t2 ON (… = …)
        WHERE (event_day=20171107) and flag != 'true'
        GROUP BY c1, c2, c3, c4
    ) t
    GROUP BY t.c1, t.c2, t.c3, t.c4

[Diagram: UserList is inner-joined with Base Table 1 and Base Table 2 in SubQuery1 and SubQuery2; the subquery outputs are unioned and aggregated into the result]

18. Long Running Application
• Scenarios included:
– Long-running batch jobs (> 1 hour)
– Using Spark as a service (Livy \ Baidu BigSQL \ Spark Shell \ Zeppelin)
– Spark Streaming
• Key point:
– Adaptive parallelism adjustment
• Acceleration ratio:
– 50%~100%

19. Long Running Application
• AE disabled: duration 52 min (100 instances, 10G executor.mem, 4 executor.cores)
• AE enabled: duration 30 min (same resources; min/maxNumPostShufflePartitions = 400/10000, targetPostShuffleInputSize = 512M)

20. GraphFrame & MLlib
• Scenarios included:
– GraphFrame applications
– MLlib
• Key point:
– Adaptive parallelism adjustment
• Acceleration ratio:
– 50%~100%

21. AE Probe in Spark
[Diagram: AE Probe architecture — Spark applications (Batch / Streaming / SQL / …) run over YARN; each Executor reports metrics through a MetricsSink; RigAgents on each host (NodeManager) forward the metrics via Baidu Bigpipe to the Baidu ShowX console, which the AE Probe feeds]

22. AE Probe in Spark

23. Takeaways
• Three main features in our adaptive execution:
– Auto setting the shuffle partition number
– Optimizing the join strategy at runtime
– Handling skewed joins at runtime
• For more information about our implementation:
–
–

24. Thank you! — Carson Wang, Yuanjian Li