Remote Persistent Memory Extension for Spark shuffle

Speaker

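Jian Zhang (张建) is a software engineering manager in the big data department at Intel Asia-Pacific Research & Development Ltd., focusing on storage optimization for big data and machine learning.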

Abstract

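The performance of shuffle-intensive Spark jobs is often limited by disk I/O and CPU overhead during shuffle. At the same time, much prior work has shown that simply swapping in faster storage media and networks rarely accelerates shuffle effectively. We introduce how to use persistent memory and high-performance RDMA networking to accelerate Spark shuffle. The Spark Shuffle RPMem extension provides a PMem- and RDMA-based solution: it uses PMem as the shuffle storage medium and reads and writes data through the PMDK user-space library, which reduces user/kernel mode switches and file system overhead; it implements high-performance data transfer with a heterogeneous transport layer based on RDMA network protocols; and it registers RDMA memory directly on PMem to reduce memory copies. Preliminary test results show that, compared with the traditional shuffle, the Spark Shuffle RPMem extension brings significant performance improvement and latency reduction for shuffle-intensive jobs.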


1.Jian Zhang 张建 Software Engineering Manager September 2020

2. Agenda
▪ Spark in machine learning
▪ Remote Persistent Memory
▪ Spark Shuffle challenges
▪ Remote persistent memory extension for Spark shuffle
▪ Remote persistent memory extension hands-on instructions
▪ Summary
▪ Call to action
Optimization Notice Copyright © 2020, Intel Corporation. All rights reserved. *Other names and brands may be claimed as the property of others.

3.

4. Spark usage example in DLRM
Deep Learning Recommendation Model (DLRM) for personalization and recommendation systems:
▪ Leverages embeddings to process sparse features
▪ Adopts a multilayer perceptron (MLP) to process dense features
▪ Interacts features explicitly using statistical techniques
▪ Predicts the event probability by post-processing the interactions with another MLP
Data processing:
▪ For numerical features, preprocessing fills missing values with 0 and normalizes (shifting the values to be >= 1 and taking the natural logarithm).
▪ For categorical features, preprocessing transforms hashed values into a contiguous range of integers starting at 0.
NumPy-based data utility vs. SparkSQL:
▪ The single-threaded NumPy utility takes unnecessarily long
– It runs on a single CPU thread and takes ~5.5 days to transform the whole Criteo Terabyte dataset (1.3TB) *
– It takes 4025s for a much smaller dataset (4.3GB raw data)
– Using SparkSQL to process the data brings a 31.44x speedup
Optimizing Spark performance improves end-to-end machine learning pipeline efficiency!
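The two preprocessing rules on this slide can be sketched in a few lines of plain Python. This is a minimal illustration rather than the DLRM utility itself: the function names are hypothetical, and it assumes that negative values are clamped to 0 and that the "shift to >= 1" is a +1 offset.

```python
import math

def preprocess_numeric(values):
    """Fill missing values with 0, shift to >= 1, take the natural log."""
    out = []
    for v in values:
        # Assumption: missing values are None and negatives are clamped to 0.
        x = 0.0 if v is None else max(float(v), 0.0)
        # Assumption: the shift is +1, so every value is >= 1 before the log.
        out.append(math.log(x + 1.0))
    return out

def encode_categorical(hashed_values):
    """Map hashed values to contiguous integer ids starting at 0."""
    ids = {}
    encoded = []
    for h in hashed_values:
        if h not in ids:
            ids[h] = len(ids)  # next unused id, in first-seen order
        encoded.append(ids[h])
    return encoded, ids

dense = preprocess_numeric([None, 0, 3])
sparse, vocab = encode_categorical(["68fd1e64", "05db9164", "68fd1e64"])
```

In SparkSQL the same logic would be expressed as column expressions and a dictionary join, which is what lets it run in parallel instead of on a single CPU thread.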

5.

6. Persistent Memory - A New Memory Tier
IDC reports indicate that data is growing very fast:
▪ Global datasphere growth rate (CAGR): 27%*
▪ But DRAM density scaling is slowing: from 4X/3yr (1997-ish) to 2X/3yr (~1997-2010) to 2X/4yr** (since 2010)
▪ A new memory system will be needed to meet the data growth needs of new use cases
Persistent Memory (PMem): a new category that sits between memory and storage
▪ Delivers a unique combination of affordable large capacity and support for data persistence
Figure: Storage hierarchy***
*Source: Data Age 2025, sponsored by Seagate with data from IDC Global DataSphere, Nov 2018
**Source: "3D NAND Technology – Implications for Enterprise Storage Applications" by J. Yoon (IBM), 2015 Flash Memory Summit
*** https://www.intel.com/content/www/us/en/products/docs/memory-storage/optane-persistent-memory/optane-dc-persistent-memory-brief.html

7. Persistent memory introduction
Intel Optane™ PMem* is deployed alongside DDR4 DRAM and supports two modes:
▪ Memory Mode
– Volatile storage. Provides higher memory capacity, with DRAM acting as a cache. Cache management is handled by the processor's integrated memory controller.
▪ App Direct Mode
– Non-volatile storage. The application manages its cached data by itself. Reads and writes bypass the page cache.
Usage models:
▪ Memory Mode: larger memory at lower cost
▪ App Direct Mode: low-latency persistent memory; persistent data for rapid recovery
▪ Storage over App Direct: fast direct-attach storage
Cost
▪ Cost/GB is lower than DRAM.
Capacity
▪ 128GB, 256GB, 512GB
* https://www.intel.com/content/www/us/en/architecture-and-technology/optane-dc-persistent-memory.html

8. Remote Persistent Memory Usage
Remote PM
▪ Extend on-node memory capacity (w/ or w/o persistency) in a disaggregated architecture to enlarge compute node memory
▪ IMDB
High Availability Data Replication
▪ Replicate data in local PM across the fabric and store it in remote PM
▪ For backup
Shared Remote PM
▪ PM holds SHARED data among distributed applications
▪ Remote shuffle service, IMDB
https://www.snia.org/sites/default/files/PM-Summit/2018/presentations/05_PM_Summit_Grun_PM_%20Final_Post_CORRECTED.pdf

9. Remote Persistent Memory over RDMA
Remote Persistent Memory offers
▪ Remote persistence, without losing any of the characteristics of memory
▪ PM is fast
– Needs ultra low-latency networking
▪ PM has very high bandwidth
– Needs an ultra efficient protocol, transport offload, high BW
▪ Remote access must not add significant latency
▪ Network resiliency, scale-out
RDMA offers
▪ Moving data (zero-copy) between two systems with volatile DRAM, offloading data movement from CPU to NIC
▪ Low latency
– Latency < uses
▪ High BW
– 200Gb/s, 400Gb/s, zero-copy, kernel bypass, HW-offered one-sided memory-to-remote-memory operations
▪ Reliable credit-based data and control delivered by HW
▪ Network switches & adaptors deliver predictability, fairness, zero packet loss

10. RPMem Durability
Figure: RDMA Write followed by a small RDMA Read (Peer A: APP SW, RNIC; Peer B: RNIC, Memory Controller, PMEM). The RDMA Write becomes a posted (non-allocating) write on Peer B; the RDMA Read flushes it before the read data and ACK return.
▪ RDMA Write
– Guarantees that data has been successfully received and accepted for execution by the remote HCA
– Doesn't guarantee that data has reached remote host memory; needs ADR
– Doesn't guarantee that the data is visible/durable for other consumers' accesses (other connections, host processor)
– Using a small RDMA Read forces the write data to PMem
Figure: RDMA Write followed by RDMA Flush (same peers). The RDMA Flush propagates through the RNIC and Memory Controller to PMEM.
▪ New transport operation: RDMA FLUSH
– New RDMA command opcode
– Flushes all previous writes or specific regions
– Provides a memory placement guarantee to the upper-layer software
– RDMA Flush forces previous RDMA Write data to the durability domain
– It makes PM operations with RDMA more efficient!
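The difference between "write completed" and "write durable" on this slide can be illustrated with a toy model. The sketch below is not an RDMA verbs API: `RemotePeer` and its methods are hypothetical names that only mimic the ordering behaviour described above, where a completed RDMA Write may still sit in front of PMem until a subsequent RDMA Read or RDMA Flush pushes it into the durability domain.

```python
class RemotePeer:
    """Toy model of Peer B: a queue of accepted writes in front of PMem."""

    def __init__(self):
        self.in_flight = []  # writes accepted by the RNIC, not yet durable
        self.pmem = {}       # durability domain (address -> bytes)

    def rdma_write(self, addr, data):
        # Completion only means the remote RNIC accepted the write.
        self.in_flight.append((addr, bytes(data)))
        return "completed"

    def _drain(self):
        # Push every previously accepted write into PMem.
        for addr, data in self.in_flight:
            self.pmem[addr] = data
        self.in_flight.clear()

    def rdma_read(self, addr):
        # A read is ordered after earlier writes, so it forces them out first.
        self._drain()
        return self.pmem.get(addr)

    def rdma_flush(self):
        # Dedicated flush: same guarantee, no payload returned.
        self._drain()
        return "flushed"

peer = RemotePeer()
status = peer.rdma_write(0x10, b"shuffle-block")
durable_after_write = 0x10 in peer.pmem
peer.rdma_flush()
durable_after_flush = 0x10 in peer.pmem
```

The read-after-write variant gives the same guarantee but costs a round trip that carries data back; the flush operation avoids that payload, which is why the slide calls it more efficient.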

11.

12. Re-cap of Shuffle
Figure: shuffle data flow with sample keys. Input (an HDFS file) is loaded by each map task; each map's output is intermediate data (compressed, written locally, spilled to storage); the shuffle stage (random partition) reads it remotely via the network and decompresses it; each reduce task sorts its partition and writes the output (an HDFS file).

13. Challenges of Spark shuffle
Data center infrastructure evolution
▪ Compute and storage disaggregation has become a key trend, and diskless environments are becoming more and more popular
▪ The modern datacenter is evolving: high-speed networks between compute and disaggregated storage, together with tiered storage architectures, make local storage less attractive
▪ New storage technologies are emerging, e.g., storage class memory (or PMem)
Spark shuffle challenges
▪ Uneven resource utilization of CPU and memory
▪ Out-of-memory issues and GC
▪ Disk I/O is too slow
▪ Data spill degrades performance
▪ Shuffle I/O grows quadratically with data
▪ Local SSDs wear out from frequent intermediate data writes
▪ Unaffordable recompute cost
Other related works
▪ Intel disaggregated shuffle w/ DAOS [1], Facebook Cosco [2], Baidu DCE shuffle [3], JD.com & MemVerge RSS [4], etc.
1. https://www.slideshare.net/databricks/improving-apache-spark-by-taking-advantage-of-disaggregated-architecture
2. https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service
3. http://apache-spark-developers-list.1001551.n3.nabble.com/Enabling-fully-disaggregated-shuffle-on-Spark-td28329.html
4. https://databricks.com/session/optimizing-performance-and-computing-resource-efficiency-of-in-memory-big-data-analytics-with-disaggregated-persistent-memory
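One way to read "shuffle I/O grows quadratically with data" is in terms of the number of shuffle blocks: every reducer fetches one block from every mapper, so the block count is maps × reducers. The sketch below assumes, purely for illustration, a fixed 128 MB input split and a fixed 128 MB target partition size, so both task counts scale linearly with the data size.

```python
def shuffle_blocks(data_mb, split_mb=128, partition_mb=128):
    """Return (maps, reducers, blocks, avg_block_mb) for a given data size."""
    maps = -(-data_mb // split_mb)          # ceiling division
    reducers = -(-data_mb // partition_mb)  # ceiling division
    blocks = maps * reducers                # one block per (map, reducer) pair
    return maps, reducers, blocks, data_mb / blocks

small = shuffle_blocks(1280)   # 1.25 GB of shuffle data
large = shuffle_blocks(2560)   # twice as much data
```

Under these assumptions, doubling the data quadruples the number of blocks and halves the average block size, which pushes the shuffle toward many small random reads.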

14.

15. Re-cap: Remote Persistent Memory Extension for Spark shuffle design
Figure: two workers side by side. Left: executor JVM with Shuffle Manager, Shuffle Writer/Reader, heap objects, off-heap byte buffer, spark.local.dir shuffle files in the kernel, and SSD/HDD/NIC drivers. Right: executor JVM with Shuffle Manager, new Shuffle Writer/Reader, off-heap byte buffer, user-space PMEM, and an RDMA NIC.
Vanilla Spark shuffle
1. Serialize obj to off-heap memory
2. Write to local shuffle dir
3. Read from local shuffle dir
4. Send to remote reader through TCP/IP
➢ Lots of context switches
➢ POSIX buffered read/write on shuffle disk
➢ TCP/IP-based socket send for remote shuffle read
Spark shuffle with the remote persistent memory extension
1. Serialize obj to off-heap memory
2. Persist to PMEM
3. Read from remote PMEM through RDMA; PMEM is used as the RDMA memory buffer
➢ No context switch
➢ Efficient read/write on PMEM
➢ RDMA read for remote shuffle read
Spark PMoF: https://github.com/intel-bigdata/spark-pmof
Strata-ca-2019: https://conferences.oreilly.com/strata/strata-ca-2019/public/schedule/detail/72992

16. Network
Zero-copy approach
▪ The HPNL buffer can be used directly by the application, without copying data between the HPNL buffer and the application buffer.
▪ Supports user-space to kernel-space zero-copy.
Threading model
▪ Implements the Proactor model.
▪ Interrupt + polling approach to optimize the HPNL thread.
▪ Supports thread binding to a specific core.
HPNL interface
▪ C/C++ and Java interface.
▪ Supports send, receive, remote read, remote write semantics.
▪ Pluggable buffer management interface.
▪ Capable of using persistent memory as the RDMA buffer.
▪ Open sourced: https://github.com/intel-bigdata/hpnl
Figure: HPNL server architecture. Clients 1..N connect through the HPNL interface to a demultiplexer (epoll + fi_wait), which feeds per-core CQService instances and an EQService, each with a buffer manager and user callback, on top of libfabric.

17. Storage
▪ A transactional and atomic key-value data store w/ user-space access, designed to fully drive PMem capabilities
▪ Extensible: backend adaptor to tier data to different storage media, e.g. DRAM, PMem, SSD, HDD
Space management
▪ Provision the pmem namespace in advance
▪ Leverage a circular buffer to build unidirectional channels for RDMA primitives
▪ For write
– libpmemobj converts PMem to a flexible object store
– Data is written to a circular buffer; once it hits the threshold (4MB by default), a block is created via libpmemobj on the pmem device with memcopy
– Append write, only write once
– Tiering support planned
▪ For read
– Use memcopy to read the data
– Read PMem directly from the RDMA memory region registered on PMem (avoid DRAM to PMem copies)
▪ No index file; mapping info is stored in pmem object metadata
▪ libpmem based, kernel bypass
Figure: storage stack. Spark (shuffle plugin), databases, and other clients in the JVM call through JNI into front-end adaptors; the native layer holds DRAM circular buffers on top of PMDK (libpmemobj) and persistent memory devices (devdax/fsdax mode), with SSD as a further tier.
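The write path described on this slide (buffer in DRAM, cut a block once a threshold is reached, append only, write once) can be sketched as follows. This is a plain-Python stand-in, not the libpmemobj-based implementation: `BufferedBlockWriter` is a hypothetical name, and a Python list plays the role of the blocks that would be allocated on the PMem device.

```python
class BufferedBlockWriter:
    """Accumulate writes in a DRAM buffer and emit immutable blocks."""

    def __init__(self, threshold=4 * 1024 * 1024):  # 4MB default, per the slide
        self.threshold = threshold
        self.buffer = bytearray()  # stand-in for the DRAM buffer
        self.blocks = []           # stand-in for blocks created on PMem

    def write(self, data):
        self.buffer += data
        if len(self.buffer) >= self.threshold:
            self._flush()

    def _flush(self):
        if self.buffer:
            # Copy the buffered bytes into a new immutable block (write once).
            self.blocks.append(bytes(self.buffer))
            self.buffer.clear()

    def close(self):
        # Persist whatever is left below the threshold.
        self._flush()
        return self.blocks

writer = BufferedBlockWriter(threshold=8)  # tiny threshold for illustration
writer.write(b"abcde")
blocks_before_threshold = len(writer.blocks)
writer.write(b"fgh")
writer.write(b"ij")
blocks = writer.close()
```

Batching small records into larger blocks keeps the number of allocations on the device low, and because blocks are never modified after creation, readers can access them without coordinating with the writer.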

18. Data layout
Figure: data layout and request flow.
▪ Client: CircularBuffer with fields buffer, bits, RmaBufferRegister, Read_mutex, Write_mutex
▪ Requests: RDMA RECV, READ, ACK, PUT
▪ Server: TaskExecutor with RecvWorker, ReadWorker, FinalizeWorker; CircularBuffer; index map (Key → PMemoid (addr)); allocate and write to PMem
▪ PMem Base: Pool, Bytes_written, Head, Tail, rwlock
▪ Block: Block_hdr, data
▪ Block entry: Next, Pre, addr, size
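The field names on this slide suggest an index map from key to PMem address plus a doubly linked list of block entries anchored at a base record with head, tail, and a running byte count. The sketch below is one plausible reading of that layout in plain Python; the class names, the `put`/`get` methods, and the use of a byte offset as the address are assumptions for illustration, not the actual on-media format.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Entry:
    addr: int
    size: int
    prev: Optional["Entry"] = field(default=None, repr=False, compare=False)
    next: Optional["Entry"] = field(default=None, repr=False, compare=False)

class Pool:
    """Base record (head, tail, bytes_written) plus key -> entry index map."""

    def __init__(self):
        self.head = None
        self.tail = None
        self.bytes_written = 0
        self.index_map = {}  # key -> Entry
        self.storage = {}    # addr -> bytes, stand-in for PMem blocks

    def put(self, key, data):
        addr = self.bytes_written  # append-only: next free offset
        entry = Entry(addr=addr, size=len(data), prev=self.tail)
        if self.tail is None:
            self.head = entry
        else:
            self.tail.next = entry
        self.tail = entry
        self.storage[addr] = bytes(data)
        self.index_map[key] = entry
        self.bytes_written += len(data)
        return addr

    def get(self, key):
        entry = self.index_map[key]
        return self.storage[entry.addr]

    def walk(self):
        # Follow the linked list from head to tail.
        node, out = self.head, []
        while node is not None:
            out.append((node.addr, node.size))
            node = node.next
        return out

pool = Pool()
pool.put("shuffle_0_0_0", b"aaaa")
pool.put("shuffle_0_0_1", b"bb")
layout = pool.walk()
```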

19. Spark PMem shuffle details
SortShuffleManager->registerShuffle(): add a new shuffle writer for pmem.
Figure: shuffle handle selection.
▪ Partitions <= bypassMergeThreshold? Yes: BypassMergeSortShuffleHandle → getWriter() → BypassMergeSortShuffleWriter
▪ Using an optimized serialized shuffle? Yes: SerializedShuffleHandle → getWriter() → UnsafeShuffleWriter
▪ Otherwise (no): BaseShuffleHandle → getWriter() → SortShuffleWriter
▪ New: PMoFHandle → getWriter() → PMoFWriter
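The selection flow on this slide can be written out as a small decision function. The sketch mirrors the conditions vanilla Spark checks in SortShuffleManager (no map-side combine plus a partition count at or below `spark.shuffle.sort.bypassMergeThreshold`, default 200, for the bypass path; a relocatable serializer, no map-side combine, and at most 2^24 partitions for the serialized path). How the PMem path is chosen is not stated on the slide, so the `pmem_enabled` flag here is a hypothetical switch.

```python
from dataclasses import dataclass

BYPASS_MERGE_THRESHOLD = 200         # default spark.shuffle.sort.bypassMergeThreshold
MAX_SERIALIZED_PARTITIONS = 1 << 24  # partition limit for serialized shuffle

@dataclass
class ShuffleDependency:
    num_partitions: int
    map_side_combine: bool
    serializer_relocatable: bool

def register_shuffle(dep, pmem_enabled=False):
    """Return the name of the shuffle handle for a dependency."""
    if pmem_enabled:  # hypothetical switch, not taken from the slide
        return "PMoFHandle"
    if not dep.map_side_combine and dep.num_partitions <= BYPASS_MERGE_THRESHOLD:
        return "BypassMergeSortShuffleHandle"
    if (dep.serializer_relocatable and not dep.map_side_combine
            and dep.num_partitions <= MAX_SERIALIZED_PARTITIONS):
        return "SerializedShuffleHandle"
    return "BaseShuffleHandle"

# Handle -> writer mapping as drawn on the slide.
WRITERS = {
    "BypassMergeSortShuffleHandle": "BypassMergeSortShuffleWriter",
    "SerializedShuffleHandle": "UnsafeShuffleWriter",
    "BaseShuffleHandle": "SortShuffleWriter",
    "PMoFHandle": "PMoFWriter",
}

def get_writer(handle):
    return WRITERS[handle]

dep = ShuffleDependency(num_partitions=1000, map_side_combine=False,
                        serializer_relocatable=True)
vanilla_writer = get_writer(register_shuffle(dep))
pmem_writer = get_writer(register_shuffle(dep, pmem_enabled=True))
```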

20. Spark-PMoF end-to-end time evaluation: TeraSort workload
Spark 550GB TeraSort end-to-end time (seconds, lower is better)
  terasort-hdd     12277.2
  terasort-nvme    695
  terasort-pmof    540.5
Spark 550GB data repartition end-to-end time (seconds)
  Data repartition-hdd     9419.3
  Data repartition-nvme    374.8
  Data repartition-pmof    251.4
Shuffle read blocked time (1000 tasks)
                   Min     Median     75th percentile    Max
  terasort-pmof    0 ms    1 ms       2 ms               7 ms
  terasort-hdd     34 s    4.2 min    5.3 min            8.3 min
  terasort-nvme    34 ms   3 s        4 s                11 s
RPMem shuffle delivers a 22x speedup over HDD
▪ Spark PMoF demonstrates a 1.29x speedup over 4x P4500 NVMe SSD as shuffle media for TeraSort/PureIO Terasort
▪ If timsort is skipped (data repartition), PMoF delivers a 1.49x speedup over 4x NVMe SSD as shuffle media
▪ Huge reduction in shuffle read blocked time (the time that tasks spend blocked waiting for shuffle data to be read from remote machines)
▪ Refer to backup for details
Performance results are based on testing as of 3/3/2020 and may not reflect all publicly available security updates. See configuration disclosure on slide for details. No product can be absolutely secure. For more complete information about performance and benchmark results, visit www.intel.com/benchmarks. Configurations refer to page 42.
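The quoted speedups follow directly from the end-to-end times on this slide. The short check below recomputes them; the mapping of each number to a bar is read from the flattened chart text, so treat that assignment as an assumption (it is the one under which the stated 22x, 1.29x, and 1.49x figures all come out).

```python
# End-to-end times in seconds, as read from the slide's charts.
terasort = {"hdd": 12277.2, "nvme": 695.0, "pmof": 540.5}
repartition = {"hdd": 9419.3, "nvme": 374.8, "pmof": 251.4}

def speedup(times, baseline, candidate="pmof"):
    """How many times faster the candidate is than the baseline."""
    return times[baseline] / times[candidate]

for name, times in (("terasort", terasort), ("repartition", repartition)):
    for baseline in ("hdd", "nvme"):
        print(f"{name}: pmof vs {baseline} = {speedup(times, baseline):.2f}x")
```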

21. Shuffle disk details
Figure: shuffle disk bandwidth over time for HDD, NVMe, and PMem; the HDD panel shows shuffle write BW for a single HDD (rkB/s and wkB/s).
▪ HDD shuffle: ~25MB/s; the HDD is the bottleneck (small random I/O, ~100KB for write, ~200KB for read), throttled by await & util%
▪ NVMe shuffle: peak shuffle read BW of 150MB/s+ for a single NVMe, ~500MB/s for a single node
▪ PMem shuffle: average 749MB/s, peak 4GB/s

22. Profiling data for TeraSort (Spark-PMoF vs. vanilla Spark on NVMe)
Figure: CPU utilization, network bandwidth, disk bandwidth, and memory panels for vanilla Spark on NVMe and for Spark-PMoF.

23. Decision support workload
▪ The decision support workload is less I/O intensive than TeraSort
▪ Spark-PMoF end-to-end time gains vs. vanilla Spark
– 3.2x speedup for total execution time of 99 queries
▪ All I/O-intensive workloads can benefit from the PMoF performance improvement.
Figure: 99 queries execution time, Spark-PMoF vs. vanilla Spark (per query).
Figure: Query 25 end-to-end time, Spark-PMoF vs. vanilla Spark (5x).
Performance results are based on testing as of 1/31/2020 and may not reflect all publicly available security updates. See configuration disclosure on slide for details. No product can be absolutely secure. For more complete information about performance and benchmark results, visit www.intel.com/benchmarks. Configurations refer to page 42.

24. Decision support workload w/ NVMe
Spark PMoF vs. vanilla on HDD vs. vanilla on NVMe: total execution time of 99 queries (lower is better)
                    Vanilla Spark    Spark-4NVMe    Spark-PMoF
  Total time (s)    30201            10580          9522
  Normalized        3.172            1.111          1.000
Figure: PMoF vs. 4x NVMe per query ID (normalized PMoF/4NVMe, higher is better).
Spark PMoF delivers 1.11x for 99 queries total execution time
▪ Not all queries are shuffle intensive
▪ Up to 1.3x speedup for shuffle-intensive queries (Q47)
Performance results are based on testing as of 3/3/2020 and may not reflect all publicly available security updates. See configuration disclosure on slide for details. No product can be absolutely secure. For more complete information about performance and benchmark results, visit www.intel.com/benchmarks. Configurations refer to page 42.

25. Profiling data for decision support Q25 (Spark-PMoF vs. vanilla Spark on NVMe)
Figure: CPU utilization, network bandwidth, disk bandwidth, and memory panels for vanilla Spark on NVMe and for Spark-PMoF.

26. Extending to a fully disaggregated shuffle solution
Remote persistent memory for read demonstrated good results, but what's more?
Real production environments bring more challenges:
▪ Disaggregated, diskless environments
▪ Scaling shuffle and compute independently
▪ Unbalanced CPU/memory utilization
▪ Some jobs run for a long time, so the stage recompute cost is intolerable in case of shuffle failure
Elastic deployment with compute and storage disaggregation requires an independent shuffle solution:
▪ Decoupling shuffle I/O from a specific network/storage makes it possible to deliver a dedicated SLA for critical applications
– Fault tolerant in case of shuffle failure, no need for recompute
▪ Offload spill as well, reducing compute-side memory resource requirements
– Balanced resource utilization
▪ Leverage state-of-the-art storage media
– To provide a high-performance, high-endurance storage backend
This drives the intention to build an RPMem-based, fully disaggregated shuffle solution!

27. RPMP architecture
Remote Persistent Memory Pool (RPMP):
▪ A persistent memory based distributed storage system
▪ An RDMA-powered network library and an innovative approach that uses persistent memory as both the shuffle media and the RDMA memory region, reducing additional memory copies and context switches
▪ Targets high-performance storage and ephemeral data storage
Features
▪ High-performance storage powered by modern HW like PMem and RDMA w/ user-level I/O access
▪ Rich API: provides memory-like allocate/free/read/write APIs on pooled PMem resources
▪ Pluggable modules: a modular architecture lets it plug into in-memory databases, Apache Spark (shuffle), caches, etc.
▪ Heterogeneous tiered storage backend
Benefits
▪ Improved scalability of analytics and AI workloads by disaggregating ephemeral data from compute nodes to a high-performance distributed storage, e.g., Spark shuffle
▪ Improved performance with high-speed persistent memory and a low-latency RDMA network
▪ Improved reliability by providing a manageable and highly available disaggregated storage that supports ephemeral data replication and fault tolerance, e.g. for shuffle data to avoid recompute
Figure labels: workloads (SQL, K/V, transactions, streaming, machine learning); compute (Spark, Flink, Presto); ephemeral data storage (shuffle, data cache) on RPMP over DRAM, PMem, SSD; data lake (object, block, file).

28. RPMP architecture details
RPMP client
▪ Provides transactional read/write/allocate/free and obj put/get interfaces to users
▪ Both C++ and Java APIs are provided
▪ Data is transferred by HPNL (RDMA) between selected server nodes and client nodes
RPMP server
▪ The RPMP proxy maintains a unified ActiveNodeMap and distributes client requests among RPMP nodes
▪ The network layer is based on HPNL to provide RDMA data transfer
▪ The controller layer is responsible for global address management, TransactionalProcess, etc.
▪ The storage layer is responsible for PMem management using high-performance PMem user-space PMDK libs
Figure: RPMP stack. Client: interface (tx_alloc/tx_free/tx_read/tx_write/put/get) and network layer (HPNL, encode/decode, buffer mgmt). Server: RPMP proxy, network layer (HPNL, encode/decode, buffer mgmt), controller layer (global address mgmt, scheduler, transaction, checksum, accelerator), storage layer (storage proxy, PmemAllocator) over /dev/dax0.0, /dev/dax0.1, /dev/dax1.0, /dev/dax1.1, /dev/dax2.0, /dev/dax2.1.
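To make the "memory-like" client interface more concrete, here is an in-memory mock that uses the operation names listed on the slide (tx_alloc, tx_free, tx_read, tx_write, put, get). The signatures, the integer address scheme, and the class name `MockRpmpClient` are assumptions for illustration; the real client is a C++/Java library that talks to remote servers over HPNL.

```python
class MockRpmpClient:
    """In-memory stand-in for the RPMP client interface."""

    def __init__(self):
        self._next_addr = 0
        self._regions = {}  # addr -> bytearray, stand-in for pooled PMem
        self._objects = {}  # key -> bytes, for the obj put/get interface

    def tx_alloc(self, size):
        addr = self._next_addr
        self._regions[addr] = bytearray(size)
        self._next_addr += size
        return addr

    def tx_write(self, addr, data):
        region = self._regions[addr]
        if len(data) > len(region):
            raise ValueError("write exceeds allocation")
        region[:len(data)] = data

    def tx_read(self, addr, size):
        return bytes(self._regions[addr][:size])

    def tx_free(self, addr):
        del self._regions[addr]

    def put(self, key, value):
        self._objects[key] = bytes(value)

    def get(self, key):
        return self._objects[key]

client = MockRpmpClient()
addr = client.tx_alloc(16)
client.tx_write(addr, b"map-output")
payload = client.tx_read(addr, 10)
client.tx_free(addr)
client.put("shuffle_0_1_2", b"block-bytes")
obj = client.get("shuffle_0_1_2")
```

The address-based calls suit callers that manage their own layout, while put/get hides allocation behind a key, which is the shape a shuffle plugin would most naturally use.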

29. RPMP read/write flow
Figure: four flow diagrams (write and read against a single server; write and read through the proxy with multiple servers), each showing client DRAM, server DRAM, and server PMem.
Write (single server)
1. Write data to specific address.
2. Server issues RDMA read (client DRAM -> server DRAM).
3. Flush (DRAM -> PMEM).
4. Request ACK.
Read (single server)
1. Read data from specific address.
2. RDMA write (server PMEM -> client DRAM).
3. Request ACK.
Write (via proxy, multiple servers)
1. Write data to specific address.
2. RDMA read (client DRAM -> server DRAM), Secondary Node DRAM -> Primary Node DRAM).
3. Flush (DRAM -> PMEM).
4. Request ACK.
Read (via proxy, multiple servers)
1. Read data from specific address.
2. RDMA write (server PMEM -> client DRAM).
3. Request ACK.
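The single-server write and read flows can be traced with a toy simulation. Nothing here performs real RDMA: plain byte copies stand in for the server-issued RDMA read and RDMA write, and the `server` dictionary with `dram` and `pmem` fields is a hypothetical stand-in for the server's memory tiers.

```python
def rpmp_write(client_dram, server, addr):
    """Simulate the single-server write flow; return the step trace."""
    trace = [f"client: write request for address {addr}"]
    server["dram"] = bytes(client_dram)    # stand-in for the RDMA read
    trace.append("server: RDMA read (client DRAM -> server DRAM)")
    server["pmem"][addr] = server["dram"]  # stand-in for the flush
    trace.append("server: flush (DRAM -> PMEM)")
    trace.append("server: request ACK")
    return trace

def rpmp_read(server, addr):
    """Simulate the single-server read flow; return (data, step trace)."""
    trace = [f"client: read request for address {addr}"]
    client_dram = bytes(server["pmem"][addr])  # stand-in for the RDMA write
    trace.append("server: RDMA write (server PMEM -> client DRAM)")
    trace.append("server: request ACK")
    return client_dram, trace

server = {"dram": b"", "pmem": {}}
write_trace = rpmp_write(b"partition-7", server, addr=4096)
data, read_trace = rpmp_read(server, addr=4096)
```

Note the asymmetry in the flows: a write stages data in server DRAM before the flush to PMem, while a read is served straight from PMem to client DRAM.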