Migrating Apache Hive Workloads to Apache Spark

In this section, we first introduce the new features and improvements in the migration framework that support bucketed tables and raise the degree of automation. Next, we dive into the toughest technical challenges we encountered and how we addressed them. By identifying and building the most-needed missing features, fixing incompatible UDFs, and implementing a UDF testing framework, we raised syntax compatibility between Hive and Spark from 51% to 85%. In addition, we built a reliable join operator that improves the stability of Apache Spark in production while taking advantage of optimizations such as ShuffledHashJoin.

1. Migrating Apache Hive Workload to Apache Spark: Bridge the Gap
Zhan Zhang, Jane Wang, Facebook

2. Overview
• Hive to Spark Migration Effort
• Narrowing Down Feature Gaps
  – Regex Column Specification Support
  – Local Writes Support
  – UDFs
• Performance and Reliability
  – Dynamic Join
  – Bucket Join
• Advanced Optimization for Extremely Large Jobs
  – Secondary Partitioning
  – Run-time Optimization

3. Hive to Spark Migration
• Why do we migrate workloads from Hive to Spark?
  – Performance
  – Identify and narrow down the feature gap

4. Regex Column Specification Support
• One of the most common failures in our syntax analysis.
• Support regex column specification, as sketched below:
  – SELECT `(a)?+.+` FROM data_table
  – SELECT t.`(a)?+.+` FROM data_table
• SPARK-12139
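
The open-source counterpart of this feature (SPARK-12139) is gated by the spark.sql.parser.quotedRegexColumnNames flag. A minimal Scala sketch; the table data_table and its columns are illustrative:

    import org.apache.spark.sql.SparkSession

    object RegexColumnExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("regex-column-spec")
          .master("local[*]")
          // Quoted regex column names are off by default in open-source Spark.
          .config("spark.sql.parser.quotedRegexColumnNames", "true")
          .getOrCreate()
        import spark.implicits._

        Seq((1, "x", 2.0)).toDF("a", "b", "c").createOrReplaceTempView("data_table")

        // The regex `(a)?+.+` matches every column name except the bare "a",
        // so this projects columns b and c.
        spark.sql("SELECT `(a)?+.+` FROM data_table").show()

        spark.stop()
      }
    }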

5. Local Filesystem Writes
• Support writing data into the filesystem from queries:
  – INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? createFileFormat?
  – INSERT OVERWRITE LOCAL? DIRECTORY (path=STRING)? tableProvider (OPTIONS options=tablePropertyList)?
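
Both grammar rules surfaced in open-source Spark once SPARK-4131 landed. A hedged sketch of how each is invoked; paths and the people table are illustrative, and the Hive row-format variant may additionally require a Hive-enabled session:

    import org.apache.spark.sql.SparkSession

    object LocalDirectoryWriteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("insert-overwrite-directory")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        Seq((1, "alice"), (2, "bob")).toDF("id", "name").createOrReplaceTempView("people")

        // First rule: Hive row format (may need enableHiveSupport()).
        spark.sql(
          """INSERT OVERWRITE LOCAL DIRECTORY '/tmp/people_csv'
            |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
            |SELECT id, name FROM people""".stripMargin)

        // Second rule: datasource table provider (tableProvider + OPTIONS).
        spark.sql(
          """INSERT OVERWRITE DIRECTORY '/tmp/people_json'
            |USING json
            |SELECT id, name FROM people""".stripMargin)

        spark.stop()
      }
    }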

6. UDF Support
• UDAF_JAVA_F / UDTF_JAVA_F / UDF_JAVA_F
• UDF_Bind
• UDF_EVAL_F
• Non-deterministic expressions
• …
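
To make two of these items concrete in the open-source API: a Scala UDF marked non-deterministic (so the optimizer will not collapse or re-evaluate it arbitrarily), plus the Hive-style registration path for Java UDFs. All names are illustrative:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.udf

    object UdfExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("udf-compat")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // A UDF with randomness or side effects must be declared
        // non-deterministic, or the optimizer may reorder or re-evaluate it.
        val noisyId = udf(() => scala.util.Random.nextLong()).asNondeterministic()
        spark.udf.register("noisy_id", noisyId)

        Seq("a", "b").toDF("k").selectExpr("k", "noisy_id() AS id").show()

        // Hive Java UDFs (the *_JAVA_F rows above) register the Hive way,
        // assuming the class is on the classpath (class name is illustrative):
        // spark.sql("CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyHiveUDF'")

        spark.stop()
      }
    }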

7. Narrowing Down Feature Gaps – Syntax
• Regex Column Specification
• Syntax parser improvements
• UDF compatibility
  – Enum values
  – User-defined class types
  – Lambda functions

8. 3X Workload Growth in 6 Months
[chart: reserved CPU days over time]

9. Joins
• Broadcast Join
• ShuffledHash Join
• SortMerge Join
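
In open-source Spark the choice among the three is steered by the broadcast hint and two configs; a sketch with illustrative data:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    object JoinStrategiesExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("join-strategies")
          .master("local[*]")
          // Disable automatic broadcasting so the choice below is explicit.
          .config("spark.sql.autoBroadcastJoinThreshold", "-1")
          // When false, the planner may prefer ShuffledHashJoin over
          // SortMergeJoin if one side is small enough to hash per partition.
          .config("spark.sql.join.preferSortMergeJoin", "false")
          .getOrCreate()
        import spark.implicits._

        val big   = (1 to 1000).map(i => (i % 10, i)).toDF("k", "v")
        val small = (0 until 10).map(i => (i, s"dim$i")).toDF("k", "name")

        // Broadcast join: ship the small side to every executor.
        big.join(broadcast(small), "k").explain()

        // No hint: the planner picks ShuffledHashJoin or SortMergeJoin.
        big.join(small, "k").explain()

        spark.stop()
      }
    }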

10. Dynamic Join
• Build the hash table more aggressively to leverage HashJoin.
• Provide a reliable fallback mechanism: on OOM during the hash build, reconstruct the iterator and fall back to SortMergeJoin.
[flowchart: Start → build hash table → HashJoin; OOM? No → End; Yes → reconstruct iterator → SortMergeJoin → End]
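
The dynamic join is a Facebook-internal operator, so the following is only a conceptual sketch of its control flow in plain Scala: try the cheap hash build first, fall back to sort-merge when it will not fit. The real operator detects OOM during the build and reconstructs the input iterators; the size check below merely stands in for that:

    object DynamicJoinSketch {
      type Row = (Int, String)

      // "Aggressive" path: build a hash table on the build side.
      def hashJoin(build: Seq[Row], probe: Seq[Row]): Iterator[(Row, Row)] = {
        val table = build.groupBy(_._1)
        probe.iterator.flatMap(p => table.getOrElse(p._1, Nil).map(b => (b, p)))
      }

      // Fallback path: join via sorted inputs (simplified merge).
      def sortMergeJoin(left: Seq[Row], right: Seq[Row]): Iterator[(Row, Row)] = {
        val l = left.sortBy(_._1)
        val r = right.sortBy(_._1)
        l.iterator.flatMap(a => r.filter(_._1 == a._1).map(b => (a, b)))
      }

      // Stand-in for "build hash table; on OOM reconstruct the iterator and
      // fall back to SortMergeJoin".
      def dynamicJoin(build: Seq[Row], probe: Seq[Row], budgetRows: Int): Iterator[(Row, Row)] =
        if (build.size <= budgetRows) hashJoin(build, probe)
        else sortMergeJoin(build, probe)

      def main(args: Array[String]): Unit = {
        val small = Seq((1, "a"), (2, "b"))
        val large = (1 to 6).map(i => (i % 3, s"p$i"))
        dynamicJoin(small, large, budgetRows = 10).foreach(println)
      }
    }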

11. Dynamic Join – Physical Plan

12. Bucket Join
• Support a different number (multiplier) of buckets on the left/right side, as sketched below.
[diagram: aligning bucket files with splits when the two sides have bucket counts that differ by a multiplier]
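
A sketch of the multiplier idea using the open-source bucketing API: write the two sides with 8 and 4 buckets (a 2x multiplier). Open-source Spark 3.1+ later added a related knob, spark.sql.bucketing.coalesceBucketsInJoin.enabled; the internal implementation on the slide predates it. Table names and data are illustrative:

    import org.apache.spark.sql.SparkSession

    object BucketJoinExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("bucket-join")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        val left  = (1 to 100).map(i => (i, s"l$i")).toDF("k", "lv")
        val right = (1 to 100).map(i => (i, s"r$i")).toDF("k", "rv")

        // 8 vs 4 buckets on the same key: a 2x multiplier, as on the slide.
        left.write.bucketBy(8, "k").sortBy("k").mode("overwrite").saveAsTable("t_left")
        right.write.bucketBy(4, "k").sortBy("k").mode("overwrite").saveAsTable("t_right")

        // With bucket coalescing enabled, the 8-bucket side can be read as
        // 4 splits so the join avoids a shuffle (Spark 3.1+ config).
        spark.conf.set("spark.sql.bucketing.coalesceBucketsInJoin.enabled", "true")
        spark.table("t_left").join(spark.table("t_right"), "k").explain()

        spark.stop()
      }
    }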

13. Bucket Join Validation
• To verify that Spark's bucket join generates results consistent with Hive's bucket join (see the sketch below):
  – Read the Spark/Hive tables.
  – Zip the corresponding splits from the Spark- and Hive-generated tables.
  – Compare the sort column of the two splits sequentially.
  – Sort the bucket column in each split and compare the rows of the two splits sequentially.
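
A sketch of the recipe above using RDD.zipPartitions, which pairs up corresponding splits and therefore requires both sides to have the same number of them. The two parallelized collections stand in for the Spark- and Hive-generated tables, with one partition playing the role of one bucket file:

    import org.apache.spark.sql.SparkSession

    object BucketValidationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("bucket-validation")
          .master("local[*]")
          .getOrCreate()
        val sc = spark.sparkContext

        // Stand-ins for the two tables; 4 partitions = 4 corresponding splits.
        val sparkSide = sc.parallelize(1 to 100, 4).map(i => (i, s"v$i"))
        val hiveSide  = sc.parallelize(1 to 100, 4).map(i => (i, s"v$i"))

        // Zip corresponding splits, sort each split on the bucket column
        // (the key), then compare rows sequentially.
        val mismatches = sparkSide.zipPartitions(hiveSide) { (a, b) =>
          val sa = a.toSeq.sortBy(_._1)
          val sb = b.toSeq.sortBy(_._1)
          sa.zipAll(sb, null, null).collect { case (x, y) if x != y => (x, y) }.iterator
        }.count()

        println(s"mismatching rows: $mismatches") // expect 0
        spark.stop()
      }
    }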

14. Challenges in Large Jobs
• A large job with 10,000 mappers × 10,000 reducers:
  – IOPS: 100,000,000 (every reducer fetches a block from every mapper)
  – HDFS: 10,000 result files
  – Scheduling overhead: 20,000 tasks
  – Manual tuning
  – Data skewness
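
The IOPS figure is just the all-to-all shuffle arithmetic: every reducer fetches one block from every mapper. A back-of-the-envelope sketch, including one plausible reading of the secondary-partitioning remedy on the next slide (fold several shuffle partitions into a single reducer that consumes them as sequential sub-splits); the exact internal design is not public:

    object ShuffleCostSketch {
      // Returns (shuffle fetches, result files, tasks) for an all-to-all shuffle.
      def cost(mappers: Long, reducers: Long): (Long, Long, Long) =
        (mappers * reducers, reducers, mappers + reducers)

      def main(args: Array[String]): Unit = {
        // The slide's job: 100,000,000 fetches, 10,000 files, 20,000 tasks.
        println(cost(10000, 10000))

        // Hypothetical: fold 10 shuffle partitions into each reducer.
        // Fetches and files drop 10x, at the cost of 10x less parallelism.
        println(cost(10000, 1000))
      }
    }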

15. Advanced – Secondary Partitioning

16. Pros and Cons
• Reduced IOPS
• Fewer HDFS files
• Enables runtime optimization
• Backward compatibility
  – Exactly the same behavior with split number = 1
• Auto-configuration
  – e.g., 503 partitions and 13 buckets to achieve good performance
BUT
• Reduced parallelism
• Need to fetch all sub-splits before computation starts

17. Runtime Join Optimization

18. JIRA
• SPARK-12139 – REGEX column specification for Hive queries
• SPARK-4131 – Support "Writing data into the filesystem from queries"
• SPARK-23306 – Race condition in TaskMemoryManager
• SPARK-19326 – Speculated task attempts do not get launched in few scenarios
• SPARK-19839 – Fix memory leak in BytesToBytesMap

19. Acknowledgements
The presentation includes work from the Spark team at Facebook. Thanks for their contributions, especially Lin Wang and Tejas Patil.

20. Questions?