Make your PySpark Data Fly with Arrow

In the big data world, it’s not always easy for Python users to move huge amounts of data around. Apache Arrow defines a common format for data interchange, while Arrow Flight, introduced in version 0.11.0, provides a means to move that data efficiently between systems. Arrow Flight is a framework for Arrow-based messaging built with gRPC. It enables data microservices where clients can produce and consume streams of Arrow data to share it over the wire. In this session, I’ll give a brief overview of Arrow Flight from a Python perspective, and show that it’s easy to build high-performance connections when systems can talk Arrow. I’ll also cover some ongoing work in using Arrow Flight to connect PySpark with TensorFlow – two systems with great Python APIs but very different underlying internal data.

1. Make Your PySpark Data Fly with Apache Arrow!
Bryan Cutler, Software Engineer, @BryanCutler
DBG / May 2, 2018 / © 2019 IBM Corporation

2. About Bryan
@BryanCutler on GitHub
Software Engineer, IBM Center for Open-Source Data & AI Technologies (CODAIT)
● Big Data, Machine Learning & AI
● Apache Spark committer
● Apache Arrow committer
● TensorFlow I/O maintainer

3. Center for Open Source Data and AI Technologies (CODAIT)
codait.org
CODAIT aims to make AI solutions dramatically easier to create, deploy, and manage in the enterprise.
Relaunch of the Spark Technology Center (STC) to reflect an expanded mission.
[Chart: Improving Enterprise AI Lifecycle in Open Source]

4. Agenda
● Overview of Apache Arrow
● Intro to Arrow Flight
● How to Talk Arrow
● Flight in Action

5. Apache Arrow Overview

6. Apache Arrow: About Arrow
Standard format for in-memory columnar data
● Implementations in many languages, and growing
● Built for efficient analytic operations on modern hardware
Has built-in primitives for basic exchange of Arrow data
● Zero-copy data sharing within a process
● IPC with Arrow record batch messages

7. Apache Arrow: Why Use Arrow
Arrow brings many benefits:
● Common standard with cross-language support
● Better interoperability between frameworks
● Avoids costly data serialization

8. Apache Arrow: Who Is Using Arrow
“The Apache® Software Foundation Announces Apache Arrow™ Momentum”
● Adopted by dozens of open-source and commercial technologies
● Exceeded 1,000,000 monthly downloads within the first three years as an Apache Top-Level Project
● Apache Spark, NVIDIA RAPIDS, pandas, and Dremio, among others
https://arrow.apache.org/powered_by
Source: https://blogs.apache.org/foundation/entry/the-apache-software-foundation-announces46

9. Arrow Flight

10. Arrow Flight: Introduction
Arrow Flight is an Arrow-native RPC framework
Defines a standard protocol for data exchange
Makes it easy to efficiently move data around a network by providing [1]:
● Arrow Data as a Service
● Batch Streams
● Stream Management

11. Arrow Flight: Arrow Data as a Service
Extensible data service
● Clients get/put Arrow data
● List available data
● Custom actions
● Can think of it as ODBC for in-memory data

12. Arrow Flight: Stream Batching
An Arrow stream is a schema + record batches
A Flight is composed of multiple streams
● Streams can come from different endpoints
● Transfer data in bulk for efficiency
● Location info can be used to improve data locality
[Diagram: a Flight composed of Stream 1 and Stream 2, each carrying record batches]

13. Arrow Flight: Stream Management
The service manages Flights for the clients
● FlightInfo gives a list of endpoints with the location of each stream in the Flight
● Streams are referenced by a ticket
  – A ticket is an opaque struct that is unique for each stream
● Flight descriptors differentiate between Flights
  – Can define how a Flight is composed
  – e.g. batch size, or even a SQL query

14. Arrow Flight: FlightDescriptor Types
Simple path-like: "datasets/cats-dogs/training"
Custom proto:

message MyDescriptor {
  string sql_query = 1;
  int32 records_per_batch = 2;
}

message MyTicket {
  MyDescriptor desc = 1;
  string uuid = 2;
}

