TFPark: Distributed TensorFlow in Production on Apache Spark

Live stream topic
TFPark: Distributed TensorFlow in Production on Apache Spark

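Speaker:
Wang Yang
Machine learning engineer on Intel's big data team, focusing on distributed machine learning frameworks and applications. He is one of the core contributors to Analytics Zoo and BigDL.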

Time:
July 23, 19:00

How to watch:
Enter the live room at the scheduled time (replay link):
https://developer.aliyun.com/live/43484

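About the talk
TFPark is a module of the open-source AI platform Analytics Zoo that lets users run distributed TensorFlow training and inference on Spark clusters. On one hand, TFPark uses Spark to embed TensorFlow-defined training or inference tasks seamlessly into the user's big data pipeline, without any modification to the existing cluster. On the other hand, TFPark hides the complex distributed-systems logic, so an AI application developed on a single machine can scale out to tens or even hundreds of nodes. This talk covers how to use TFPark, its internal implementation, and real-world production use cases.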


1. What is Analytics Zoo
BigDL: Distributed, High-Performance Deep Learning Framework for Apache Spark
https://github.com/intel-analytics/bigdl
Analytics Zoo: Unified Analytics + AI Platform. Distributed TensorFlow, Keras, PyTorch and BigDL on Apache Spark
https://github.com/intel-analytics/analytics-zoo
Accelerating Data Analytics + AI Solutions At Scale

2. TFPark: TensorFlow in Production on Spark
Wang, Yang (yang3.wang@intel.com), Machine Learning Engineer at Intel

3. TensorFlow
• One of the most popular open source DL frameworks
• Large community, many published models
• Works on many devices

4. Why Run TensorFlow on Spark

5. Data Scale Driving Deep Learning Process

6. Machine Learning System Complexity

7. Problems of Two Clusters

8. TFPark Package in Analytics Zoo
Running TensorFlow models on Spark at scale with minimal code changes

9. Analytics Zoo: End-to-End DL Pipeline Made Easy for Big Data
Stages: prototype on a laptop using sample data; experiment on clusters with history data; production deployment with distributed big data pipelines
• “Zero” code change from laptop to distributed cluster
• Directly access production data (Hadoop/Hive/HBase) without data copy
• Easily prototype the end-to-end pipeline
• Seamlessly deployed on production big data clusters

10. Analytics Zoo https://github.com/intel-analytics/analytics-zoo *Other names and brands may be claimed as the property of others.

11. TFPark Package in Analytics Zoo
• Seamlessly integrates TensorFlow with Spark & BigDL
• Better performance with a custom-built TensorFlow with MKL
• Easy-to-use distributed API; few changes required for your TF models
• “Zero” code change from laptop to cluster

12. Comparison with Distributed TF Solutions

13. Local TensorFlow (v1.x)

    import tensorflow as tf
    import tensorflow_datasets as tfds

    # map_func and create_model are user-defined and not shown on the slide.
    global_batch_size = 256
    global_step = tf.train.get_or_create_global_step()

    # Dataset
    ds = tfds.load("mnist", split="train").map(map_func)
    dataset = ds.repeat().shuffle(1000).batch(global_batch_size)
    iterator = dataset.make_one_shot_iterator()
    images, labels = iterator.get_next()

    # Model
    loss = create_model(images, labels)
    optimizer = tf.train.GradientDescentOptimizer(.0001)
    opt = optimizer.minimize(loss, global_step=global_step)

    # Session
    stop_hook = tf.train.StopAtStepHook(last_step=2000)
    hooks = [stop_hook]
    sess = tf.train.MonitoredTrainingSession(checkpoint_dir='./checkpoint_dir', hooks=hooks)
    while not sess.should_stop():
        _, r, gs = sess.run([opt, loss, global_step])
        print("step {}, loss {}".format(gs, r))

Intel Confidential March 26, 2020

14. Built-in Distributed TF (v1)

    import tensorflow as tf
    import tensorflow_datasets as tfds

    # FLAGS (ps_hosts, worker_hosts, job_name, task_index), map_func and
    # create_model are user-defined and not shown on the slide.

    # Cluster
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    num_task = len(worker_hosts)
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    if FLAGS.job_name == 'ps':  # checks if parameter server
        server = tf.train.Server(cluster, job_name="ps", task_index=FLAGS.task_index)
        server.join()
    else:  # it must be a worker server
        is_chief = (FLAGS.task_index == 0)  # checks if this is the chief node
        server = tf.train.Server(cluster, job_name="worker", task_index=FLAGS.task_index)

        # Dataset
        global_batch_size = 256
        ds = tfds.load("mnist", split="train").map(map_func)
        dataset = (ds.shard(num_task, FLAGS.task_index)
                     .repeat().shuffle(1000)
                     .batch(global_batch_size // num_task))
        images, labels = dataset.make_one_shot_iterator().get_next()

        # Model
        worker_device = "/job:%s/task:%d/cpu:0" % (FLAGS.job_name, FLAGS.task_index)
        with tf.device(tf.train.replica_device_setter(ps_tasks=1, worker_device=worker_device)):
            global_step = tf.train.get_or_create_global_step()
            loss = create_model(images, labels)
            optimizer = tf.train.GradientDescentOptimizer(.0001)
            optimizer = tf.train.SyncReplicasOptimizer(optimizer,
                                                       replicas_to_aggregate=num_task,
                                                       total_num_replicas=num_task)
            opt = optimizer.minimize(loss, global_step=global_step)

        # Session
        sync_replicas_hook = optimizer.make_session_run_hook(is_chief)
        stop_hook = tf.train.StopAtStepHook(last_step=2000)
        hooks = [sync_replicas_hook, stop_hook]
        checkpoint_dir = './checkpoints' if is_chief else None
        sess = tf.train.MonitoredTrainingSession(master=server.target,
                                                 is_chief=is_chief,
                                                 checkpoint_dir=checkpoint_dir,
                                                 hooks=hooks,
                                                 stop_grace_period_secs=10)
        while not sess.should_stop():
            _, r, gs = sess.run([opt, loss, global_step])

15. Horovod

    import tensorflow as tf
    import horovod.tensorflow as hvd
    import tensorflow_datasets as tfds

    # map_func and create_model are user-defined and not shown on the slide.
    hvd.init()

    global_batch_size = 256
    global_step = tf.train.get_or_create_global_step()

    # Dataset: each rank reads its own shard and a per-worker batch
    ds = tfds.load("mnist", split="train").map(map_func)
    dataset = (ds.shard(hvd.size(), hvd.rank())
                 .repeat().shuffle(1000)
                 .batch(global_batch_size // hvd.size()))
    images, labels = dataset.make_one_shot_iterator().get_next()

    # Model
    loss = create_model(images, labels)
    optimizer = tf.train.GradientDescentOptimizer(.0001)
    optimizer = hvd.DistributedOptimizer(optimizer, op=hvd.Average)
    opt = optimizer.minimize(loss, global_step=global_step)

    # Session
    hooks = [
        hvd.BroadcastGlobalVariablesHook(0),
        tf.train.StopAtStepHook(last_step=2000 // hvd.size()),
    ]
    checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir, hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            _, r, gs = mon_sess.run([opt, loss, global_step])

16. Analytics Zoo (TFPark)

    import tensorflow as tf
    import tensorflow_datasets as tfds
    from zoo import init_nncontext
    from zoo.tfpark import TFDataset, TFOptimizer
    # The slide omits this import; SGD and MaxIteration are assumed to come from BigDL.
    from bigdl.optim.optimizer import SGD, MaxIteration

    # map_func and create_model are user-defined and not shown on the slide.
    sc = init_nncontext()

    global_batch_size = 256

    # Dataset
    ds = tfds.load("mnist", split="train").map(map_func)
    ds = ds.shuffle(1000)
    dataset = TFDataset.from_tf_data_dataset(ds, batch_size=global_batch_size)
    images, labels = dataset.tensors

    # Model
    loss = create_model(images, labels)

    # Training
    optimizer = TFOptimizer.from_loss(loss, SGD(1e-3), model_dir="./checkpoints")
    optimizer.optimize(end_trigger=MaxIteration(200000))

17. Code Change Summary
Modifications to Single Node Program

Built-in Distributed TF
1. Specify ps_hosts and worker_hosts, and create a tf.train.ClusterSpec object
2. Customize the tf.train.Server object for each node
3. Shard data according to worker node number and index using tf.data.Dataset
4. Assign operations to devices
5. Replace the regular optimizer with SyncReplicasOptimizer
6. Create tf.train.MonitoredTrainingSession

Horovod
1. Run hvd.init()
2. Shard data according to hvd.size() and hvd.rank() using tf.data.Dataset
3. Replace the regular optimizer with hvd.DistributedOptimizer
4. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast rank 0’s variables to other nodes
5. Check hvd.rank() and only write checkpoints and TensorBoard logs if the rank is 0

Analytics Zoo (TFPark)
1. Run init_nncontext()
2. Create a TFDataset from a tf.data.Dataset or an RDD
3. Create a TFOptimizer and start training
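The data-sharding step listed for built-in distributed TF and Horovod can be illustrated without TensorFlow. The following is a minimal sketch, assuming that sharding keeps every n-th record starting at the worker's index (the behaviour of tf.data's shard) and that the global batch divides evenly across workers. The helper names shard and per_worker_batch_size are hypothetical and exist only for this illustration.

```python
def shard(records, num_workers, worker_index):
    """Keep every num_workers-th record, starting at worker_index."""
    return [r for i, r in enumerate(records) if i % num_workers == worker_index]


def per_worker_batch_size(global_batch_size, num_workers):
    """Split a global batch evenly so the combined step still sees global_batch_size samples."""
    if global_batch_size % num_workers != 0:
        raise ValueError("global batch size must be divisible by the number of workers")
    return global_batch_size // num_workers


records = list(range(10))
num_workers = 4
shards = [shard(records, num_workers, w) for w in range(num_workers)]
local_batch = per_worker_batch_size(256, num_workers)

for w, s in enumerate(shards):
    print("worker", w, "reads", s)
print("per-worker batch size:", local_batch)
```

With TFPark this bookkeeping is not written by the user: the slides pass the global batch size to TFDataset once and leave the distribution to the library.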

18. Deployment

Built-in Distributed TF
  Installation: pip install tensorflow
  Modification to cluster: install on every node
  Run the program: copy the program to every node, set environment variables and start the program on each node

Horovod
  Installation: pip install horovod; install OpenMPI
  Modification to cluster: install on every node; the driver needs to ssh to each machine without a prompt
  Run the program: use the horovodrun command (on the driver node, using ssh to each worker without a prompt)

Analytics Zoo (TFPark)
  Installation: pip install analytics-zoo
  Modification to cluster: only install on the driver
  Run the program: use the spark-submit command against the underlying Hadoop, Spark or K8s cluster

19. Other TFPark Features

20. TensorFlow High-Level API (Estimator)
https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/tensorflow/tfpark/estimator/pre-made-estimator.py

21. TensorFlow High-Level API (Keras)
https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/tensorflow/tfpark/keras/keras_ndarray.py

22. GAN Model Support
https://github.com/intel-analytics/analytics-zoo/tree/master/pyzoo/zoo/examples/tensorflow/tfpark/gan

23. Connecting to Various Data Sources (TFDataset)
• Driver in-memory data, such as numpy.ndarray
• Pipelines written as tensorflow.data.Dataset
• File data, such as text files or image files, on the local file system, HDFS or S3
• Data represented as Spark RDDs or Spark DataFrames
• Any other data source that can be read using Spark, such as HBase, Hive, Kafka, or a relational database

24. TFPark Implementation

25. Architecture Overview
[Diagram: the Spark driver program runs user code such as dataset = TFDataset.from_ndarrays(...) and keras_model.fit(dataset); the dataset is parallelized into data partitions and the model is broadcast to the executors. Each executor runs a Spark training job in which a TensorFlow runtime applies a model replica to its data partition and produces a gradient. Gradients are combined by AllReduce between iteration t and iteration t + 1.]

26. Synchronous Data Parallelism
[Diagram: “Model Forward-Backward” job. Workers 1 to n each hold partition i of the Sample RDD and partition i of the Model RDD. Task i zips the Sample and Model RDDs and computes the gradient on the co-located Sample and model partitions.]
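The idea behind the forward-backward job can be sketched in plain Python. This is an illustration under stated assumptions, not TFPark's code: the model is a toy linear regressor with a mean-squared-error loss, the partitions have equal size, and gradient and data_parallel_gradient are hypothetical helper names. Under those assumptions, averaging the per-partition gradients gives the same result as computing the gradient over the whole batch on one machine.

```python
def gradient(w, samples):
    """Mean-squared-error gradient of the linear model y_hat = w . x over samples."""
    g = [0.0] * len(w)
    for x, y in samples:
        err = sum(wi * xi for wi, xi in zip(w, x)) - y
        for j, xj in enumerate(x):
            g[j] += 2.0 * err * xj / len(samples)
    return g


def data_parallel_gradient(w, partitions):
    """One 'task' per partition computes a local gradient; the results are averaged."""
    local_grads = [gradient(w, part) for part in partitions]
    n = len(local_grads)
    return [sum(g[j] for g in local_grads) / n for j in range(len(w))]


samples = [([1.0, 2.0], 3.0), ([2.0, 0.0], 1.0), ([0.0, 1.0], 2.0), ([1.0, 1.0], 0.0)]
w = [0.5, -0.5]
partitions = [samples[:2], samples[2:]]  # two equally sized data partitions

full = gradient(w, samples)
parallel = data_parallel_gradient(w, partitions)
print("single-node gradient:  ", full)
print("data-parallel gradient:", parallel)
```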

27. All-Reduce Parameter Synchronization
[Diagram: “Parameter Synchronization” job. Each of the n local gradients is split into slices 1 to n. Task i sums (∑) slice i of every local gradient to obtain gradient i, then updates weight slice i.]
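The slicing scheme in the parameter synchronization job can be sketched as follows. This is a single-process illustration under assumptions of my own: the summed gradient is divided by the number of tasks before a plain SGD update, and the names slice_bounds and all_reduce_update are hypothetical. In the real system each slice would be handled by a separate Spark task, and every task would then fetch the updated slices from the others.

```python
def slice_bounds(length, n):
    """Split range(length) into n contiguous, near-equal slices."""
    base, extra = divmod(length, n)
    bounds, start = [], 0
    for i in range(n):
        end = start + base + (1 if i < extra else 0)
        bounds.append((start, end))
        start = end
    return bounds


def all_reduce_update(weights, local_grads, lr):
    """Task i aggregates slice i of every local gradient and updates weight slice i."""
    n = len(local_grads)
    new_slices = []
    for lo, hi in slice_bounds(len(weights), n):
        # Sum this slice across all local gradients.
        summed = [sum(g[k] for g in local_grads) for k in range(lo, hi)]
        # Assumed update rule: SGD step with the averaged gradient.
        new_slices.append([weights[k] - lr * s / n for k, s in zip(range(lo, hi), summed)])
    # Concatenating the slices stands in for every task fetching all updated slices.
    return [w for s in new_slices for w in s]


weights = [1.0, 2.0, 3.0, 4.0, 5.0]
local_grads = [[1.0] * 5, [3.0] * 5]  # gradients from two tasks
updated = all_reduce_update(weights, local_grads, lr=0.5)
print(updated)
```

Because each task only aggregates and updates its own slice, the work and the network traffic are spread evenly across tasks instead of concentrating on a single parameter server.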

28. For Each Iteration
• Job 1: each partition takes input data and labels and outputs the loss and gradients, using TensorFlow
• Job 2: all-reduce parameter synchronization, using BigDL

29. Optimize CPU Utilization
Tunable parameters:
• intra_op_parallelism / inter_op_parallelism in each model
• Number of models in each executor
• Number of executors
[Diagram: the Spark driver program coordinates several Spark executors. Each executor holds multiple model replicas, each producing a gradient. Gradients are first aggregated locally within each executor, then combined across executors by all-reduce.]
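How the three tunable parameters interact can be shown with a small piece of arithmetic. The sketch below is a hypothetical heuristic, not TFPark's actual policy: it assumes each model replica gets an equal share of its executor's cores as intra-op threads, and the function plan_threads and its argument names are invented for this illustration.

```python
def plan_threads(num_executors, cores_per_executor, models_per_executor, inter_op=1):
    """Divide an executor's cores evenly among its model replicas (assumed heuristic)."""
    if cores_per_executor % models_per_executor != 0:
        raise ValueError("cores per executor must be divisible by models per executor")
    return {
        "intra_op_parallelism_threads": cores_per_executor // models_per_executor,
        "inter_op_parallelism_threads": inter_op,
        "total_model_replicas": num_executors * models_per_executor,
    }


plan = plan_threads(num_executors=4, cores_per_executor=16, models_per_executor=2)
print(plan)
```

More replicas per executor means more gradients to aggregate locally but fewer threads per replica, so the best setting depends on the model and is worth measuring rather than assuming.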