Spark SQL Adaptive Execution
1. Carson Wang (Intel), Yuanjian Li (Baidu). Spark SQL Adaptive Execution: Unleashes the Power of Cluster in Large Scale. #Exp5SAIS
2. Agenda
- Challenges in Using Spark SQL
- Adaptive Execution Introduction
- Adaptive Execution in Baidu
3. Tuning Shuffle Partition Number
- Too small: spill, OOM.
- Too large: scheduling overhead, more IO requests, too many small output files.
- The same shuffle partition number doesn't fit all stages.
4. Spark SQL Join Selection
- A join may take intermediate results as inputs.
- Spark SQL may choose an inefficient join strategy if it doesn't know the exact sizes at planning time.
5. Data Skew in Join
- Some partitions hold far more data than others.
- Data skew is a common source of slowness for shuffle joins.
- Common ways to mitigate data skew:
  - Increase the shuffle partition number
  - Increase the BroadcastJoin threshold
  - Add a prefix to the skewed keys
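The last mitigation, prefixing ("salting") skewed keys, can be sketched outside Spark. This is a minimal illustration, not the talk's code: the helper names (`salt_skewed_side`, `explode_other_side`) and the salt count are hypothetical. The skewed side gets a random salt prefix, and the other side replicates its matching rows once per salt value so every salted key still finds its join partner.

```python
import random

SALT_BUCKETS = 4  # number of salt values; a tuning choice, not from the slides

def salt_skewed_side(rows, skewed_keys):
    """Prepend a random salt to skewed join keys so their rows
    spread across SALT_BUCKETS shuffle partitions."""
    out = []
    for key, value in rows:
        if key in skewed_keys:
            out.append((f"{random.randrange(SALT_BUCKETS)}_{key}", value))
        else:
            out.append((key, value))
    return out

def explode_other_side(rows, skewed_keys):
    """Replicate the matching rows on the other side once per salt
    value, so every salted key still has a join partner."""
    out = []
    for key, value in rows:
        if key in skewed_keys:
            for s in range(SALT_BUCKETS):
                out.append((f"{s}_{key}", value))
        else:
            out.append((key, value))
    return out
```

The cost of this trick is the row replication on the non-skewed side, which is why adaptive execution's automatic skew handling (slide 12) is preferable when available.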
6. Spark SQL Execution Mode
7. Spark SQL Adaptive Execution Mode
8. Auto Setting the Reducer Number
- Enable the feature: spark.sql.adaptive.enabled -> true
- Configure the behavior:
  - Target input size for a reduce task
  - Min/max shuffle partition number
(Diagram: min, max, and best shuffle partition number)
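The settings this slide names can be collected in one place. A minimal sketch as a Python dict: the enable flag appears verbatim on the slide, and slide 19 uses the short names Min/MaxNumPostShufflePartitions and targetPostShuffleInputSize, but the full property prefixes below are an assumption based on the Intel spark-adaptive fork linked at the end of the deck.

```python
# Example values match slide 9 (target 64 MB, min/max partitions 1 to 5).
# Full property names are assumptions based on the Intel spark-adaptive fork.
adaptive_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.minNumPostShufflePartitions": "1",
    "spark.sql.adaptive.maxNumPostShufflePartitions": "5",
    "spark.sql.adaptive.shuffle.targetPostShuffleInputSize": str(64 * 1024 * 1024),
}
```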
9. Auto Setting the Reducer Number
- Target size per reducer = 64 MB; min-max shuffle partition number = 1 to 5.
- Adaptive execution uses 3 reducers at runtime.
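The reducer-count choice on this slide can be sketched as a greedy pass that packs contiguous map-output partitions together until the target size would be exceeded. This mirrors the idea rather than Spark's exact implementation, and the per-partition sizes below are hypothetical, not read off the slide image.

```python
def coalesce_partitions(sizes_mb, target_mb, min_parts, max_parts):
    """Greedily pack contiguous map-output partitions until the running
    size would exceed the target; each group becomes one reduce task."""
    groups, current, current_size = [], [], 0
    for size in sizes_mb:
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    # the min/max shuffle partition numbers bound the final count
    assert min_parts <= len(groups) <= max_parts
    return groups

# Hypothetical per-partition map output sizes in MB.
groups = coalesce_partitions([70, 30, 20, 10, 50], target_mb=64,
                             min_parts=1, max_parts=5)
print(len(groups))  # → 3 reducers
```

With these sizes, 5 map-output partitions collapse into 3 reduce tasks ([70], [30+20+10], [50]), matching the "3 reducers at runtime" behavior the slide describes.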
10. Optimize Join Strategy at Runtime
11. Optimize Join Strategy at Runtime
- After optimizing a SortMergeJoin to a BroadcastJoin, each reduce task locally reads the whole map output file and joins it with the broadcasted table.
(Diagram: map task, executor, reduce task, map output file)
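The runtime decision described above reduces to a size check once exact intermediate sizes are known. A minimal sketch: the 10 MB figure is Spark's default spark.sql.autoBroadcastJoinThreshold, and treating it as the runtime threshold here is an assumption for illustration.

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # 10 MB: Spark's default autoBroadcastJoinThreshold

def choose_join_strategy(left_bytes, right_bytes):
    """With exact intermediate sizes known at runtime, a planned
    SortMergeJoin can be replaced by a BroadcastJoin when either
    side is small enough to broadcast."""
    if min(left_bytes, right_bytes) <= BROADCAST_THRESHOLD:
        return "BroadcastJoin"
    return "SortMergeJoin"
```

At planning time Spark must rely on estimates, which is exactly why it can pick the inefficient strategy mentioned on slide 4; running this check after a stage finishes removes the guesswork.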
12. Handle Skewed Join at Runtime
- spark.sql.adaptive.skewedJoin.enabled -> true
- A partition is considered skewed if its data size or row count is N times larger than the median and also larger than a pre-defined threshold.
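The detection rule above can be sketched directly. A minimal illustration: the function and parameter names (`is_skewed`, `factor_n`, `min_threshold`) are hypothetical, and the slide gives no concrete values for N or the threshold; the same check applies to row counts as to sizes.

```python
import statistics

def is_skewed(sizes, index, factor_n, min_threshold):
    """A partition is treated as skewed when it is more than factor_n
    times the median partition size AND above an absolute threshold
    (the second condition avoids flagging uniformly tiny partitions)."""
    median = statistics.median(sizes)
    return sizes[index] > factor_n * median and sizes[index] > min_threshold
```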
13. Handle Skewed Join at Runtime
(Diagram: the skewed partition 0 of Table 1 is split into parts 0-2; each part is sort-merge joined against the whole partition 0 of Table 2, and the results are unioned with the regular SortMergeJoin of partitions 1-N.)
14. Spark in Baidu
- 2014 (version 0.8): Spark imported to Baidu.
- 2015 (version 1.4): built a standalone cluster; integrated with in-house FS / Pub-Sub / DW.
- 2016 (version 1.6): built clusters over YARN; integrated with the in-house resource scheduler system.
- 2017 (version 2.1): SQL / Graph services over Spark; OAP.
- 2018 (version 2.2): Structured Streaming; Spark Adaptive Execution; Hadoop to Spark.
15. AE Boosting Scenarios in Baidu
- Specific user scenes (SortMergeJoin -> BroadcastJoin)
- Long-running applications, or using Spark as a service
- Graph & ML
16. SortMergeJoin -> BroadcastJoin
- Common features in this scenario:
  - A small table joins a big table in a subquery
  - The small table is generated by a subquery
- Key point: identify and determine the 'small' table
- Acceleration ratio: 50%~200%
17. SortMergeJoin -> BroadcastJoin

SELECT t.c1, t.id, t.c2, t.c3, t.c4,
       sum(t.num1), sum(t.num2), sum(t.num3)
FROM (
  SELECT c1, t1.id AS id, c2, c3, c4,
         sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
  FROM huge_table1 t1
  INNER JOIN user_list t2 ON (t1.id = t2.id)
  WHERE (event_day = 20171107) AND flag != true
  GROUP BY c1, t1.id, c2, c3, c4
  UNION ALL
  SELECT c1, t1.id AS id, c2, c3, c4,
         sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
  FROM huge_table2 t1
  INNER JOIN user_list t2 ON (t1.id = t2.id)
  WHERE (event_day = 20171107) AND flag != true
  GROUP BY c1, t1.id, c2, c3, c4
) t
GROUP BY t.c1, t.id, t.c2, t.c3, c4

(Diagram: user_list inner-joins base tables 1 and 2 in two subqueries, whose results are unioned and aggregated.)
18. Long Running Application
- Covered scenarios:
  - Long-running batch jobs (> 1 hour)
  - Using Spark as a service (Livy \ Baidu BigSQL \ Spark Shell \ Zeppelin)
  - Spark Streaming
- Key point: adaptive parallelism adjustment
- Acceleration ratio: 50%~100%
19. Long Running Application
- AE disabled: duration 52 min (100 instances, 10G executor.mem, 4 executor.cores).
- AE enabled: duration 30 min (same resources), with Min/MaxNumPostShufflePartitions = 400/10000 and targetPostShuffleInputSize = 512M.
20. GraphFrame & MLlib
- Covered scenarios: GraphFrame apps; MLlib.
- Key point: adaptive parallelism adjustment.
- Acceleration ratio: 50%~100%
21. AE Probe in Spark
(Diagram: Spark over YARN; on each host a NodeManager, RigAgent, and Executor feed a MetricsSink; the AE probe in the Spark application publishes batch / streaming / SQL metrics through Baidu Bigpipe to the Baidu ShowX console.)
22. AE Probe in Spark
23. Takeaways
- Three main features in our adaptive execution:
  - Auto setting the shuffle partition number
  - Optimizing join strategy at runtime
  - Handling skewed joins at runtime
- For more information about our implementation:
  - https://issues.apache.org/jira/browse/SPARK-23128
  - https://github.com/Intel-bigdata/spark-adaptive
24. Thank you!
Carson Wang, carson.wang@intel.com
Yuanjian Li, liyuanjian@baidu.com