15. Flight Example: Ticket Sequence for Consumer
To consume an entire Flight:
● Get FlightInfo for the list of endpoints with tickets
● For each endpoint:
  – Use the ticket to get the endpoint stream
  – Process each RecordBatch in the stream
[Sequence diagram – Consumer → Flight Service: GetFlightInfo(FlightDescriptor) → FlightInfo; for each endpoint: GetStream(Ticket) → Stream; for each batch in the stream: get next RecordBatch, process batch]

16. Arrow Flight: Benefits
● Applications use the client interface and exchange standard record batches
● Complex communication is handled internally
● Efficient: uses batches and minimal copying
● Standardized protocol
  – Authentication
  – Support for different transports
  – Able to handle backpressure

17. Arrow Flight: Current Status
Common protocol defined using protocol buffers
Prototype implementations in Java, C++, and Python
Still experimental, but lots of work is being done to make it production-ready

18. Arrow Flight: How to Talk Arrow
If a system wants to exchange Arrow Flight data, it needs to be able to produce/consume an Arrow stream
● Spark kind of does already, but it is not externalized
● See SPARK-24579 and SPARK-26413
● Can build a Scala Flight connector with a little hacking

19. Arrow Flight: How to Talk Arrow
TensorFlow I/O has Arrow Datasets
● Maintained by the SIG-IO community
  – Also many other inputs to TF
  – Many sources from the legacy contrib/
● Several Arrow datasets
  – ArrowStreamDataset used here
● Input ops only for now
● Install: pip install tensorflow-io
Check it out at https://github.com/tensorflow/io

20. Flight in Action: Spark to TensorFlow

21. Flight Example: Define the Service
Simple service backed by an in-memory data store
● Keeps streams in memory
● The Flight descriptor is a string id
● This is from the Java Flight examples

22. Flight Example: Make the Clients
PySpark will put Arrow data
● Map-partition op of a DataFrame to Arrow
● Each partition is sent as a stream of batches
  – A ticket is roughly the partition index
TensorFlow Dataset will get Arrow data
● Requests the entire Flight, which is multiple streams
● Gets one batch at a time to process
● The op outputs tensors

23. Flight Example: Data Flow
[Diagram: Spark workers put record batches to the Flight Service as Stream 1 and Stream 2; a Flight = Stream 1 + Stream 2; TensorFlow gets the Flight and processes the batches]

24. Flight Example: Client Walkthrough
Application code is simple
  – Only a few lines
  – Focus on working with data
  – Don’t need to worry about conversion, file formats, or networking

"""PySpark Client"""
# Spark job to put partitions to the service
SparkFlightConnector.put(
    df,          # Existing DataFrame
    host, port,  # Flight Service ip
    'rad-spark'  # Data descriptor
)

"""TensorFlow Client"""
# Arrow tf.data.Dataset gets Flight data
dataset = ArrowFlightDataset.from_schema(
    host, port,   # Flight Service ip
    'rad-spark',  # Data descriptor
    to_arrow_schema(df.schema)  # Schema
)
# Iterate over Flight data as tensors
it = dataset.make_one_shot_iterator()

The example is in Python, but the data never needs to go through Python!
Worker JVM → Flight Service → TF C++

25. Arrow Flight: Recap
Apache Arrow – a standard for in-memory data
Arrow Flight – efficiently move data around a network
● Arrow data as a service
● Stream batching
● Stream management
Simple example with PySpark + TensorFlow
● Data transfer never goes through Python

26. Links & References
Apache Arrow and Flight specification
https://arrow.apache.org/
https://github.com/apache/arrow/blob/master/format/Flight.proto
TensorFlow I/O
https://github.com/tensorflow/io
Related Spark JIRAs: SPARK-24579, SPARK-26413
Example code: https://github.com/BryanCutler/SparkArrowFlight
References:
[1] Flight Overview by Arrow PMC Jacques Nadeau
https://www.slideshare.net/JacquesNadeau5/apache-arrow-flight-overview


28. Thank You!
codait.org
http://github.com/BryanCutler
developer.ibm.com/code
MAX · FfDL
Sign up for IBM Cloud and try Watson Studio!
https://ibm.biz/BdZgcx
https://datascience.ibm.com/
