Over the past year, engineers at Baidu and Intel introduced Adaptive Execution to maximize the efficiency of Spark SQL on very large clusters. This article introduces the scenarios where adaptive execution applies — it performs well on data skew, tiny data partitions, and automatic broadcast joins — and closes with the cost savings Baidu achieved by adopting adaptive execution in production.

1. Spark SQL Adaptive Execution Unleashes the Power of Cluster in Large Scale
Carson Wang (Intel), Yuanjian Li (Baidu)
#Exp5SAIS

2. Agenda
- Challenges in Using Spark SQL
- Adaptive Execution Introduction
- Adaptive Execution in Baidu

3. Tuning the Shuffle Partition Number
- Too small: spill, OOM
- Too large: scheduling overhead, more I/O requests, too many small output files
- The same shuffle partition number doesn't fit all stages

4. Spark SQL Join Selection
- A join may take intermediate results as inputs.
- Spark SQL may choose an inefficient join strategy if it doesn't know the exact sizes at planning time.

5. Data Skew in Join
- Some partitions hold far more data than others.
- Data skew is a common source of slowness for shuffle joins.
- Common ways to mitigate data skew:
  - Increase the shuffle partition number
  - Increase the BroadcastJoin threshold
  - Add a prefix to the skewed keys
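The last mitigation, prefixing ("salting") the skewed keys, can be sketched with plain Python. The helper names and salt count below are illustrative, not from the talk:

```python
import random

def salt_key(key, num_salts):
    # Skewed side: prefix each occurrence of a hot key with a random salt
    # so its rows spread across num_salts shuffle partitions.
    return f"{random.randrange(num_salts)}_{key}"

def replicate_key(key, num_salts):
    # Other side: emit every possible salted form of the key, so each
    # salted partition still finds its join partner.
    return [f"{i}_{key}" for i in range(num_salts)]
```

After joining on the salted keys and stripping the prefixes, the result is the same as joining on the raw keys; the cost is replicating the non-skewed side `num_salts` times.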

6. Spark SQL Execution Mode

7. Spark SQL Adaptive Execution Mode

8. Auto Setting the Reducer Number
- Enable the feature: spark.sql.adaptive.enabled -> true
- Configure the behavior:
  - Target input size for a reduce task
  - Min/max shuffle partition number
(Diagram: the best shuffle partition number lies between the configured min and max shuffle partition numbers.)

9. Auto Setting the Reducer Number
- Target size per reducer = 64 MB; min/max shuffle partition number = 1 to 5.
- Adaptive Execution uses 3 reducers at runtime.
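The merging step can be illustrated with a small Python sketch. The partition sizes are hypothetical, and this simplifies the real implementation, which adjusts the target size rather than failing when the min/max bounds are violated:

```python
def coalesce_partitions(sizes_mb, target_mb, min_num, max_num):
    # Greedily pack contiguous shuffle partitions until adding the next
    # one would exceed the per-reducer target input size.
    merged, current = [], 0
    for size in sizes_mb:
        if current > 0 and current + size > target_mb:
            merged.append(current)
            current = 0
        current += size
    if current > 0:
        merged.append(current)
    # Simplification: only check the configured bounds on the reducer count.
    assert min_num <= len(merged) <= max_num, "target size needs retuning"
    return merged

# Five hypothetical map-output partitions, 64 MB target, 1-5 reducers:
print(coalesce_partitions([10, 20, 30, 40, 50], 64, 1, 5))  # -> [60, 40, 50]
```

With these inputs the greedy packing yields three reducers handling 60, 40, and 50 MB, matching the "3 reducers at runtime" behavior on the slide.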

10. Optimize Join Strategy at Runtime

11. Optimize Join Strategy at Runtime
- After optimizing a SortMergeJoin into a BroadcastJoin, each reduce task reads the whole map output file locally and joins it with the broadcasted table.
(Diagram: the map task and reduce task run in the same executor, sharing the map output file.)
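Conceptually, the runtime decision reduces to a size check against the broadcast threshold once the exact shuffle-output sizes are known. The function name below is illustrative; the 10 MB default mirrors Spark's spark.sql.autoBroadcastJoinThreshold:

```python
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB

def choose_join(left_bytes, right_bytes, threshold=DEFAULT_BROADCAST_THRESHOLD):
    # At runtime the exact sizes of both join inputs are known, so the
    # smaller side can be broadcast safely instead of relying on
    # planning-time estimates.
    if min(left_bytes, right_bytes) <= threshold:
        return "BroadcastJoin"
    return "SortMergeJoin"

print(choose_join(5 * 1024 * 1024, 2 * 1024 ** 3))  # -> BroadcastJoin
```

This is why adaptive execution catches cases static planning misses: a subquery's output may turn out to be tiny even when its input tables are huge.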

12. Handle Skewed Joins at Runtime
- Enable the feature: spark.sql.adaptive.skewedJoin.enabled -> true
- A partition is considered skewed if its data size or row count is N times larger than the median, and also larger than a pre-defined threshold.
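The skew test can be written down directly; the factor N and the byte threshold are configurable, and the defaults below are only examples:

```python
import statistics

def is_skewed(partition_size, all_sizes, factor=10, threshold=64 * 1024 * 1024):
    # Skewed = `factor` times larger than the median partition size
    # AND larger than an absolute pre-defined threshold.
    median = statistics.median(all_sizes)
    return partition_size > factor * median and partition_size > threshold

sizes = [s * 2**20 for s in [8, 9, 10, 11, 2048]]  # bytes; one hot partition
print(is_skewed(2048 * 2**20, sizes))  # -> True
```

The absolute threshold matters: in a stage where every partition is tiny, a 10x outlier is still cheap and not worth the extra handling.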

13. Handle Skewed Joins at Runtime
(Diagram: Table 1's skewed Partition 0 is split into parts 0-2; each part is sort-merge-joined against the whole of Table 2's Partition 0, the non-skewed Partitions 1-N of both tables are joined as usual, and all sub-join results are unioned.)
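The split-and-union plan in the diagram can be mimicked with plain lists. This is a toy in-memory equi-join; the real implementation splits the skewed partition by map-output ranges rather than round-robin:

```python
def skewed_join(skewed_keys, other_keys, num_splits):
    # Split the skewed partition into chunks, join each chunk against the
    # full other-side partition, then union the sub-join results.
    chunks = [skewed_keys[i::num_splits] for i in range(num_splits)]
    result = []
    for chunk in chunks:
        result.extend((a, b) for a in chunk for b in other_keys if a == b)
    return result

# The union of the sub-joins equals the single big join:
full = [(a, b) for a in [1, 1, 2, 3] for b in [1, 2] if a == b]
split = skewed_join([1, 1, 2, 3], [1, 2], num_splits=3)
print(sorted(split) == sorted(full))  # -> True
```

Correctness holds because each skewed-side row still meets every other-side row exactly once; the only change is that the hot partition's work is spread over several tasks.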

14. Spark in Baidu
- 2014: Spark imported to Baidu (version 0.8); built a standalone cluster; integrated with in-house FS / Pub-Sub / DW
- 2015: version 1.4; built a cluster over YARN; integrated with the in-house resource scheduler system
- 2016: version 1.6; SQL / graph service over Spark; OAP
- 2017: version 2.1; Structured Streaming
- 2018: version 2.2; Spark Adaptive Execution; Hadoop-to-Spark migration

15. AE Boosting Scenarios in Baidu
- Specific user scenarios (SortMergeJoin -> BroadcastJoin)
- Long-running applications, or using Spark as a service
- Graph & ML

16. SortMergeJoin -> BroadcastJoin
- Common features of this scenario:
  - A small table joins a big table in a subquery
  - The small table is generated by a subquery
- Key point: identify and determine the 'small' table
- Acceleration ratio: 50%~200%

17. SortMergeJoin -> BroadcastJoin (example query)

SELECT t.c1, t.id, t.c2, t.c3, t.c4,
       sum(t.num1), sum(t.num2), sum(t.num3)
FROM (
    SELECT c1, t1.id AS id, c2, c3, c4,
           sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
    FROM huge_table1 t1
    INNER JOIN user_list t2 ON (t1.id = t2.id)
    WHERE (event_day = 20171107) AND flag != true
    GROUP BY c1, t1.id, c2, c3, c4
  UNION ALL
    SELECT c1, t1.id AS id, c2, c3, c4,
           sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
    FROM huge_table2 t1
    INNER JOIN user_list t2 ON (t1.id = t2.id)
    WHERE (event_day = 20171107) AND flag != true
    GROUP BY c1, t1.id, c2, c3, c4
) t
GROUP BY t.c1, t.id, t.c2, t.c3, c4

(Diagram: user_list is inner-joined with each base table in the two subqueries; the subquery results are unioned and aggregated.)

18. Long Running Application
- Included scenarios:
  - Long-running batch jobs (> 1 hour)
  - Using Spark as a service (Livy, Baidu BigSQL, Spark Shell, Zeppelin)
  - Spark Streaming
- Key point: adaptive parallelism adjustment
- Acceleration ratio: 50%~100%

19. Long Running Application
- AE disabled: duration 52 min (100 instances, 10 GB executor memory, 4 executor cores)
- AE enabled: duration 30 min (same resources), with min/maxNumPostShufflePartitions = 400/10000 and targetPostShuffleInputSize = 512 MB

20. GraphFrame & MLlib
- Included scenarios: GraphFrame applications, MLlib
- Key point: adaptive parallelism adjustment
- Acceleration ratio: 50%~100%

21. AE Probe in Spark
(Diagram: Spark over YARN; a RigAgent runs beside each NodeManager and executor host, the AE probe in the Spark application feeds a MetricsSink, and metrics flow through Baidu Bigpipe to the Baidu ShowX console for batch, streaming, and SQL workloads.)

22. AE Probe in Spark

23. Takeaways
- Three main features in our adaptive execution:
  - Auto setting the shuffle partition number
  - Optimizing the join strategy at runtime
  - Handling skewed joins at runtime
- For more information about our implementation:
  - https://issues.apache.org/jira/browse/SPARK-23128
  - https://github.com/Intel-bigdata/spark-adaptive

24. Thank you!
Carson Wang - carson.wang@intel.com
Yuanjian Li - liyuanjian@baidu.com