Updates from Project Hydrogen: Unifying State-of-the-Art AI and Big Data in Apache Spark

Project Hydrogen is a major Apache Spark initiative to bring state-of-the-art AI and Big Data solutions together. It contains three major projects:
- Barrier execution mode
- Optimized data exchange
- Accelerator-aware scheduling

A basic implementation of barrier execution mode was merged into Apache Spark 2.4.0, and the community is working on the latter two. In this talk, we will present progress updates on Project Hydrogen and discuss the next steps.

First, we will review the barrier execution mode implementation from Spark 2.4.0. It enables developers to embed distributed training jobs properly on a Spark cluster. We will demonstrate distributed AI integrations built on top of it, e.g., Horovod and Distributed TensorFlow. We will also discuss the technical challenges of implementing those integrations and future work.

Second, we will outline ongoing work on optimized data exchange. Its target scenario is distributed model inference. We will present how we do performance testing and profiling, where the bottlenecks are, and how to improve the overall throughput on Spark. If time allows, we might also give updates on accelerator-aware scheduling.

1.Updates from Project Hydrogen: Unifying State-of-the-Art AI and Big Data in Apache Spark Xiangrui Meng, Databricks #UnifiedAnalytics #SparkAISummit

2.About me
● Software Engineer at Databricks
  ○ machine learning and data science/engineering
● Committer and PMC member of Apache Spark
  ○ MLlib, SparkR, PySpark, Spark Packages, etc.

3.About Project Hydrogen
Announced last June, Project Hydrogen is a major Spark initiative to unify state-of-the-art AI and big data workloads.
● Barrier Execution Mode
● Optimized Data Exchange
● Accelerator Aware Scheduling

4.Why Spark + AI?

5.Apache Spark: The First Unified Analytics Engine
Runtime, Delta, Spark Core Engine
● Big Data Processing: ETL + SQL + Streaming
● Machine Learning: MLlib + SparkR

6.AI is re-shaping the world
Huge disruptive innovations are affecting most enterprises on the planet:
● Healthcare and Genomics
● Fraud Prevention
● Digital Personalization
● Internet of Things
● and many more...

7.Better AI needs more data

8.When AI goes distributed ...
When datasets get bigger and bigger, we see more and more distributed training scenarios and open-source offerings, e.g., distributed TensorFlow, Horovod, and distributed MXNet. This is where Spark and AI cross.

9.Why Project Hydrogen?

10.Two simple stories
As a data scientist, I can:
● build a pipeline that fetches training events from a production data warehouse and trains a DL model in parallel;
● apply a trained DL model to a distributed stream of events and enrich it with predicted labels.

11.Distributed training
data warehouse → load → fit → model
● load. Required: be able to read from Databricks Delta, Parquet, MySQL, Hive, etc. Answer: Apache Spark
● fit. Required: distributed GPU cluster for fast training. Answer: Horovod, Distributed TensorFlow, etc.

12.Two separate data and AI clusters?
load using a Spark cluster → save data → fit on a GPU cluster → model
required: glue code

13.Streaming model inference
Kafka → load → predict (using the model)
required:
● save to stream sink
● GPU for fast inference

14.A hybrid Spark and AI cluster?
● Training: load using a Spark cluster w/ GPUs → fit a model distributedly on the same cluster → model
● Inference: load using a Spark cluster w/ GPUs → predict w/ GPUs as a Spark task (using the model)

15.Unfortunately, it doesn’t work out of the box. See a previous demo.

16.Project Hydrogen to fill the major gaps
● Barrier Execution Mode
● Optimized Data Exchange
● Accelerator Aware Scheduling

17.Updates from Project Hydrogen
As a Spark contributor, I want to present:
● what features from Project Hydrogen are available,
● what features are in development.
As a Databricks engineer, I want to share:
● how we utilized features from Project Hydrogen,
● lessons learned and best practices.

18.Story #1: Distributed training
load using a Spark cluster w/ GPUs → fit a model distributedly on the same cluster → model

19.Project Hydrogen: barrier execution mode
● Barrier Execution Mode
● Optimized Data Exchange
● Accelerator Aware Scheduling

20.Different execution models
Spark (MapReduce): Task 1, Task 2, Task 3
● Tasks are independent of each other
● Embarrassingly parallel & massively scalable
Distributed training: Task 1, Task 2, Task 3
● Complete coordination among tasks
● Optimized for communication

21.Barrier execution mode
We introduced gang scheduling to Spark on top of the MapReduce execution model, so a distributed DL job can run as a Spark job.
● It starts all tasks together.
● It provides sufficient info and tooling to run a hybrid distributed job.
● It cancels and restarts all tasks in case of failures.
JIRA: SPARK-24374 (Spark 2.4)
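
The all-or-nothing behaviour listed above can be illustrated outside Spark. The following is a minimal Python sketch, not Spark's scheduler: `run_gang` and `flaky_task` are hypothetical names, threads stand in for Spark tasks, and a failure in any one task causes the whole group to be rerun.

```python
from concurrent.futures import ThreadPoolExecutor


def run_gang(task_fn, num_tasks, max_attempts=3):
    """Run num_tasks copies of task_fn together; if any fails, rerun them all."""
    for attempt in range(1, max_attempts + 1):
        # One worker per task so all tasks in the gang can run concurrently.
        with ThreadPoolExecutor(max_workers=num_tasks) as pool:
            futures = [pool.submit(task_fn, task_id, attempt)
                       for task_id in range(num_tasks)]
            try:
                # All-or-nothing: a single failure discards the whole attempt.
                return [f.result() for f in futures], attempt
            except Exception:
                for f in futures:
                    f.cancel()  # only stops tasks that have not started yet
    raise RuntimeError(f"gang failed after {max_attempts} attempts")


def flaky_task(task_id, attempt):
    # Hypothetical workload: task 1 fails on the first attempt only.
    if task_id == 1 and attempt == 1:
        raise ValueError("simulated worker failure")
    return f"task-{task_id}-attempt-{attempt}"


results, attempts_used = run_gang(flaky_task, num_tasks=3)
```

Here the first attempt is thrown away because one task failed, and every task (including the ones that succeeded) runs again in the second attempt.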

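22.API: RDD.barrier()
RDD.barrier() tells Spark to launch the tasks together.

    import org.apache.spark.BarrierTaskContext

    rdd.barrier().mapPartitions { iter =>
      // every task in a barrier stage can access its barrier context
      val context = BarrierTaskContext.get()
      ...
    }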

23.API: context.barrier()
context.barrier() places a global barrier and waits until all tasks in this stage hit this barrier.

    val context = BarrierTaskContext.get()
    … // preparation
    context.barrier()
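
To make the global-barrier semantics concrete, here is a small Python sketch that uses the standard library's `threading.Barrier` as a stand-in. It is an analogy only and does not use Spark; `NUM_TASKS`, `task`, and `events` are illustrative names. No task proceeds past the barrier until every task has finished its preparation step.

```python
import threading

NUM_TASKS = 4
barrier = threading.Barrier(NUM_TASKS)
events = []                      # ordered log of what each task did
events_lock = threading.Lock()


def task(task_id):
    with events_lock:
        events.append(("prepared", task_id))   # preparation phase
    barrier.wait()   # analogous to context.barrier(): block until all tasks arrive
    with events_lock:
        events.append(("started", task_id))    # runs only after every task prepared


threads = [threading.Thread(target=task, args=(i,)) for i in range(NUM_TASKS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

phases = [phase for phase, _ in events]
```

Whatever order the threads are scheduled in, all "prepared" entries appear in the log before any "started" entry.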

24.API: context.getTaskInfos()
context.getTaskInfos() returns info about all tasks in this stage.

    if (context.partitionId == 0) {
      val addrs = context.getTaskInfos().map(_.address)
      ... // start a hybrid training job, e.g., via MPI
    }
    context.barrier() // wait until training finishes
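
One thing the first task might do with the collected addresses is build the host list for an MPI launcher. The Python sketch below is one possible way to do that, under the assumption that each address has the form "host:port" and that one MPI slot is wanted per task; `mpi_host_spec` and the sample addresses are hypothetical.

```python
from collections import Counter


def mpi_host_spec(task_addresses):
    """Turn task addresses into an `mpirun -H` value, one slot per task."""
    # Assumption: each address looks like "host:port"; only the host matters here.
    hosts = [addr.rsplit(":", 1)[0] for addr in task_addresses]
    slots = Counter(hosts)
    # Keep first-seen host order so the result is deterministic.
    ordered = list(dict.fromkeys(hosts))
    return ",".join(f"{host}:{slots[host]}" for host in ordered)


addrs = ["server1:40001", "server1:40002", "server2:40001", "server2:40003"]
host_spec = mpi_host_spec(addrs)
```

The resulting string has the same `host:slots` shape as the `-H` argument of an `mpirun` command line.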

25.Barrier mode integration

26.Horovod (an LF AI hosted project)
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and MXNet. It was originally developed at Uber and is now an LF AI hosted project at the Linux Foundation.
● Little modification to single-node code.
● High-performance I/O via MPI and NCCL.
● Same convergence theory.
Some limitations:
● Before v0.16, users still need to use mpirun to launch a job,
● … with a Python training script:

    mpirun -np 16 -H server1:4,server2:4,server3:4,server4:4 \
        -bind-to none -map-by slot \
        -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH \
        -mca pml ob1 -mca btl ^openib \
        python train.py

27.Hydrogen integration with Horovod
Databricks released HorovodRunner w/ Runtime 5.0 ML, built on top of Horovod and Project Hydrogen.
● Runs Horovod under barrier execution mode.
● Hides cluster setup, scripts, MPI command line from users.

    def train_hvd():
        hvd.init()
        … # train using Horovod

    HorovodRunner(np=2).run(train_hvd)

28.Implementation of HorovodRunner
Integrating Horovod with barrier mode is straightforward:
● Pickle and broadcast the train function.
  ○ Inspect code and warn users about potential issues.
● Launch a Spark job in barrier execution mode.
● In the first executor, use worker addresses to launch the Horovod MPI job.
● Terminate Horovod if the Spark job is cancelled.
  ○ Hint: PR_SET_PDEATHSIG
Limitation:
● Tailored for Databricks Runtime ML
  ○ Horovod built with TensorFlow/PyTorch, SSH, OpenMPI, NCCL, etc.
  ○ Spark 2.4, GPU cluster configuration, etc.
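
The slide only hints at PR_SET_PDEATHSIG, so the following Python sketch shows one way that Linux facility could be used to tie a launched process to its launcher. It is not HorovodRunner's actual code: `launch_tied_to_parent` is a hypothetical helper, the launched command is a stub, and on non-Linux platforms the sketch simply starts the process without the death signal.

```python
import ctypes
import signal
import subprocess
import sys

PR_SET_PDEATHSIG = 1  # constant from <linux/prctl.h>
IS_LINUX = sys.platform.startswith("linux")
# Load libc before forking so the child only has to make the prctl call.
_libc = ctypes.CDLL(None, use_errno=True) if IS_LINUX else None


def _die_with_parent():
    # Runs in the child between fork and exec: ask the kernel to send SIGTERM
    # to this process when the parent that created it exits.
    sig = ctypes.c_ulong(int(signal.SIGTERM))
    if _libc.prctl(PR_SET_PDEATHSIG, sig) != 0:
        raise OSError(ctypes.get_errno(), "prctl(PR_SET_PDEATHSIG) failed")


def launch_tied_to_parent(cmd):
    """Start cmd; on Linux it is signalled if the launching process dies."""
    preexec = _die_with_parent if IS_LINUX else None
    return subprocess.Popen(cmd, preexec_fn=preexec)


# Stub command standing in for the real training job launcher.
proc = launch_tied_to_parent([sys.executable, "-c", "pass"])
exit_code = proc.wait()
```

The design point is that cleanup does not depend on the launcher getting a chance to run shutdown code: if the Spark task is killed, the kernel delivers the signal to the child on its behalf.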

29.horovod.spark
horovod.spark is a new feature in the Horovod 0.16 release. Similar to HorovodRunner, it runs Horovod as a Spark job and takes Python train functions. Its assumptions are more general:
● no dependency on SSH,
● system-independent process termination,
● multiple Spark versions,
● and more …
Also check out horovodrun :)
