Analytics Zoo: Unifying Big Data Analytics and AI for Apache Spark

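Analytics Zoo is a big data analytics and AI platform for distributed TensorFlow, Keras and BigDL on Apache Spark. It provides native deep learning support for Spark, delivers orders-of-magnitude speedups over out-of-the-box open-source deep learning frameworks (such as Caffe and Torch) running on a single-node Intel Xeon CPU, and gives deep learning workloads efficient scale-out on the Spark architecture. It also lets data scientists use familiar tools (including Python and notebooks) for distributed deep learning analysis of big data. This document covers how big data users and data scientists can use Analytics Zoo to run distributed deep learning analysis (image recognition, object detection, NLP, etc.) on massive data sets.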

1. Analytics Zoo: Unifying Big Data Analytics and AI for Apache Spark


3. Data Scale Driving Deep Learning Process. Sources: "Machine Learning Yearning", Andrew Ng, 2016; "Hidden Technical Debt in Machine Learning Systems", Google, NIPS 2015 paper

4. Chasm between Deep Learning and Big Data Communities: the chasm separates deep learning experts from average users (big data users, data scientists, analysts, etc.)

5. Large-Scale Image Recognition at

6. Unified Big Data Analytics Platform: Hadoop & Spark Platform
• Analytics: Machine Learning, Graph Analytics, SQL, Notebook, Spreadsheet
• Batch, Streaming, Interactive
• R, Java, Python
• Data Processing & Analysis: DataFrame, ML Pipelines, SQL, SparkR, Streaming, MLlib, GraphX, Spark Core, Flink, Storm, MR, Giraph
• Resource Mgmt & Co-ordination: YARN, ZooKeeper
• Data Input: Flume, Kafka
• Storage: HDFS, Parquet, Avro, HBase


8. BigDL: Bringing Deep Learning to Big Data Platform
• Distributed deep learning framework for Apache Spark*
• Make deep learning more accessible to big data users and data scientists
  • Write deep learning applications as standard Spark programs
  • Run on existing Spark/Hadoop clusters (no changes needed)
• Feature parity with popular deep learning frameworks
  • E.g., Caffe, Torch, TensorFlow, etc.
• High performance (on CPU)
  • Powered by Intel MKL and multi-threaded programming
• Efficient scale-out
  • Leveraging Spark for distributed training & inference
(Diagram labels: DataFrame, ML Pipeline, SQL, SparkR, Streaming, MLlib, GraphX, Spark Core.)

9. BigDL Run as Standard Spark Programs
• Standard Spark jobs
  • No changes to the Spark or Hadoop clusters needed
• Iterative
  • Each iteration of the training runs as a Spark job
• Data parallel
  • Each Spark task runs the same model on a subset of the data (batch)
(Diagram: a BigDL program, i.e. a DL app on the Spark driver, uses the BigDL library to launch standard Spark jobs; on each worker, a Spark executor (JVM) runs Spark tasks backed by the BigDL lib and Intel MKL.)

10. Data Parallel Training: "Model Forward-Backward" Job
• The Sample RDD is split into Partition 1, Partition 2, ..., Partition n across Worker 1, Worker 2, ..., Worker n.
• The Model RDD is split into Partition 1, Partition 2, ..., Partition n across the same workers.
• Task 1, Task 2, ..., Task n: zip Sample and model RDDs, and compute gradient on co-located Sample and model partitions.

11. Distributed Training in BigDL
Data Parallel, Synchronous Mini-Batch SGD
    Prepare training data as an RDD of Samples
    Construct an RDD of models (each being a replica of the original model)
    for (i <- 1 to N) {
      // "model forward-backward" job
      for each task in the Spark job:
        read the latest weights
        get a random batch of data from local Sample partition
        compute errors (forward on local model replica)
        compute gradients (backward on local model replica)
      // "parameter synchronization" job
      aggregate (sum) all the gradients
      update the weights per specified optimization method
    }
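
The loop on slide 11 can be sketched in plain Python without Spark. This is only an illustration under stated assumptions, not BigDL code: the "partitions" are ordinary lists, the model is a hypothetical two-feature linear regressor, and the update rule is plain SGD on the averaged gradient (the slide leaves the optimization method open). The names `local_gradient` and `train` are made up for this sketch.

```python
import random


def local_gradient(weights, batch):
    """Squared-error gradient of a linear model y ~ w . x on one mini-batch."""
    grad = [0.0] * len(weights)
    for x, y in batch:
        err = sum(w * xi for w, xi in zip(weights, x)) - y   # forward pass
        for j, xi in enumerate(x):                           # backward pass
            grad[j] += 2.0 * err * xi / len(batch)
    return grad


def train(partitions, weights, iterations, batch_size, lr, seed=0):
    """Synchronous mini-batch SGD over in-memory stand-ins for RDD partitions."""
    rng = random.Random(seed)
    for _ in range(iterations):
        # "model forward-backward" job: one task per partition, all tasks
        # read the same (latest) weights and sample a local random batch.
        grads = []
        for part in partitions:
            batch = rng.sample(part, min(batch_size, len(part)))
            grads.append(local_gradient(weights, batch))
        # "parameter synchronization" job: sum the gradients, then update.
        total = [sum(g[j] for g in grads) for j in range(len(weights))]
        weights = [w - lr * t / len(partitions) for w, t in zip(weights, total)]
    return weights


# Synthetic, noise-free data for the hypothetical target y = 3*x0 - 2*x1.
data_rng = random.Random(42)
samples = []
for _ in range(64):
    x = [data_rng.uniform(-1, 1), data_rng.uniform(-1, 1)]
    samples.append((x, 3.0 * x[0] - 2.0 * x[1]))

partitions = [samples[i::4] for i in range(4)]   # four "workers"
learned = train(partitions, [0.0, 0.0], iterations=500, batch_size=8, lr=0.1)
print(learned)   # approaches the target weights [3, -2]
```

Because every task reads the same weights before computing its gradient and the update happens only after all gradients are summed, the iteration is synchronous: no task ever works with stale weights.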

12. Network and Memory Bottlenecks
• Large-scale problem settings
  – Model size: 100s of millions ~ billions of unique features
  – Weight (W) and gradient (G) are both double vectors, with one entry for each unique feature
  – Training data: billions ~ trillions of training samples
  – Partitioned & cached across workers
(Diagram: the training set is split into Sample Partition 1, Partition 2, ..., Partition n, with training samples cached in worker memory. In each iteration: 1. the driver broadcasts W (>800MB) to each worker; 2. each task computes G (>800MB); 3. each task sends G (>800MB) for aggregation.)
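
The ">800MB" figure on slide 12 follows from the vector sizes: a double takes 8 bytes, so 100 million features already give an 800 MB weight vector. The short calculation below makes that arithmetic explicit. It is a back-of-the-envelope sketch that assumes one task per worker and no compression; the worker count of 16 is a hypothetical value chosen only for illustration.

```python
BYTES_PER_DOUBLE = 8


def vector_megabytes(num_features):
    """Size of one dense double vector (W or G) in megabytes (10^6 bytes)."""
    return num_features * BYTES_PER_DOUBLE / 1e6


def per_iteration_traffic_mb(num_features, num_tasks):
    """Traffic through the driver if W is broadcast and G is gathered naively."""
    vector_mb = vector_megabytes(num_features)
    broadcast = vector_mb * num_tasks   # driver sends W to every task
    gather = vector_mb * num_tasks      # every task sends G back
    return broadcast + gather


print(vector_megabytes(100_000_000))              # 800.0
print(per_iteration_traffic_mb(100_000_000, 16))  # 25600.0
```

The traffic grows linearly with the number of tasks and all of it passes through a single node, which is why this pattern becomes the bottleneck.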

13. Parameter Synchronization: "Parameter Synchronization" Job
• Each task's local gradient is divided into n slices (1, 2, ..., n).
• Task k sums (∑) slice k of every local gradient to obtain gradient k.
• Task k then updates weight slice k from gradient k (Task 1 updates weight 1, Task 2 updates weight 2, ..., Task n updates weight n).
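
A minimal single-process sketch of the sliced synchronization on slide 13 follows. It assumes contiguous, near-equal slices and a plain SGD update; the function names `split_slices` and `synchronize` are hypothetical and are not BigDL APIs. In a real cluster each pass of the `for k` loop would run as a separate task that fetches only slice k from its peers.

```python
def split_slices(vector, n):
    """Split a vector into n contiguous slices of near-equal length."""
    size, extra = divmod(len(vector), n)
    slices, start = [], 0
    for k in range(n):
        end = start + size + (1 if k < extra else 0)
        slices.append(vector[start:end])
        start = end
    return slices


def synchronize(weights, local_grads, lr):
    """Task k sums slice k of every local gradient and updates weight slice k."""
    n = len(local_grads)
    weight_slices = split_slices(weights, n)
    grad_slices = [split_slices(g, n) for g in local_grads]
    new_weights = []
    for k in range(n):   # each k stands for one task
        summed = [sum(vals) for vals in zip(*(gs[k] for gs in grad_slices))]
        new_weights.extend(w - lr * g for w, g in zip(weight_slices[k], summed))
    return new_weights


weights = [1.0, 2.0, 3.0, 4.0, 5.0]
local_grads = [[1.0] * 5, [2.0] * 5]   # gradients from two tasks
print(synchronize(weights, local_grads, lr=0.5))
# [-0.5, 0.5, 1.5, 2.5, 3.5]
```

Each task handles only 1/n of the parameters, so the aggregation work and the traffic are spread over all tasks instead of being concentrated on the driver.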

14. Training Scalability: Throughput of ImageNet Inception v1 training (w/ BigDL 0.3.0 and dual-socket Intel Broadwell 2.1 GHz); the throughput scales almost linearly up to 128 nodes (and continues to scale reasonably up to 256 nodes). Source: Scalable Deep Learning with BigDL on the Urika-XC Software Suite

15. Analytics Zoo: Build E2E Deep Learning Applications for Big Data at Scale
Analytics + AI Platform for Apache Spark and BigDL
• Reference Use Cases: anomaly detection, sentiment analysis, fraud detection, image generation, etc.
• Built-In Deep Learning Models: image classification, object detection, text classification, recommendations, etc.
• Feature Engineering: feature transformations for image, text, 3D imaging, time series, speech, etc.
• High-Level Pipeline APIs
  • Native deep learning support in Spark DataFrames and ML Pipelines
  • Autograd, Keras and transfer learning APIs for model definition
  • Model serving API for model serving/inference pipelines
• Backends: Spark, BigDL, TensorFlow, Python, etc.

16. Models Interoperability Support (e.g., between TensorFlow, Keras, Caffe, Torch, BigDL models)
• Load existing TensorFlow, Keras, Caffe, Torch models
  • Useful for inference and model fine-tuning
• Allows for transition from single-node to distributed application deployment
• Allows for model sharing between data scientists and production engineers

17. Industrial Inspection Platform in Midea* and KUKA* distributed-tensorflow-on-analytics

18. TensorFlow Object Detection: SSDLite+MobileNet V2

19. More details

20. More details

21. Feature Engineering
1. Read images into local or distributed ImageSet
    from zoo.common.nncontext import *
    from zoo.feature.image import *
    spark = init_nncontext()
    local_image_set =
    distributed_image_set =, spark, 2)
2. Image augmentations using built-in ImageProcessing operations
    transformer = ChainedPreprocessing([ImageBytesToMat(),
                                        ImageColorJitter(),
                                        ImageExpand(max_expand_ratio=2.0),
                                        ImageResize(300, 300, -1),
                                        ImageHFlip()])
    new_local_image_set = transformer(local_image_set)
    new_distributed_image_set = transformer(distributed_image_set)
Image Augmentations Using Built-in Image Transformations (w/ OpenCV on Spark)
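
The chaining idea on slide 21 is function composition: each step takes an image and returns a new one, and the chain applies the steps in order. The sketch below illustrates that idea on nested lists. It is not the Analytics Zoo implementation; `Chain`, `resize` and `hflip` are hypothetical stand-ins, and the nearest-neighbour resize is an assumption made to keep the example short.

```python
class Chain:
    """Apply a list of image -> image steps in order."""

    def __init__(self, steps):
        self.steps = list(steps)

    def __call__(self, image):
        for step in self.steps:
            image = step(image)
        return image


def resize(height, width):
    """Return a step that resizes a 2-D image by nearest-neighbour sampling."""
    def apply(image):
        src_h, src_w = len(image), len(image[0])
        return [[image[r * src_h // height][c * src_w // width]
                 for c in range(width)]
                for r in range(height)]
    return apply


def hflip(image):
    """Mirror every row of a 2-D image."""
    return [list(reversed(row)) for row in image]


transformer = Chain([resize(4, 4), hflip])
images = [[[1, 2], [3, 4]]]                 # a "local image set" with one image
augmented = [transformer(img) for img in images]
print(augmented[0])
# [[2, 2, 1, 1], [2, 2, 1, 1], [4, 4, 3, 3], [4, 4, 3, 3]]
```

Because every step has the same image-in, image-out shape, the same chain can be mapped over a local list or over the partitions of a distributed collection without changing the steps.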

22. Autograd, Keras & Transfer Learning APIs
1. Use transfer learning APIs to
• Load an existing Caffe model
• Remove last few layers
• Freeze first few layers
• Append a few layers
    from import *
    full_model = Net.load_caffe(def_path, model_path)
    # Remove layers after pool5
    model = full_model.new_graph(outputs=["pool5"])
    # Freeze layers from input to res4f inclusive
    model.freeze_up_to(["res4f"])
    # Append a few layers
    image = Input(name="input", shape=(3, 224, 224))
    resnet = model.to_keras()(image)
    resnet50 = Flatten()(resnet)
Build Siamese Network Using Transfer Learning

23. Autograd, Keras & Transfer Learning APIs
2. Use autograd and Keras-style APIs to build the Siamese Network
    import zoo.pipeline.api.autograd as A
    from zoo.pipeline.api.keras.layers import *
    from zoo.pipeline.api.keras.models import *
    input = Input(shape=[2, 3, 226, 226])
    features = TimeDistributed(layer=resnet50)(input)
    f1 = features.index_select(1, 0)  # image1
    f2 = features.index_select(1, 1)  # image2
    diff = A.abs(f1 - f2)
    fc = Dense(1)(diff)
    output = Activation("sigmoid")(fc)
    model = Model(input, output)
Build Siamese Network Using Transfer Learning
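
What the head of the Siamese network on slide 23 computes for one image pair can be written out directly: the element-wise absolute difference of the two feature vectors, a single dense unit, then a sigmoid. The sketch below uses plain Python lists in place of tensors. The weights and bias are hypothetical illustration values, not trained parameters, and `siamese_score` is a name invented here.

```python
import math


def siamese_score(f1, f2, weights, bias):
    """Score a pair of feature vectors with |f1 - f2| -> Dense(1) -> sigmoid."""
    diff = [abs(a - b) for a, b in zip(f1, f2)]                 # A.abs(f1 - f2)
    logit = sum(w * d for w, d in zip(weights, diff)) + bias    # Dense(1)
    return 1.0 / (1.0 + math.exp(-logit))                       # sigmoid


weights, bias = [-1.0, -1.0], 2.0                            # hypothetical values
same = siamese_score([0.2, 0.9], [0.2, 0.9], weights, bias)  # identical features
far = siamese_score([0.0, 0.0], [3.0, 3.0], weights, bias)   # very different features
print(same > far)   # True
```

Since the absolute difference is symmetric, swapping the two images gives the same score. Whether a high score means "same" or "different" depends on how the training labels are defined.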

24. nnframes: Native DL support in Spark DataFrames and ML Pipelines
1. Initialize NNContext and load images into DataFrames using NNImageReader
    from zoo.common.nncontext import *
    from zoo.pipeline.nnframes import *
    sc = init_nncontext()
    imageDF = NNImageReader.readImages(image_path, sc)
2. Process loaded data using DataFrame transformations
    from pyspark.sql.functions import col, udf  # needed for udf() and col() below
    getName = udf(lambda row: ...)
    df = imageDF.withColumn("name", getName(col("image")))
3. Process images using built-in feature engineering operations
    from zoo.feature.image import *
    transformer = ChainedPreprocessing(
        [RowToImageFeature(),
         ImageChannelNormalize(123.0, 117.0, 104.0),
         ImageMatToTensor(),
         ImageFeatureToTensor()])
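
The three numbers passed to ImageChannelNormalize on slide 24 read as per-channel means. The sketch below shows the arithmetic such a step would perform, under the assumption that it subtracts one mean per channel and divides by a standard deviation that defaults to 1. The function `channel_normalize` is a hypothetical stand-in, and the channel order is left as given on the slide.

```python
def channel_normalize(pixels, means, stds=(1.0, 1.0, 1.0)):
    """Normalize a list of 3-channel pixels: (value - mean) / std per channel."""
    return [tuple((v - m) / s for v, m, s in zip(px, means, stds))
            for px in pixels]


means = (123.0, 117.0, 104.0)        # the values used on the slide
pixels = [(123.0, 117.0, 104.0), (133.0, 117.0, 100.0)]
print(channel_normalize(pixels, means))
# [(0.0, 0.0, 0.0), (10.0, 0.0, -4.0)]
```

Centering each channel around zero keeps the inputs in the range the pretrained weights were fitted to, which is why the step sits before the tensor conversion in the chain.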

25. nnframes: Native DL support in Spark DataFrames and ML Pipelines
4. Define model using Keras-style API
    from zoo.pipeline.api.keras.layers import *
    from zoo.pipeline.api.keras.models import *
    model = Sequential() \
        .add(Convolution2D(32, 3, 3, activation='relu', input_shape=(1, 28, 28))) \
        .add(MaxPooling2D(pool_size=(2, 2))) \
        .add(Flatten()).add(Dense(10, activation='softmax'))
5. Train model using Spark ML Pipelines
    from bigdl.nn.criterion import CrossEntropyCriterion  # loss used by the estimator
    estimator = NNEstimator(model, CrossEntropyCriterion(), transformer) \
        .setLearningRate(0.003).setBatchSize(40).setMaxEpoch(1) \
        .setFeaturesCol("image").setCachingSample(False)
    nnModel =
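
The tensor shapes flowing through the small model on slide 25 can be checked by hand. The sketch below does that arithmetic, assuming "valid" (no) padding and stride 1 for the convolution and a pooling stride equal to the pool size; these are the usual Keras 1.x defaults and are assumed, not confirmed, for this API. The helper names are hypothetical.

```python
def conv2d_shape(shape, filters, kernel, stride=1):
    """Output (channels, height, width) of a square 'valid' convolution."""
    _, h, w = shape
    return (filters, (h - kernel) // stride + 1, (w - kernel) // stride + 1)


def pool2d_shape(shape, pool):
    """Output shape of non-overlapping max pooling."""
    c, h, w = shape
    return (c, h // pool, w // pool)


def flatten_size(shape):
    c, h, w = shape
    return c * h * w


shape = (1, 28, 28)                                  # input_shape on the slide
conv = conv2d_shape(shape, filters=32, kernel=3)     # (32, 26, 26)
pooled = pool2d_shape(conv, pool=2)                  # (32, 13, 13)
features = flatten_size(pooled)                      # 5408
conv_params = 32 * (1 * 3 * 3) + 32                  # 320 weights + biases
dense_params = features * 10 + 10                    # 54090 weights + biases
print(conv, pooled, features, conv_params, dense_params)
```

Under these assumptions nearly all of the parameters sit in the final dense layer, which is typical for a single-convolution model of this size.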

26. Built-in Deep Learning Models
• Object detection API
  • High-level API and pretrained models (e.g., SSD, Faster-RCNN, etc.) for object detection
• Image classification API
  • High-level API and pretrained models (e.g., VGG, Inception, ResNet, MobileNet, etc.) for image classification
• Text classification API
  • High-level API and pre-defined models (using CNN, LSTM, etc.) for text classification
• Recommendation API
  • High-level API and pre-defined models (e.g., Neural Collaborative Filtering, Wide and Deep Learning, etc.) for recommendation

27. Object Detection API
1. Load pretrained model in Detection Model Zoo
    from zoo.common.nncontext import *
    from zoo.models.image.objectdetection import *
    spark = init_nncontext()
    model = ObjectDetector.load_model(model_path)
2. Off-the-shelf inference using the loaded model
    image_set =, spark)
    output = model.predict_image_set(image_set)
3. Visualize the results using utility methods
    config = model.get_config()
    visualizer = Visualizer(config.label_map(), encoding="jpg")
    visualized = visualizer(output).get_image(to_chw=False).collect()
Off-the-shelf Inference Using Analytics Zoo Object Detection API

28. Reference Use Cases
• Anomaly Detection
  • Using an LSTM network to detect anomalies in time series data
• Fraud Detection
  • Using a feed-forward neural network to detect fraud in credit card transaction data
• Recommendation
  • Using the Analytics Zoo Recommendation API (i.e., Neural Collaborative Filtering, Wide and Deep Learning) for recommendations on data with explicit feedback
• Sentiment Analysis
  • Sentiment analysis using neural network models (e.g., CNN, LSTM, GRU, Bi-LSTM)
• Variational Autoencoder (VAE)
  • Using a VAE to generate faces and digits

29. Use Cases