This talk covers Facebook's optimizations to Spark shuffle. The related paper is EuroSys '18: Riffle: Optimized Shuffle Service for Large-Scale Data Analytics.

Posted by feiwang on 2018/12/05 11:32

Slide notes

1.

2.SOS: Optimizing Shuffle I/O. Brian Cho and Ergin Seyfe, Facebook; Haoyu Zhang, Princeton University

3.1. Shuffle I/O at large scale

4.Large-scale shuffle
(diagram: all-to-all shuffle between Stage 1 and Stage 2)
• Shuffle: all-to-all communication between stages
• Shuffle data is >10x larger than available memory, with strong fault-tolerance requirements → on-disk shuffle files

5.Shuffle I/O grows quadratically with data
(charts: shuffle time, I/O request count, and shuffle fetch size plotted against the number of tasks)
• M * R (number of mappers * number of reducers) shuffle fetches
• Large amount of fragmented I/O requests
• Adversarial workload for hard drives!
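To make the quadratic growth concrete, here is a small back-of-the-envelope calculation in plain Python. The shuffle volume and task counts are illustrative assumptions, not figures from the talk: with M mappers and R reducers, every reducer fetches one block from every mapper, so the fetch count is M * R while each individual block shrinks as the data is split more finely.

```python
# Illustrative back-of-the-envelope numbers; not measurements from the talk.
total_shuffle_bytes = 10 * 1024**4          # assume a 10 TiB shuffle

for tasks in (1_000, 5_000, 10_000):        # use the same count for mappers and reducers
    mappers = reducers = tasks
    fetches = mappers * reducers            # every reducer reads one block from every mapper
    avg_block = total_shuffle_bytes / fetches
    print(f"{tasks:>6} tasks: {fetches:>12,} fetches, "
          f"avg block ~{avg_block / 1024:.0f} KiB")

# The fetch count grows quadratically with the number of tasks, while each
# read shrinks toward the KiB range: many small random reads, which is an
# adversarial workload for hard drives.
```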

6.Strawman: tune number of tasks in a job
• Tasks spill intermediate data to disk if data splits exceed memory capacity
• Larger task execution reduces shuffle I/O, but increases spill I/O

7.Strawman: tune number of tasks in a job
(chart: shuffle time vs. spill time as the number of map tasks varies)
• Need to retune when input data volume changes for each individual job
• Small tasks run into the quadratic I/O problem
• Bulky tasks can be detrimental [Dolly NSDI 13] [SparkPerf NSDI 15] [Monotask SOSP 17]: straggler problems, imbalanced workload, garbage collection overhead
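For reference, this per-job tuning is normally done through Spark's partitioning knobs. A minimal PySpark sketch of the strawman is below; the partition counts, input path, and column name are placeholders for illustration, not recommendations from the talk.

```python
from pyspark.sql import SparkSession

# Strawman tuning: pick a partition count per job and hope it balances
# shuffle fragmentation against spill. The numbers below are placeholders.
spark = (SparkSession.builder
         .appName("shuffle-tuning-strawman")
         .config("spark.sql.shuffle.partitions", "2000")   # reducer count for DataFrame shuffles
         .config("spark.default.parallelism", "2000")      # default for RDD shuffles
         .getOrCreate())

df = spark.read.parquet("hdfs:///warehouse/events")        # hypothetical input path

# Fewer, bulkier tasks reduce the M * R fetch count but spill more to disk
# (and risk stragglers and GC pressure); more, smaller tasks do the opposite.
result = df.repartition(2000, "user_id").groupBy("user_id").count()
```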

8.(diagram: the trade-off between a large amount of small tasks, which leads to fragmented shuffle I/O, and fewer bulky tasks, which gives sequential shuffle I/O)

9.2. SOS: optimizing shuffle I/O

10.SOS: optimizing shuffle I/O
• a.k.a. Riffle, presented at EuroSys 2018
• Deployed at Facebook scale

11.SOS: optimizing shuffle I/O
(diagram: the application driver hosts a merge scheduler; a worker-side merger inside the optimized shuffle service merges map outputs before reducers fetch them)
• Merge map task outputs into larger shuffle files
  1. Combines small shuffle files into larger ones
  2. Keeps partitioned file layout
• Reducers fetch fewer, large blocks instead of many, small blocks
• Number of requests: (M * R) / (merge factor), illustrated below
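The request-count claim follows directly from the merge factor. A tiny illustrative calculation (plain Python; the task counts and merge factor are assumptions):

```python
# Illustrative numbers only; the merge factor is a configurable N-way merge.
mappers, reducers, merge_factor = 10_000, 10_000, 40

before = mappers * reducers                   # one fetch per (mapper, reducer) pair
after = (mappers * reducers) // merge_factor  # N map outputs merged into one file,
                                              # so each reducer reads one block per merged file
print(f"fetches before merge: {before:,}")    # 100,000,000
print(f"fetches after  merge: {after:,}")     #   2,500,000
```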

12.SOS: optimizing shuffle I/O
(diagram: the driver's job/task scheduler assigns tasks to executors on worker nodes and collects task statuses; the SOS merge scheduler sends merge requests to the per-node SOS shuffle service, which merges files on the file system and reports merge statuses back)
• SOS shuffle service: a long-running instance on each physical node
• SOS scheduler: keeps track of shuffle files and issues merge requests
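The division of labor above can be sketched as follows. This is a highly simplified illustration of the scheduling idea only; all class and method names are invented for the example and are not Riffle's actual interfaces. The driver-side scheduler buffers per-node map-output reports and issues an N-way merge request once enough files have accumulated on a node.

```python
from collections import defaultdict

class MergeScheduler:
    """Toy driver-side scheduler: track finished map outputs per worker node
    and ask that node's shuffle service to merge every `merge_factor` of them."""

    def __init__(self, shuffle_service, merge_factor=40):
        self.shuffle_service = shuffle_service      # long-running service per physical node
        self.merge_factor = merge_factor
        self.pending = defaultdict(list)            # node -> unmerged map-output files

    def on_map_task_finished(self, node, shuffle_file):
        self.pending[node].append(shuffle_file)
        if len(self.pending[node]) >= self.merge_factor:
            batch, self.pending[node] = self.pending[node], []
            # The merge runs locally on the worker node, off the critical path
            # of the map tasks themselves.
            self.shuffle_service.send_merge_request(node, batch)

    def on_merge_finished(self, node, merged_file, originals):
        # Record the merged file so reducers fetch it instead of the originals;
        # keep the originals around as a fault-tolerance fallback.
        pass
```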

13.Results on synthetic workload (unoptimized)
(charts: read block size, number of reads, and map/reduce stage time for no merge vs. 5-, 10-, 20-, and 40-way merge)
• SOS reduces number of fetch requests by 10x
• Reduce stage -393s, map stage +169s → job completes 35% faster

14.Best-effort merge: mixing merged and unmerged files
(charts: same metrics as the previous slide, with a best-effort merge at a 95% threshold added)
• Reduce stage -393s, map stage +52s → job completes 53% faster
• SOS finishes the job with only ~50% of cluster resources!
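Best-effort merge means reducers do not wait for every merge to finish: once a configurable fraction of map outputs (95% in this experiment) has been merged, the rest are read unmerged. Below is a hedged sketch of how a reducer's fetch list could be assembled under such a policy; the function, data structures, and threshold handling are invented for illustration, not Riffle's actual code.

```python
def build_fetch_list(map_outputs, merged_index, best_effort_ratio=0.95):
    """Toy illustration of best-effort merge: prefer merged blocks, but fall
    back to the original per-mapper blocks for outputs that never got merged."""
    if not map_outputs:
        return []
    merged = [f for f in map_outputs if f in merged_index]
    if len(merged) / len(map_outputs) < best_effort_ratio:
        # Not enough outputs merged yet; signal the caller to keep waiting.
        return None

    fetches = []
    seen_merged_files = set()
    for f in map_outputs:
        if f in merged_index:
            merged_file = merged_index[f]          # many map outputs map to one merged file
            if merged_file not in seen_merged_files:
                seen_merged_files.add(merged_file)
                fetches.append(("merged", merged_file))
        else:
            fetches.append(("unmerged", f))        # the straggling ~5%
    return fetches
```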

15.Additional details
• Merge operation fault-tolerance: handled by falling back to the unmerged files
• Efficient memory management: the merger reads and writes large buffers for performance and I/O efficiency
(diagram: blocks from many map output files are read with buffered reads, merged per partition, and written out with buffered writes)
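A minimal sketch of the worker-side merge itself, assuming each map output is a partitioned file with a small index of per-reducer block offsets; the file format and the `read_index` helper are invented for illustration. The merger streams each partition from all inputs through large read/write buffers, so the merged file keeps the same partitioned layout and a reducer can still seek directly to its block.

```python
BUFFER_SIZE = 8 * 1024 * 1024      # large buffers keep disk I/O mostly sequential

def merge_map_outputs(input_paths, read_index, output_path, num_partitions):
    """Toy N-way merge: for every reduce partition, concatenate that partition's
    block from each input file into one merged file, recording new offsets.

    `read_index(path, partition)` is a hypothetical helper returning the
    (offset, length) of a partition's block inside one map output file."""
    merged_index = []
    with open(output_path, "wb", buffering=BUFFER_SIZE) as out:
        for partition in range(num_partitions):
            start = out.tell()
            for path in input_paths:
                offset, length = read_index(path, partition)
                with open(path, "rb", buffering=BUFFER_SIZE) as src:
                    src.seek(offset)
                    out.write(src.read(length))
            merged_index.append((start, out.tell() - start))
    return merged_index               # reducers use this to fetch one large block per partition
```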

16.3. Deployment and observed gains

17.Deployment
• Started staged rollout late last year
• Completed in April, running stably for over a month

18.SOS + zstd
• Rollout includes zstd compression with SOS
• Combined they produce a net gain in I/O and compute efficiency

19.–24.SOS + zstd (slides 19–24 progressively build the following results table)

                     SOS          zstd               Net
  Spill I/O          Regression   Gain               Small Gain
  Shuffle I/O        Gain         Small Gain         Gain

                     SOS          zstd               Net
  CPU time           No change    Small Regression   Small Regression
  Reserved CPU time  Gain         No change          Gain

25.IO Gains: Request-level
• Spark-level I/O requests (the number of application-level R/W requests made): 7.5x fewer

26.–27.IO Gains: Disk-level
• Disk service time (time spent on disks in the storage system): 2x more efficient
• Average IO size (average size of an IO request at the disks): 2.5x increase

28.Compute Gains
• Reserved CPU time (resources allocated for Spark executors): total 10% gain
  • CPU time (time spent using CPU) → 5% regression
  • I/O time (time spent waiting, not using CPU) → 75% gain
• Currently working on increasing these gains

29.4. Summary

30.Summary
1) Shuffle at large scale induces large amounts of fragmented shuffle I/O
2) SOS provides a solution to optimize these I/Os
3) SOS is deployed and running stably at Facebook scale
4) Observed gains of 2x more efficient I/O, which translates to 10% more efficient compute
5) Plan to contribute back to Apache Spark

31.Questions?

32.

