Optimizing Performance and Computing Resource Efficiency of In-Memory Big Data A…
1. Optimizing Computing Cluster Resource Utilization with Disaggregated Persistent Memory
Zhen Fan, JD.com
Yue Li, MemVerge
2. Agenda
• Motivation
  − Production environment: tens of thousands of servers
  − Uneven computing resource utilization in the production data center
  − Independent scaling of compute and storage
• Extending Spark with external storage
  − Current: JD's remote shuffle service (RSS)
  − The next generation: disaggregated persistent memory
• Performance evaluation
• Conclusion
3. Computing Resource Utilization of a Production Cluster
[Charts: allocated vs. total VCores and allocated vs. total memory (TB) over time]
• 3,700 servers, sampled from 8:30 AM to 12:30 PM
• Memory utilization stays at a high level the entire time
4. Uneven Computing Resource Utilization
[Chart: daily memory overhead (%) of a production computing cluster with 3,700 servers over a 31-day period]
MemoryOverhead(%) = MemoryUtilization(%) − CPUUtilization(%); e.g., a day at 85% memory utilization and 40% CPU utilization has a 45% memory overhead.
• Computing resource utilization is uneven between CPU and memory
• With thousands of servers, changing the hardware is impractical
• The data center is centered on Spark applications, which need more memory
5. High Memory Demand from Spark Tasks
[Chart: execution time (s) vs. memory utilization (GB)]
• Spark performance depends heavily on the amount of available memory
• Especially for machine learning and iterative jobs
• Both cache and execution memory matter
6. Too Much Shuffle Data to Store
• Shuffle-heavy applications are very common in production
• Local HDDs fall short:
  − Slow under high load
  − Intolerable random I/O when shuffle data is heavy
  − Prone to failure in production, with no replicas
7. Too Costly to Fail
• A stage recompute is a disaster for SLAs
• We need shuffle storage that is manageable and highly available
8. Current Solution
• Related discussion: SPARK-25299
• Separate storage from compute: shuffle to external storage
• The JD remote shuffle service (RSS)
  − More effective and more stable for shuffle-heavy applications
9. Workflow of JD Remote Shuffle Service
[Diagram: RemoteShuffleManager workflow]
10. Writer & RSS Implementation
Shuffle writer:
• Adds a metadata header and encodes outgoing blocks with protobuf
• Groups partitions so RSS can merge them efficiently
• Uses acks to guarantee blocks are saved to HDFS
• The data path can introduce dirty (duplicated) data, so deduplication happens on the reducer side (a sketch of the block framing follows below)
RSS:
• Merges blocks of the same partition group in memory
• Trades off merge buffer size against how long blocks linger
• Flushes the merged buffer synchronously
• Many other details around controlling the data flow
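The deck does not include code, so the following is only a rough sketch of the writer-side framing described above. The field names, the `BlockHeader` type, and the fixed-width packing are assumptions; the real RSS writer uses protobuf-generated classes.

```scala
// Hypothetical sketch of the writer-side block framing. Field names are
// assumptions; the real implementation encodes the header with protobuf.
import java.nio.ByteBuffer

// Metadata that travels with every shuffle block so the reducer can later
// deduplicate retransmitted or dirty blocks.
case class BlockHeader(
    shuffleId: Int,
    mapId: Int,
    attemptId: Int,      // lets the reducer drop blocks from failed attempts
    partitionGroup: Int, // partitions are grouped so RSS can merge them together
    length: Int)

def encodeBlock(header: BlockHeader, payload: Array[Byte]): ByteBuffer = {
  // Five 4-byte ints (20 bytes) followed by the raw partition bytes.
  val buf = ByteBuffer.allocate(20 + payload.length)
  buf.putInt(header.shuffleId)
    .putInt(header.mapId)
    .putInt(header.attemptId)
    .putInt(header.partitionGroup)
    .putInt(header.length)
    .put(payload)
  buf.flip()
  buf
}
```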
11. Reducer-Side Implementation
• A reduce task fetches the relevant HDFS file(s)
  − Files: if one RSS instance crashes, another creates a new file for the partition group
• Extracts the key info from the driver's mapStatuses
  − Used for deduplication
• Skips partitions that are not relevant
• Deduplicates using the metadata in blocks together with the mapStatus (see the sketch below)
• Throttles the input streams
…
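A minimal sketch of the reducer-side deduplication step, under the assumption that the driver's mapStatuses yield the committed attempt per map task and that each map task contributes one block per partition group (both simplifications for illustration):

```scala
// Hypothetical sketch: the RSS file can contain duplicated blocks (e.g. from
// retried map tasks), so the reducer keeps only blocks whose (mapId, attemptId)
// matches what the driver recorded, and drops repeated copies.
case class BlockMeta(mapId: Int, attemptId: Int, partitionGroup: Int)

def dedup(
    blocks: Iterator[(BlockMeta, Array[Byte])],
    committedAttempt: Map[Int, Int] // mapId -> attemptId, from driver mapStatuses
): Iterator[Array[Byte]] = {
  val seen = scala.collection.mutable.Set.empty[(Int, Int)]
  blocks.collect {
    // Keep a block only if it comes from the committed attempt of its map
    // task and we have not already consumed a copy of it.
    case (meta, data)
        if committedAttempt.get(meta.mapId).contains(meta.attemptId) &&
          seen.add((meta.mapId, meta.attemptId)) =>
      data
  }
}
```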
12. Use Case – JD Data Warehouse Workload
[Chart: data warehouse workload results]
13. Use Case – JD Data Warehouse Workload (continued)
[Chart: data warehouse workload results, continued]
14. To Improve Performance…
• Storage: HDFS might not be performant enough
• Network: Netty might introduce a bottleneck
• We need better tools to explore:
  − Different storage backends
  − Different network transports
15. The Splash Shuffle Manager
• A flexible shuffle manager
• Supports user-defined storage backends and network transports for shuffle
• Works with vanilla Spark (a configuration sketch follows below)
• JD-RSS uses an HDFS-based backend
• Open source: https://github.com/MemVerge/splash
• Sends shuffle state to external storage:
  − Shuffle index
  − Data partitions
  − Spill
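Because Spark's shuffle manager is pluggable, enabling Splash is a configuration change rather than a code change. A minimal sketch is below; the `SplashShuffleManager` class name follows the project's package layout, but the storage-factory key and value are assumptions to verify against the Splash README:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: point Spark at the Splash shuffle manager instead of the
// built-in sort-based one. The storageFactory key/value are assumptions;
// consult https://github.com/MemVerge/splash for the exact option names.
val spark = SparkSession.builder()
  .appName("splash-demo")
  .config("spark.shuffle.manager",
    "org.apache.spark.shuffle.SplashShuffleManager")
  .config("spark.shuffle.splash.storageFactory",
    "com.memverge.splash.shared.SharedFSFactory")
  .getOrCreate()
```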
16. The Next-Generation Architecture
• JD remote shuffle service (RSS)
  − Only supports shuffle workloads
  − Uses external storage that is relatively slow (HDFS)
• A more general framework that separates compute and storage
  − Shuffle
  − RDD caching
  − Goal: faster, more reliable, and elastic
• Our solution: memory extension via an external memory pool
17. Extending Spark Memory via Remote Memory
[Diagram: Spark memory management model, with RDD caching and shuffle offloaded to an external memory pool]
The external memory pool offers:
• High capacity
• High performance
• High endurance
• Affordability
Image source: https://0x0fff.com/spark-memory-management/
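For context, RDD caching and shuffle are the two Spark code paths the external pool absorbs; the toy job below exercises both. Whether cached blocks land in local DRAM or remote PMEM is decided by cluster configuration, not application code, which is why a later slide stresses that user applications need no changes:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Toy job exercising the two code paths the external memory pool absorbs:
// RDD caching (persist) and shuffle (reduceByKey). With a disaggregated
// PMEM pool this code stays identical; only the cluster config changes.
val spark = SparkSession.builder().appName("cache-and-shuffle").getOrCreate()
val sc = spark.sparkContext

val pairs = sc.parallelize(1 to 1000000)
  .map(i => (i % 1024, i.toLong))
  .persist(StorageLevel.MEMORY_AND_DISK) // cached blocks: candidates for the remote pool

val sums = pairs.reduceByKey(_ + _)       // shuffle: what RSS/Splash can offload
println(sums.count())
```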
18. Intel Optane DC Persistent Memory: An Excellent Candidate
• Jointly developed by Intel and Micron
• High density (up to 6 TB per two-socket server)
• High endurance
• Low latency: nanosecond level
• Byte-addressable, so it can be used as main memory
• Non-volatile, so it can be used as primary storage
• Half the price of DRAM
19. Spark with Disaggregated Persistent Memory
[Diagram: Spark compute nodes #1–#N, each with DDR4 memory and Spark executors, connected through a ToR switch over a fast RDMA network to an external extended-memory node holding the PMEM pool; persistence, shuffle, and spill data flow to the remote node]
20. Summary: Spark with Disaggregated Persistent Memory
• A dedicated remote PMEM pool
  − Easier to manage
  − Affordable
• Minimal changes
  − None for existing compute nodes
  − None for user applications
  − Minimal at the Spark level
• Highly elastic: compute nodes become stateless
• Highly performant
21. Workloads
• TeraSort
  − A synthetic shuffle-intensive benchmark, well suited to evaluating I/O performance
• Core data warehouse service
  − Spark SQL
  − A core data warehouse task at JD.com that supports business decisions
  − I/O-intensive, built on select, insert, and full outer join operations (an illustrative query follows below)
• Anti-price-crawling service
  − A core analytic task at JD.com that defends against coordinated price crawling
  − Both I/O-intensive and compute-intensive
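As an illustration of the operation mix named for the data warehouse workload. The real JD queries are not public, so the table and column names here are invented; only the select / full outer join / insert shape comes from the slide:

```scala
// Illustrative only: invented table and column names showing the
// select / full outer join / insert pattern attributed to the workload.
spark.sql("""
  INSERT INTO warehouse.daily_report
  SELECT o.order_id, o.amount, r.refund_amount
  FROM orders o
  FULL OUTER JOIN refunds r
    ON o.order_id = r.order_id
""")
```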
22. Workload Characteristics

                    TeraSort    Data Warehouse    Anti-Price-Crawling
  Input data size   600 GB      200 GB            726.9 GB
  Cached RDD size   N/A         N/A               349 GB
  Shuffle size      334.2 GB    234.7 GB          57.6 GB
23. Experimental Setup
[Diagram: compute nodes and the PMEM pool connected by an Ethernet switch]
• 10 Spark compute nodes
• 1 external persistent memory node (PMEM pool)
• Spark 2.3 in standalone mode
• HDFS 2.7
24. Performance – TeraSort
[Chart: execution time (s) vs. memory utilization (GB) for Spark, Optimal Spark, and Spark w/ remote PMEM, at low/medium/large executor memory]
• Low executor memory: execution time down 300–400 s (31%)
• Medium executor memory: execution time down 300–500 s (38%)
• Large executor memory: execution time down 450 s (35%)
25. Performance – Data Warehouse
[Chart: execution time (s) vs. memory utilization (GB) for the same three configurations]
• Low executor memory: execution time down 700–800 s (58%)
• Medium executor memory: execution time down 550–650 s (54%)
• Large executor memory: execution time down 350–400 s (42%)
26. Performance – Anti-Price-Crawling
[Chart: execution time (s) vs. memory utilization (GB) for the same three configurations]
• Low executor memory: execution time down 500 s (17%)
• Medium executor memory: execution time down 1200 s (40%)
• Large executor memory: execution time up 0–60 s (4%)
  − Baseline caches data in local process memory
  − Spark w/ remote PMEM caches data in remote PMEM
27. Conclusion
• The separation of compute and storage
  − JD-RSS: shuffle to an external storage pool based on HDFS
  − Disaggregated persistent memory pool: storage memory extension for shuffle and RDD caching
  − Both unified under the MemVerge Splash shuffle manager
• Better elasticity and reliability
• High capacity, high performance, and affordable
• Persistent memory will bring fundamental changes to big data infrastructure
28 .DON’T FORGET TO RATE AND REVIEW THE SESSIONS SEARCH SPARK + AI SUMMIT