Spark SQL Adaptive Execution Unleashes the Power of Large-Scale Clusters
1 .Spark SQL Adaptive Execution Unleashes The Power of Cluster in Large Scale Carson Wang (Intel), Yuanjian Li (Baidu) #Exp5SAIS
2 .Agenda
• Challenges in Using Spark SQL
• Adaptive Execution Introduction
• Adaptive Execution in Baidu
3 .Tuning Shuffle Partition Number
• Too small: spill, OOM
• Too large: scheduling overhead, more IO requests, too many small output files
• The same shuffle partition number doesn't fit all stages
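Today this knob is set once per application and applies to every stage, which is exactly the mismatch the slide describes. A minimal illustration (the application name `my_job.py` is a placeholder; `spark.sql.shuffle.partitions` is the real Spark SQL setting, and 200 is its default):

```shell
# One fixed shuffle partition number is used by ALL shuffle stages of the job:
# too small for the biggest stage (spill/OOM), too large for the smallest
# (scheduling overhead, many tiny output files).
spark-submit --conf spark.sql.shuffle.partitions=200 my_job.py
```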
4 .Spark SQL Join Selection
• A join may take intermediate results as inputs. Spark SQL may choose an inefficient join strategy if it doesn't know the exact input sizes at planning time.
5 .Data Skew in Join
• Data in some partitions is far larger than in others. Data skew is a common source of slowness for shuffle joins.
• Common ways to mitigate data skew:
  – Increase the shuffle partition number
  – Increase the BroadcastJoin threshold
  – Add a prefix to the skewed keys
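The last mitigation ("add a prefix to the skewed keys") is the classic key-salting trick. A minimal sketch, assuming a simple `(key, value)` row layout; the helper names `salt_skewed_side` and `explode_other_side` are hypothetical, not Spark APIs:

```python
import random

def salt_skewed_side(rows, skewed_keys, n_salts):
    """On the large, skewed side: prepend a random salt (0..n_salts-1)
    to each skewed key, spreading one hot key over n_salts partitions."""
    out = []
    for key, value in rows:
        if key in skewed_keys:
            out.append((f"{random.randrange(n_salts)}_{key}", value))
        else:
            out.append((key, value))
    return out

def explode_other_side(rows, skewed_keys, n_salts):
    """On the other side: replicate each row with a skewed key once per
    possible salt, so every salted key still finds its match."""
    out = []
    for key, value in rows:
        if key in skewed_keys:
            out.extend((f"{i}_{key}", value) for i in range(n_salts))
        else:
            out.append((key, value))
    return out
```

Joining the salted side with the exploded side on the prefixed key yields the same matches as the original join, but the hot key's rows are spread across `n_salts` shuffle partitions.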
6 .Spark SQL Execution Mode
7 .Spark SQL Adaptive Execution Mode
8 .Auto Setting the Reducer Number
• Enable the feature
  – spark.sql.adaptive.enabled -> true
• Configure the behavior
  – Target input size for a reduce task
  – Min/max shuffle partition number
[Diagram: the chosen "best" shuffle partition number lies between the min and max shuffle partition numbers]
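Put together, the configuration might look like the fragment below. The enable flag and the min/max and target-size settings are named in these slides; the exact property keys for the latter are taken from the Intel spark-adaptive fork and should be verified against your build:

```
# spark-defaults.conf (sketch; key names per the Intel spark-adaptive fork)
spark.sql.adaptive.enabled                              true
spark.sql.adaptive.shuffle.targetPostShuffleInputSize   67108864   # 64 MB per reduce task
spark.sql.adaptive.minNumPostShufflePartitions          1
spark.sql.adaptive.maxNumPostShufflePartitions          500
```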
9 .Auto Setting the Reducer Number
• Target size per reducer = 64 MB; min/max shuffle partition number = 1 to 5.
• ShuffledRowRDD partition sizes: Partition 0 (70 MB), Partition 1 (30 MB), Partition 2 (20 MB), Partition 3 (10 MB), Partition 4 (50 MB).
• Adaptive execution uses 3 reducers at runtime.
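The reducer count in the example can be reproduced by greedily packing consecutive shuffle partitions until the target size would be exceeded. A simplified sketch of that rule (not the exact Spark implementation; the min/max bounds, which in Spark influence the target size, are omitted):

```python
def coalesce_partitions(sizes_mb, target_mb):
    """Greedily group consecutive shuffle partitions: start a new group
    whenever adding the next partition would push the current group past
    the target input size. Returns lists of original partition indices."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes_mb):
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

For the slide's sizes this yields `coalesce_partitions([70, 30, 20, 10, 50], 64)` → `[[0], [1, 2, 3], [4]]`, i.e. 3 reducers, matching the example.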
10 .Optimize Join Strategy at Runtime
11 .Optimize Join Strategy at Runtime
• After optimizing a SortMergeJoin into a BroadcastJoin, each reduce task locally reads the whole map output file on its executor and joins it with the broadcasted table.
[Diagram: a map task writes a map output file; the reduce task on the same executor reads it locally]
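The runtime decision itself is simple once the actual stage output sizes are known. A minimal sketch of the selection rule (the function name and signature are illustrative, not Spark's internal API):

```python
def choose_join_strategy(left_bytes, right_bytes, broadcast_threshold):
    """At runtime the true size of each side's shuffle output is known,
    so the plan can switch to a broadcast join when one side turns out
    to be small enough -- something size estimates at planning time miss."""
    if min(left_bytes, right_bytes) <= broadcast_threshold:
        return "BroadcastJoin"
    return "SortMergeJoin"
```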
12 .Handle Skewed Join at Runtime
• spark.sql.adaptive.skewedJoin.enabled -> true
• A partition is considered skewed if its data size or row count is N times larger than the median, and also larger than a pre-defined threshold.
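The detection rule stated above can be sketched directly (a simplified illustration over data sizes only; the real implementation also checks row counts):

```python
from statistics import median

def skewed_partitions(sizes, factor_n, min_size):
    """Flag partitions that are both more than factor_n times the median
    partition size and larger than an absolute pre-defined threshold."""
    med = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor_n * med and size > min_size]
```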
13 .Handle Skewed Join at Runtime
[Diagram: the skewed Partition 0 of Table 1 is split into parts (part0, part1, part2, ...); each part is sort-merge joined with the whole Partition 0 of Table 2; partitions 1-N are joined with a normal SortMergeJoin; all join results are unioned into the final output]
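The split-join-union plan in the diagram can be sketched over plain Python lists. This is a simplified model, not Spark code; `split_skewed_join` and `hash_join` are hypothetical helpers standing in for the per-part SortMergeJoins:

```python
from collections import defaultdict

def hash_join(left, right):
    """Tiny inner join of (key, value) rows on the key."""
    index = defaultdict(list)
    for k, v in right:
        index[k].append(v)
    return [(k, v, w) for k, v in left for w in index[k]]

def split_skewed_join(left_part, right_part, n_splits, join_fn):
    """Handle one skewed partition: slice the skewed (left) side into
    n_splits pieces, join each piece against the full right side, then
    union the per-piece results. The union equals the unsplit join."""
    step = (len(left_part) + n_splits - 1) // n_splits
    results = []
    for start in range(0, len(left_part), step):
        results.extend(join_fn(left_part[start:start + step], right_part))
    return results
```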
14 .Spark in Baidu
[Bar chart: growth of cluster nodes and jobs/day at Baidu, 2014-2018]
• 2014: Build standalone cluster; Spark imported to Baidu; version 0.8
• 2015: Build cluster over YARN; integrate with in-house FS/Pub-Sub/DW; version 1.4
• 2016: SQL/Graph/Streaming service over Spark; integrate with in-house resource scheduler system; version 1.6
• 2017: Structured Streaming; Hadoop-to-Spark migration; version 2.1
• 2018: Spark Adaptive Execution; OAP; version 2.2
15 .AE Boosting Scenarios in Baidu
• Specific user scenarios (SortMergeJoin -> BroadcastJoin)
• Long running applications, or Spark used as a service
• Graph & ML
16 .SortMergeJoin -> BroadcastJoin
• Common features of the scenario:
  – A small table is joined with a big table in a sub-query
  – The small table is generated by a sub-query
• Key point:
  – Identify and determine the 'small' table at runtime
• Acceleration ratio: 50%-200%
17 .SortMergeJoin -> BroadcastJoin
SELECT t.c1, t.id, t.c2, t.c3, t.c4, sum(t.num1), sum(t.num2), sum(t.num3)
FROM (
    SELECT c1, t1.id AS id, c2, c3, c4,
           sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
    FROM huge_table1 t1 INNER JOIN user_list t2 ON (t1.id = t2.id)
    WHERE (event_day=20171107) AND flag != 'true'
    GROUP BY c1, t1.id, c2, c3, c4
    UNION ALL
    SELECT c1, t1.id AS id, c2, c3, c4,
           sum(num1s) AS num1, sum(num2) AS num2, sum(num3) AS num3
    FROM huge_table2 t1 INNER JOIN user_list t2 ON (t1.id = t2.id)
    WHERE (event_day=20171107) AND flag != 'true'
    GROUP BY c1, t1.id, c2, c3, c4
) t
GROUP BY t.c1, t.id, t.c2, t.c3, c4
[Plan diagram: UserList is inner-joined with BaseTable1 (SubQuery1) and with BaseTable2 (SubQuery2); the two sub-query results are unioned and aggregated into the final result]
18 .Long Running Application
• Scenarios included:
  – Long running batch jobs (> 1 hour)
  – Using Spark as a service (Livy, Baidu BigSQL, Spark Shell, Zeppelin)
  – Spark Streaming
• Key point: adaptive parallelism adjustment
• Acceleration ratio: 50%-100%
19 .Long Running Application
• AE disabled: duration 52 min (100 instances, 10G executor.mem, 4 executor.cores)
• AE enabled: duration 30 min (same resources; Min/MaxNumPostShufflePartitions = 400/10000, targetPostShuffleInputSize = 512M)
20 .GraphFrame & MLlib
• Scenarios included:
  – GraphFrame applications
  – MLlib
• Key point: adaptive parallelism adjustment
• Acceleration ratio: 50%-100%
21 .AE Probe in Spark
[Architecture diagram: a Baidu ShowX console backed by an AE Probe covers batch, streaming, and SQL Spark applications; each application's executors report through a MetricsSink; a RigAgent runs alongside the NodeManager on every host; metrics flow from Spark over YARN into Baidu Bigpipe]
22 .AE Probe in Spark
23 .Takeaways
• Three main features in our adaptive execution:
  – Auto setting the shuffle partition number
  – Optimizing the join strategy at runtime
  – Handling skewed joins at runtime
• For more information about our implementation:
  – https://issues.apache.org/jira/browse/SPARK-23128
  – https://github.com/Intel-bigdata/spark-adaptive
24 .Thank you!
Carson Wang, carson.wang@intel.com
Yuanjian Li, liyuanjian@baidu.com