Apache Spark Serving: Unifying Batch, Streaming, and RESTful Serving

We present Spark Serving, a new Spark computing mode that enables users to deploy any Spark computation as a sub-millisecond latency web service backed by any Spark cluster. Attendees will explore the architecture of Spark Serving and discover how to deploy services on a variety of cluster types such as Azure Databricks, Kubernetes, and Spark Standalone. We will also demonstrate its simple yet powerful interface for RESTful SparkSQL, SparkML, and deep network deployment, using the same API as batch and streaming workloads. In addition, we will explore the "dual" architecture: HTTP on Spark, which converts any Spark cluster into a distributed web client with the familiar, pipelinable SparkML API. Together, these two contributions provide the fundamental Spark communication primitives needed to integrate and deploy any computation framework into the Spark ecosystem. We will explore how Microsoft has used this work to leverage Spark as a fault-tolerant microservice orchestration engine in addition to an ETL and ML platform, and we will walk through two examples drawn from Microsoft's ongoing work: Cognitive Service composition and unsupervised object detection for snow leopard recognition.

1. Apache Spark Serving: Unifying Batch, Streaming, and RESTful Serving
Mark Hamilton, Microsoft, marhamil@microsoft.com

2. Overview
• Spark Serving 101
  – Basic Usage
  – Anatomy of a Query
  – Performance
• Architecture
  – Fault Tolerance
  – Serving Shuffles
  – Replying from within computations
• HTTP on Spark
• Deployment
  – Kubernetes, Azure Machine Learning

3. Motivation
• RESTful model deployment makes it easy to integrate ML into other systems
• Current solutions rely on exporting Spark pipelines or using the high-latency batch API
• Spark's philosophy is to unify computing with a single, easy-to-use API

    Batch API:
      spark.read.parquet.load(…)
        .select(…)

    Streaming API:
      spark.readStream.kafka.load(…)
        .select(…)

    Serving API:
      ???

4. Serving: Lightning-Fast Web Services on Any Spark Cluster
• Sub-millisecond latencies
• Fully distributed
• Spins up in seconds
• Same API as batch and streaming
• Scala, Python, R and Java
• Fully open source: www.aka.ms/spark

    val server = spark
      .readStream
      .server("0.0.0.0", 80, "api")
      .option("name", "my_api")
      .load()
      .parseRequest(schema)
      .mlTransform(model)
      .makeReply("scores")
      .writeStream
      .server()
      .option("name", "my_api")
      .start()

5. Deploying a Deep Network
• Demo / code walkthrough
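The demo code itself is not reproduced in these slides. The following is only a minimal sketch of the idea, assuming a pre-trained deep network has already been wrapped and saved as a SparkML PipelineModel, and reusing the serving API from slide 4 (the MMLSpark serving implicits must be in scope); the path, port, API name, schema, and column names are hypothetical placeholders, not the demo's actual code.

    // Sketch only: serve a saved deep-network pipeline over HTTP.
    // The path, port, schema, and column names below are hypothetical.
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.sql.types._

    val deepModelPath = "/models/my_deep_network"              // hypothetical location
    val model = PipelineModel.load(deepModelPath)              // pre-trained network wrapped as a SparkML PipelineModel
    val requestSchema = new StructType().add("imageUrl", StringType)

    val query = spark.readStream
      .server("0.0.0.0", 8080, "deep_api")                     // serving source, as on slide 4
      .option("name", "deep_api")
      .load()
      .parseRequest(requestSchema)                             // JSON request body -> columns
      .mlTransform(model)                                      // same call as batch model.transform
      .makeReply("scores")                                     // assumes the model writes a "scores" column
      .writeStream
      .server()
      .option("name", "deep_api")
      .start()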

6. Anatomy
1) Read a streaming data source
2) Use the "server" source with host, port, and API path
3) Load the dataframe
4) Parse the incoming request body to a target schema
5) Transform the dataframe with a SparkML model (same as model.transform)
6) Pack the target column into the body of a web response

    val server = spark
      .readStream
      .server("0.0.0.0", 80, "api")
      .option("name", "my_api")
      .load()
      .parseRequest(schema)
      .mlTransform(model)
      .makeReply("scores")
      .writeStream
      .server()
      .option("name", "my_api")
      .start()
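Step 4 parses each request body into a target schema. As a small illustrative sketch (the field names are hypothetical, not taken from the talk), the schema is an ordinary Spark StructType describing the JSON body the service expects:

    // Hypothetical request schema: a JSON body such as {"text": "hello", "value": 1.0}
    // is parsed by parseRequest(schema) into "text" and "value" columns.
    import org.apache.spark.sql.types._

    val schema = new StructType()
      .add("text", StringType)
      .add("value", DoubleType)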

7. Performance
• PMML, ONNX, and MLeap require writing exporters for each model in SparkML
• Clipper and AML leverage the batch API, which incurs a steep 500 ms overhead but does not require additional code

    df.writeStream
      .server()
      .option("name", "my_api")
      .trigger(continuous="20 seconds")
      .start()
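The trigger above is written in Python keyword style; in Scala the same continuous trigger is expressed with Structured Streaming's Trigger.Continuous (standard Spark API, shown here only to connect the two forms):

    import org.apache.spark.sql.streaming.Trigger

    df.writeStream
      .server()
      .option("name", "my_api")
      .trigger(Trigger.Continuous("20 seconds"))  // Scala equivalent of trigger(continuous="20 seconds")
      .start()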

8. Architecture Basics
(Diagram: users / apps exchange HTTP requests and responses with a load balancer, which forwards them to a server running on each Spark worker; each worker hosts several partitions and is coordinated by the Spark master.)

9–15. Architecture Details: Microbatch Fault Tolerance
(Diagram, built up across slides 9–15: a server on each Spark worker keeps a map (Epoch, Partition) → LinkedBlockingQueue[Request], plus a per-epoch request history store holding List[Request]; partitions 1–3 run on the worker.)
1. The handler adds each incoming request to the queue for the current epoch.
2. Partitions pull requests from the queue.
3. Partitions add the requests they pulled to the history store.
4. If a partition fails, the retried partition pulls its requests from the history store instead of the queue.
When the next epoch begins (slide 15), the handler adds new requests to a fresh queue for Epoch 2 alongside the retained Epoch 1 state.
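Slides 9–15 describe the fault-tolerance bookkeeping as a queue keyed by (epoch, partition) plus a per-epoch history store. The sketch below only illustrates that bookkeeping under our own names and types, and is not the actual implementation: the handler enqueues requests, a partition drains its queue and records what it took in the history store, and a retried partition replays from the history store.

    // Illustrative sketch of the bookkeeping on slides 9-15; not the real implementation.
    import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
    import scala.collection.JavaConverters._

    case class Request(id: Long, body: String)

    object ServingState {
      // (epoch, partition) -> queue of requests waiting to be processed
      val queues = new ConcurrentHashMap[(Long, Int), LinkedBlockingQueue[Request]]()
      // (epoch, partition) -> requests already handed to that partition (the "history store")
      val history = new ConcurrentHashMap[(Long, Int), List[Request]]()

      // 1. The handler adds an incoming request to the current epoch's queue
      def enqueue(epoch: Long, partition: Int, r: Request): Unit =
        queues.computeIfAbsent((epoch, partition), _ => new LinkedBlockingQueue[Request]()).put(r)

      // 2 & 3. A partition pulls its batch and records it in the history store
      def pull(epoch: Long, partition: Int): List[Request] = {
        val q = queues.getOrDefault((epoch, partition), new LinkedBlockingQueue[Request]())
        val batch = new java.util.ArrayList[Request]()
        q.drainTo(batch)
        val requests = batch.asScala.toList
        history.put((epoch, partition), requests)
        requests
      }

      // 4. A retried partition replays from the history store instead of the queue
      def pullForRetry(epoch: Long, partition: Int): List[Request] =
        history.getOrDefault((epoch, partition), Nil)
    }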

16. Reply From within Pipelines

    // Send replies early for rows matching the condition; rows that did not
    // reply fall through the filter and are answered by makeReply instead.
    df.withColumn("sentReplies",
        when(col("condition"),
          ServingUDFs.sendReplyUDF(apiName, col("replies"), col("id")))
        .otherwise(lit(null)))
      .filter(col("sentReplies").isNull)
      .makeReply("value")

17. Architecture Details: Serving Shuffles
(Diagram, request time vs. response time: on the driver node, the serving query and serving object monitor exchange service status info outside of the hot path. At request time, the load balancer forwards each request to a web server on one of the worker nodes, where a routing service dispatches the function directly if the request is local. At response time, inter-machine routing returns replies when shuffles have moved the data to a different worker, and responses flow back through the load balancer.)

18. HTTP on Spark
• Full integration between the HTTP protocol and Spark SQL
• Spark as a microservice orchestrator
• Spark + X

    df = SimpleHTTPTransformer()
      .setInputParser(JSONInputParser())
      .setOutputParser(JSONOutputParser()
        .setDataType(schema))
      .setOutputCol("results")
      .setUrl(…)

19. HTTP on Spark
(Diagram: each partition on a Spark worker has a local service client; HTTP requests and responses flow from these clients out to an external web service.)

20. Spark as a Microservice Orchestrator
• Can use Spark as a distributed web server and client
• Can incorporate web services into SparkML pipelines, then deploy these composite models as services (see the sketch below)
• Can compose Spark with other ecosystems via orchestration frameworks
(Diagram: Web Services 1–3 are called via HTTP on Spark from a Spark worker, which is in turn exposed through Spark Serving.)
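As a rough sketch of the second bullet (purely illustrative: the URLs, column names, and schema are invented, and the Scala constructor forms for the MMLSpark classes are assumed), two SimpleHTTPTransformer stages from slide 18 can be chained in an ordinary SparkML Pipeline, and the fitted composite deployed with Spark Serving like any other model:

    // Illustrative composition sketch; URLs, column names, and schema are hypothetical.
    // SimpleHTTPTransformer, JSONInputParser, and JSONOutputParser come from MMLSpark
    // (the exact import path depends on the MMLSpark version), as on slide 18.
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.sql.types._

    val replySchema = new StructType().add("label", StringType)

    val serviceA = new SimpleHTTPTransformer()               // first web service call
      .setInputParser(new JSONInputParser())
      .setOutputParser(new JSONOutputParser().setDataType(replySchema))
      .setOutputCol("serviceAResults")
      .setUrl("http://service-a.internal/score")             // hypothetical endpoint

    val serviceB = new SimpleHTTPTransformer()               // second web service call
      .setInputParser(new JSONInputParser())
      .setOutputParser(new JSONOutputParser().setDataType(replySchema))
      .setOutputCol("serviceBResults")
      .setUrl("http://service-b.internal/score")             // hypothetical endpoint

    // Chain both calls into one composite SparkML pipeline; fitting it on a DataFrame
    // that carries the request columns yields a PipelineModel that can be deployed
    // with .mlTransform(...) inside a Spark Serving query, exactly as on slide 4.
    val composite = new Pipeline().setStages(Array(serviceA, serviceB))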

21. Azure Kubernetes Service + Helm
• Works on any k8s cluster: Kubernetes (AKS, ACS, GKE, on-prem, etc.)
• Helm: package manager for Kubernetes

    helm repo add mmlspark \
      https://dbanda.github.io/charts
    helm install mmlspark/spark \
      --set localTextApi=true

(Diagram: each k8s worker runs a Spark worker with HTTP on Spark talking to cloud Cognitive Services or local Cognitive Service containers, backed by storage or other databases via the Spark readers. A Spark Serving load balancer exposes the hot path for REST requests to deployed models, while a separate load balancer fronts Jupyter, Zeppelin, Livy, and spark-submit for submitting jobs, running notebooks, and managing the cluster; users and apps connect through both.)
Dalitso Banda, dbanda@microsoft.com — Microsoft AI Development Acceleration Program

22. Deployment: Azure ML
• Can use Spark Serving to improve the latency of Azure ML services
• Just modify the AML scoring script
(Diagram: an incoming request reaches the AML Flask public server inside the AML container and is passed through to a local Spark Serving server.)

23. Microsoft Machine Learning for Apache Spark v0.16
Microsoft's open source contributions to Apache Spark:
• Cognitive Services
• Spark Serving
• Model Interpretability
• LightGBM gradient boosting
• Deep networks with CNTK
• HTTP on Spark
www.aka.ms/spark — Azure/mmlspark

24. Conclusions
• Spark Serving: an idiomatic way to deploy any Spark streaming computation as a web service, with millisecond latencies
• Get started now with interactive examples: www.aka.ms/spark
• The Azure Cognitive Services on Spark: Clusters with Embedded Intelligent Services – 3:30pm, Room 2009
Help us advance Spark: Azure/mmlspark
Contact: marhamil@microsoft.com, mmlspark-support@microsoft.com

25. Thanks To
• Sudarshan Raghunathan, Anand Raman, Pablo Castro
• Ilya Matiach
• Andrew Schonhoffer
• Microsoft Development Acceleration Team: Dalitso Banda, Casey Hong, Karthik Rajendran, Manon Knoertzer, Tayo Amuneke, Alejandro Buendia
• Daniel Ciborowski, Markus Cosowicz, Scott Graham, Jeremy Reynolds, Miguel Fierro, Tao Wu
• Azure CAT Team + AzureML Team