Designing Structured Streaming Pipelines—How to Architect Things Right

Structured Streaming has proven to be the best platform for building distributed stream processing applications. Its unified SQL/Dataset/DataFrame APIs and Spark’s built-in functions make it easy for developers to express complex computations. However, expressing the business logic is only part of the larger problem of building end-to-end streaming pipelines that interact with a complex ecosystem of storage systems and workloads. It is important for the developer to truly understand the business problem that needs to be solved. What are you trying to consume? A single source? Multiple streaming sources joined together? Streaming data joined with static data? What are you trying to produce? What is the final output that the business wants? What types of queries does the business want to run on the final output? When do you want it? When does the business want the data? What is the acceptable latency? Do you really need millisecond-level latency? How much are you willing to pay for it? This is the ultimate question, and the answer significantly determines how feasible it is to solve the above questions. These are the questions that we ask every customer in order to help them design their pipeline. In this talk, I am going to go through the decision tree for designing the right architecture for your problem.

1. Designing Structured Streaming Pipelines: How to Architect Things Right
Tathagata “TD” Das (@tathadas)
#UnifiedAnalytics #SparkAISummit

2. Structured Streaming
- Distributed stream processing built on the Spark SQL engine
- High throughput, second-scale latencies
- Fault-tolerant, exactly-once
- Great set of connectors
Philosophy:
- Data streams are unbounded tables
- Users write batch-like code on a table
- Spark automatically runs the code incrementally on streams

3. Structured Streaming @ Databricks
- 1000s of customer streaming apps in production on Databricks
- 1000+ trillions of rows processed in production

4. Structured Streaming Example: ETL
- Read JSON data from Kafka
- Parse nested JSON
- Store in a structured Parquet table
- Get end-to-end failure guarantees

5. Anatomy of a Streaming Query: Source
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", "topic")
      .load()
- Specify where to read data from
- Built-in support for Files / Kafka / Kinesis* (*available only on Databricks Runtime)
- Can include multiple sources of different types using join() / union()
- load() creates a DataFrame; for Kafka it looks like:
    key       value     topic   partition  offset  timestamp
    [binary]  [binary]  topicA  0          345     1486087873
    [binary]  [binary]  topicB  3          2890    1486086721

6. Anatomy of a Streaming Query: Transformations
    import org.apache.spark.sql.functions.{col, from_json}

    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", "topic")
      .load()
      .selectExpr("cast (value as string) as json")   // Kafka value bytes -> string
      .select(from_json(col("json"), schema).as("data"))  // string -> nested columns
- Cast bytes from Kafka to a string, parse it as JSON, and generate nested columns
- 100s of built-in, optimized SQL functions like from_json
- User-defined functions, lambdas, function literals with map, flatMap…

7. Anatomy of a Streaming Query: Sink
    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", "topic")
      .load()
      .selectExpr("cast (value as string) as json")
      .select(from_json(col("json"), schema).as("data"))
      .writeStream
      .format("parquet")
      .option("path", "/parquetTable/")
- Write transformed output to external storage systems
- Built-in support for Files / Kafka
- Use foreach to execute arbitrary code with the output data
- Some sinks are transactional and exactly-once (e.g., files)
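The claim that some sinks are exactly-once rests on idempotent commits: if a batch is retried after a failure, the sink must not apply it twice. The sketch below is a plain-Scala model of that idea, not Spark's file-sink code; `IdempotentSink` is a hypothetical name and an in-memory buffer stands in for real storage. The same pattern is commonly used with `foreachBatch`, which passes each micro-batch's ID to user code.

```scala
import scala.collection.mutable

// Hypothetical in-memory sink, used only to illustrate idempotent writes.
// Each batch is recorded together with its batch ID, so a batch that is
// replayed after a failure is recognized and skipped.
final class IdempotentSink {
  private val committedBatches = mutable.Set.empty[Long]
  private val rows = mutable.ArrayBuffer.empty[String]

  /** Writes a batch unless that batch ID was already committed.
    * Returns true if the rows were written, false if the batch was a replay. */
  def write(batchId: Long, batch: Seq[String]): Boolean = synchronized {
    if (committedBatches.contains(batchId)) false
    else {
      rows ++= batch
      committedBatches += batchId
      true
    }
  }

  /** Snapshot of everything written so far. */
  def contents: Seq[String] = synchronized { rows.toList }
}
```

In a real store, the rows and the committed batch ID would have to be written atomically (for example, in one transaction); otherwise a crash between the two writes could still duplicate or lose data.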

8. Anatomy of a Streaming Query: Processing Details
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.streaming.Trigger

    spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", "topic")
      .load()
      .selectExpr("cast (value as string) as json")
      .select(from_json(col("json"), schema).as("data"))
      .writeStream
      .format("parquet")
      .option("path", "/parquetTable/")
      .trigger(Trigger.ProcessingTime("1 minute"))  // fixed-interval micro-batches
      .option("checkpointLocation", "…")            // where progress is tracked
      .start()
Trigger: when to process data
- Fixed interval micro-batches
- As fast as possible micro-batches
- Continuously (new in Spark 2.3)
Checkpoint location: for tracking the progress of the query
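The fixed-interval trigger's timing can be modeled as a small pure function. This is a simplified sketch of the behavior described in the Spark documentation, not Spark's scheduler code: if a micro-batch finishes early, the next one waits for the next interval boundary; if it overruns, the next one starts as soon as it finishes. Aligning boundaries to multiples of the interval, and treating an interval of 0 as the "as fast as possible" default, are assumptions of this sketch.

```scala
// Simplified model of micro-batch scheduling; all times are in milliseconds.
object FixedIntervalTrigger {
  /** Start time of the next micro-batch, given the trigger interval and when
    * the previous micro-batch started and finished. */
  def nextBatchStart(intervalMs: Long, prevStartMs: Long, prevEndMs: Long): Long =
    if (intervalMs <= 0) {
      // No interval: start the next batch as soon as the previous one ends.
      prevEndMs
    } else {
      // Next interval boundary after the previous batch's start
      // (boundaries are multiples of the interval in this sketch).
      val boundary = (prevStartMs / intervalMs) * intervalMs + intervalMs
      // Finished early: wait for the boundary. Overran: start right away.
      math.max(boundary, prevEndMs)
    }
}
```

With a 1-minute interval, a batch that starts at 0 ms and ends at 20,000 ms is followed by one at 60,000 ms, while a batch that starts at 60,000 ms and runs until 150,000 ms is followed immediately.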

9. Spark automatically streamifies!
(Same query as slide 8.)
Logical plan: Read from Kafka (Kafka Source) -> Project (device, signal) -> Filter (signal > 15) -> Write to Parquet (Parquet Sink)
DataFrames, Datasets, SQL -> Logical Plan -> Optimized Plan -> Series of Incremental Execution Plans
At t=1, t=2, t=3, the optimized operators (codegen, off-heap, etc.) process only the new data.
Spark SQL converts a batch-like query to a series of incremental execution plans operating on new batches of data.
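The incremental-execution idea can be checked on plain Scala collections: for a query made only of stateless operators, such as this slide's project and filter, running the query on each batch of new rows and concatenating the outputs gives the same result as running it once on the whole table. The `Event` type and its fields are hypothetical, and Spark's planner is not modeled here. Aggregations are different because they must carry state across batches.

```scala
// Hypothetical input row: the query keeps only device and signal.
final case class Event(device: String, signal: Int, location: String)

object IncrementalPlan {
  // The "batch-like" query, written once against a table of rows:
  // Project (device, signal), then Filter (signal > 15).
  def query(rows: Seq[Event]): Seq[(String, Int)] =
    rows.map(e => (e.device, e.signal)).filter { case (_, s) => s > 15 }

  // Incremental execution: run the same query on each batch of new rows only
  // and append the outputs in batch order.
  def runIncrementally(batches: Seq[Seq[Event]]): Seq[(String, Int)] =
    batches.flatMap(query)
}
```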

10. Anatomy of a Streaming Query
Complex ETL with Structured Streaming (the query from slide 8): raw data from Kafka is available as structured data in seconds, ready for querying.

11. My past talks: deep dives
- A Deep Dive into Structured Streaming (Spark Summit 2016)
- A Deep Dive into Stateful Stream Processing in Structured Streaming (Spark + AI Summit 2018)

12. This talk: Streaming Design Patterns
Zoomed-out view of your data pipeline: DATA -> Complex ETL (Structured Streaming) -> INSIGHTS

13. Another streaming design pattern talk?
Most talks:
- Focus on a pure streaming engine
- Explain one way of achieving the end goal
This talk:
- Spark is more than a streaming engine
- Spark has multiple ways of achieving the end goal, with tunable performance, cost, and quality

14. This talk
- How to think about design
- Common design patterns
- How we are making this easier

15. Streaming Pipeline Design
Data streams -> ???? -> Insights

16. ????
What? Why? How?

17. What?
What is your input?
- What is your data?
- What format and system is your data in?
What is your output?
- What results do you need?
- What throughput and latency do you need?

18. Why?
- Why do you want this output in this way?
- Who is going to take actions based on it: humans or computers?
- When and how are they going to consume it?

19. Why? Common mistakes!
#1 "I want my dashboard with counts to be updated every second."
No point in updating every second if humans are going to take actions in minutes or hours.
#2 "I want to generate automatic alerts with up-to-the-last-second counts" (but my input data is often delayed).
No point in taking fast actions on low-quality data and results.
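Mistake #2 can be made concrete with a few hypothetical events that each carry the minute they happened and the minute they reached the pipeline. A count taken as soon as a minute ends misses events that arrive late, so an alert fired on it acts on an undercount. The names and data below are illustrative only.

```scala
// Hypothetical event: when it happened vs. when it reached the pipeline.
final case class Reading(eventMinute: Int, arrivalMinute: Int)

object DelayedCounts {
  /** Count of events that happened in `window`, as visible to a query that
    * runs at `asOfMinute` (only events that have already arrived are seen). */
  def countAsOf(events: Seq[Reading], window: Int, asOfMinute: Int): Int =
    events.count(e => e.eventMinute == window && e.arrivalMinute <= asOfMinute)
}
```

With events `Reading(10, 10)`, `Reading(10, 11)`, `Reading(10, 15)` and `Reading(11, 11)`, an alert evaluated at minute 10 sees 1 event for minute 10, while the complete count (visible from minute 15) is 3.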

20. Why? Common mistakes!
#3 "I want to train machine learning models on the results" (but my results are in a key-value store).
Key-value stores are not great for the large, repeated data scans that machine learning workloads perform.

21. How?
How to process the data? ????
How to store the results? ????

22. Streaming Design Patterns
Complex ETL: What? How? Why?

23. Pattern 1: ETL
What?
- Input: unstructured input stream from files, Kafka, etc.
    01:06:45 WARN id = 1 , update failed
    01:06:45 INFO id=23, update success
    01:06:57 INFO id=87: update postpo …
- Output: structured tabular data
Why?
- Query latest structured data interactively or with periodic jobs

24. P1: ETL
What?
- Convert unstructured input to structured tabular data
- Latency: few minutes
How?
- Process: Use a Structured Streaming query to transform unstructured, dirty data. Run it 24/7 on a cluster with the default trigger.
- Store: Save to structured, scalable storage that supports data skipping, etc. E.g., Parquet, ORC, or even better, Delta Lake.
Why?
- Query latest structured data interactively or with periodic jobs
Diagram: unstructured logs -> ETL (Structured Streaming) -> QUERY
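The "transform unstructured, dirty data" step might look like the parser below for the sample log lines from Pattern 1. It is a plain-Scala sketch: `LogRecord`, its field names, and the regular expression are assumptions inferred from the sample lines, and lines that do not match are returned as `None` so they can be dropped or routed elsewhere. In a Structured Streaming job, a function like this could be applied to the raw lines with a typed transformation such as `flatMap`; that wiring is not shown.

```scala
// Hypothetical structured record for one log line.
final case class LogRecord(time: String, level: String, id: Int, message: String)

object LogEtl {
  // Accepts "id = 1 ," as well as "id=23," and "id=87:" (spacing and separator vary).
  private val Line =
    """^(\d{2}:\d{2}:\d{2})\s+([A-Z]+)\s+id\s*=\s*(\d+)\s*[,:]\s*(.*)$""".r

  /** Parses one raw line; returns None for lines that do not fit the pattern
    * or whose id does not fit in an Int. */
  def parse(line: String): Option[LogRecord] = line.trim match {
    case Line(time, level, id, message) =>
      id.toIntOption.map(n => LogRecord(time, level, n, message))
    case _ => None
  }
}
```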

25. P1: ETL
How?
Store: Save to Delta Lake
- Read with snapshot guarantees while writes are in progress
- Concurrently reprocess data with full ACID guarantees
- Coalesce small files into larger files
- Fix mistakes in existing data
Diagram: unstructured logs -> ETL (Structured Streaming) -> QUERY, with REPROCESS running alongside
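"Coalesce small files into larger files" is essentially a bin-packing problem. The sketch below greedily groups file sizes, in order, into bins no larger than a target size; each bin would then be rewritten as one file. It is a hypothetical planner for illustration, not Delta Lake's compaction implementation, and the sizes and 64 MB target in the usage are made up.

```scala
import scala.collection.mutable.ArrayBuffer

object Compaction {
  /** Greedily groups file sizes (in MB, taken in the given order) into bins whose
    * total stays within targetMb. A file larger than the target ends up in a
    * bin of its own. */
  def plan(fileSizesMb: Seq[Long], targetMb: Long): Seq[Seq[Long]] = {
    val bins = ArrayBuffer.empty[Seq[Long]]
    var current = Vector.empty[Long]
    var currentSize = 0L
    for (size <- fileSizesMb) {
      // Close the current bin if this file would push it past the target.
      if (current.nonEmpty && currentSize + size > targetMb) {
        bins += current
        current = Vector.empty
        currentSize = 0L
      }
      current :+= size
      currentSize += size
    }
    if (current.nonEmpty) bins += current
    bins.toList
  }
}
```

For example, files of 10, 20, 30, 50 and 5 MB with a 64 MB target are planned as two output files: one from the 10, 20 and 30 MB inputs and one from the 50 and 5 MB inputs.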

26. P1: Cheaper ETL
What?
- Convert unstructured input to structured tabular data
- Latency: few hours instead of few minutes
- No need to keep clusters up 24/7
How?
- Process: Still use a Structured Streaming query! Run the streaming query with Trigger.Once() to process all data available since the last batch.
- Set up an external schedule (every few hours?) to periodically start a cluster and run one batch.
Why?
- Query latest data interactively or with periodic jobs
Diagram: unstructured logs -> Structured Streaming, RESTART ON SCHEDULE: a cheaper solution
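Running the streaming query with `Trigger.Once()` on a schedule works because the checkpoint records how far the previous run got. The plain-Scala model below (hypothetical `TriggerOnce.runOnce`, partitions keyed by number, exclusive end offsets) shows one run processing everything from the checkpointed offsets up to the latest offsets seen at start, then advancing the checkpoint. It does not reproduce Spark's checkpoint format. Newer Spark versions also offer `Trigger.AvailableNow()` for this pattern.

```scala
// Minimal model of offset tracking across scheduled one-batch runs.
object TriggerOnce {
  /** Checkpointed progress: the next offset to read, per partition. */
  type Checkpoint = Map[Int, Long]

  /** One run: for each partition, process [checkpointed offset, latest offset)
    * and return those ranges plus the advanced checkpoint. `latest` holds the
    * exclusive end offsets available when the run starts. A partition with no
    * checkpoint entry starts at offset 0 (an assumption of this sketch; Spark's
    * Kafka source has its own startingOffsets option). */
  def runOnce(checkpoint: Checkpoint, latest: Map[Int, Long]): (Map[Int, (Long, Long)], Checkpoint) = {
    val ranges = latest
      .map { case (partition, until) => partition -> (checkpoint.getOrElse(partition, 0L), until) }
      .filter { case (_, (from, until)) => until > from } // skip partitions with no new data
    (ranges, checkpoint ++ latest)
  }
}
```

A run that finds no new data processes nothing and leaves the checkpoint unchanged, which is what makes the scheduled restarts safe to repeat.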

27. P1: Query faster than ETL!
What?
- Latency: seconds instead of hours
How?
- Query data in Kafka directly using Spark SQL
- Can process up to the last records received by Kafka when the query was started
Why?
- Query latest up-to-the-last-second data interactively
Diagram: Kafka -> SQL
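Querying Kafka directly is a batch read (`spark.read` rather than `spark.readStream`), so, as the slide says, it sees only the records that were in Kafka when the query started. The sketch below models that with a hypothetical in-memory append-only log standing in for one Kafka partition: the query fixes its end offset at start, and a record appended afterward is not returned.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical append-only log standing in for one Kafka partition.
final class AppendOnlyLog[A] {
  private val records = ArrayBuffer.empty[A]
  def append(a: A): Unit = synchronized { records += a }
  /** Exclusive end offset: the offset the next appended record will get. */
  def endOffset: Long = synchronized { records.size.toLong }
  def read(from: Long, until: Long): Seq[A] =
    synchronized { records.slice(from.toInt, until.toInt).toList }
}

object BatchQuery {
  /** Fixes the end offset when the query starts; running the returned
    * function reads only up to that offset. */
  def startQuery[A](log: AppendOnlyLog[A]): () => Seq[A] = {
    val until = log.endOffset // snapshot taken at query start
    () => log.read(0L, until)
  }
}
```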

28. Pattern 2: Key-value output
What?
- Input: new data for each key
    { "key1": "value1" }
    { "key1": "value2" }
    { "key2": "value3" }
- Output: updated values for each key
    KEY   LATEST VALUE
    key1  value2
    key2  value3
- Aggregations (sum, count, …)
- Sessionizations
Why?
- Lookup latest value for key (dashboards, websites, etc.)
  OR
- Summary tables for querying interactively or with periodic jobs
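The key-value table on this slide is an upsert in arrival order: each new value for a key replaces the previous one. A minimal plain-Scala sketch, with a hypothetical `LatestValue.upsertAll`, reproduces the slide's example.

```scala
object LatestValue {
  /** Applies (key, value) updates in arrival order; a later value for a key
    * replaces the earlier one. */
  def upsertAll(table: Map[String, String], updates: Seq[(String, String)]): Map[String, String] =
    updates.foldLeft(table) { case (acc, (k, v)) => acc.updated(k, v) }
}
```

Feeding it `("key1", "value1")`, `("key1", "value2")` and `("key2", "value3")` yields `key1 -> value2` and `key2 -> value3`, matching the table above.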

29. P2.1: Key-value output for lookup
What?
- Generate updated values for keys
- Latency: seconds/minutes
How?
- Process: Use Structured Streaming with stateful operations for aggregation
- Store: Save in key-value stores optimized for single-key lookups
Why?
- Lookup latest value for key
Diagram: { "key1": "value1" }, { "key1": "value2" }, { "key2": "value3" } -> STATEFUL AGGREGATION (Structured Streaming) -> LOOKUP
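A stateful aggregation for lookups keeps a running value per key and, after each micro-batch, pushes only the keys that changed to the key-value store; Structured Streaming's `update` output mode follows the same rule of emitting only rows updated since the last trigger. The plain-Scala model below uses a running count as the aggregation and a mutable `Map` as a stand-in for the store; `StatefulCountToKv` and its methods are hypothetical.

```scala
import scala.collection.mutable

object StatefulCountToKv {
  /** Processes one micro-batch of keys: returns the new running counts (the
    * query's state) and only the keys whose counts changed in this batch. */
  def processBatch(state: Map[String, Long], batch: Seq[String]): (Map[String, Long], Map[String, Long]) = {
    val batchCounts = batch.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }
    val changed = batchCounts.map { case (k, n) => k -> (state.getOrElse(k, 0L) + n) }
    (state ++ changed, changed)
  }

  /** Upserts the changed keys into the key-value store (a mutable Map here). */
  def writeToStore(store: mutable.Map[String, Long], changed: Map[String, Long]): Unit =
    store ++= changed
}
```

After the batches `key1, key2, key1` and then `key1`, the second batch writes only `key1 -> 3` to the store, and a lookup of `key2` still returns 1 from the first batch.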