1. Structured Streaming in Apache Spark: Easy, Fault Tolerant and Scalable Stream Processing
Juliusz Sompolski
10th Extremely Large Databases Conference (XLDB)
October 11th 2017, Clermont-Ferrand, France

2. About Databricks
TEAM: Started Spark project (now Apache Spark) at UC Berkeley in 2009
MISSION: Making Big Data Simple
PRODUCT: Unified Analytics Platform

3. About Me
Software Engineer working in the new Databricks engineering office in Amsterdam
• Opened in January 2017
• So far expanded to 11 people and growing!

4. building robust stream processing apps is hard

5. Complexities in stream processing
Complex Data
• Diverse data formats (json, avro, binary, …)
• Data can be dirty, late, out-of-order
Complex Workloads
• Event time processing
• Combining streaming with interactive queries, machine learning
Complex Systems
• Diverse storage systems and formats (SQL, NoSQL, parquet, ...)
• System failures

6. you should not have to reason about streaming

7. you should write simple queries & Spark should continuously update the answer

8. Structured Streaming
stream processing on Spark SQL engine
• fast, scalable, fault-tolerant
rich, unified, high level APIs
• deal with complex data and complex workloads
rich ecosystem of data sources
• integrate with many storage systems

9. Treat Streams as Unbounded Tables
data stream -> unbounded input table
new data in the data stream = new rows appended to an unbounded table

10. Conceptual Model
Trigger: every 1 sec
Treat input stream as an input table.
Every trigger interval, input table is effectively growing.
[Diagram: timeline t=1, t=2, t=3; Input holds data up to t=1, up to t=2, up to t=3]

11. Conceptual Model
Trigger: every 1 sec
If you apply a query on the input table, the result table changes with the input.
Every trigger interval, we can output the changes in the result.
[Diagram: timeline t=1, t=2, t=3; Input (data up to t) -> Query -> Result (result up to t) -> Output]

12. Conceptual Model
Full input does not need to be processed every trigger.
Spark does not materialize the full input table.
[Diagram: timeline t=1, t=2, t=3; Input (data up to t) -> Query -> Result (result up to t) -> Output]

13. Conceptual Model
Spark converts query to an incremental query that operates only on new data to generate output.
[Diagram: timeline t=1, t=2, t=3; Input (data up to t) -> Query -> Result (result up to t) -> Output]
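The idea of an incremental query can be illustrated without Spark. The following is a minimal sketch in plain Scala, assuming a simple count-per-key query; the object and method names (`IncrementalCount`, `fullCount`, `update`) are hypothetical and are not part of any Spark API. It only shows that folding new rows into the previous result gives the same answer as recomputing over the whole input table.

```scala
// Hypothetical illustration in plain Scala; this is not how Spark is implemented.
object IncrementalCount {
  type Counts = Map[String, Long]

  // Batch-style query: recount the whole input table from scratch.
  def fullCount(input: Seq[String]): Counts =
    input.groupBy(identity).map { case (k, v) => k -> v.size.toLong }

  // Incremental query: fold only the newly arrived rows into the previous result.
  def update(state: Counts, newRows: Seq[String]): Counts =
    newRows.foldLeft(state) { (acc, row) =>
      acc.updated(row, acc.getOrElse(row, 0L) + 1L)
    }
}
```

Calling `update` once per trigger with only that trigger's rows should leave the state equal to `fullCount` over everything seen so far, which is the property the slide describes.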

14. Anatomy of a Streaming Query
    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy($"value".cast("string"))
      .count()
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()
Source
• Specify one or more locations to read data from.
• Built-in support for Files/Kafka/Socket, pluggable.
• Additional connectors, e.g. Amazon Kinesis, available on Databricks platform.
• Can union() multiple sources.

15. Anatomy of a Streaming Query
    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()
Transformation
• Using DataFrames, Datasets and/or SQL.
• Catalyst figures out how to execute the transformation incrementally.
• Internal processing always exactly-once.

16. Spark automatically streamifies!
    input = spark.readStream
      .format("json")
      .load("subscribe")
    result = input
      .select("device", "signal")
      .where("signal > 15")
    result.writeStream
      .format("parquet")
      .start("dest-path")
[Diagram: Read from JSON (JSON Source) -> Project device, signal -> Filter signal > 15 -> Write to Parquet (Parquet Sink); optimized operators use codegen, off-heap, etc.; at t=1, t=2, t=3, new files are processed]
DataFrames, Datasets, SQL -> Logical Plan -> Optimized Physical Plan -> Series of Incremental Execution Plans
Spark SQL converts batch-like query to a series of incremental execution plans operating on new batches of data.

17. Anatomy of a Streaming Query
    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()
Sink
• Accepts the output of each batch.
• When sinks are transactional, exactly-once semantics.
• Use foreach to execute arbitrary code.

18. Anatomy of a Streaming Query
    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("update")
      .option("checkpointLocation", "…")
      .start()
Output mode – What's output
• Complete – Output the whole answer every time
• Update – Output changed rows
• Append – Output new rows only
Trigger – When to output
• Specified as a time, eventually supports data size
• No trigger means as fast as possible
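The difference between the Complete and Update modes can be sketched in plain Scala, assuming the result table is modeled as a map from key to count. The names below (`OutputModes`, `complete`, `update`) are hypothetical and only illustrate the semantics listed on the slide; they are not Spark APIs. Append is left out of the sketch, since for aggregations it depends on knowing when a row will no longer change.

```scala
// Hypothetical illustration in plain Scala; the result table is modeled as a Map.
object OutputModes {
  type Result = Map[String, Long]

  // Complete: emit the whole result table on every trigger.
  def complete(prev: Result, curr: Result): Result = curr

  // Update: emit only rows that are new or whose value changed since the last trigger.
  def update(prev: Result, curr: Result): Result =
    curr.filter { case (k, v) => !prev.get(k).contains(v) }
}
```

With a previous result of `a -> 2, b -> 1` and a current result of `a -> 2, b -> 2, c -> 1`, `complete` returns all three rows while `update` returns only the rows for `b` and `c`.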

19. Anatomy of a Streaming Query
    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("update")
      .option("checkpointLocation", "…")
      .start()
Checkpoint
• Tracks the progress of a query in persistent storage.
• Can be used to restart the query if there is a failure.

20. Fault-tolerance with Checkpointing
Checkpointing: metadata (e.g. offsets) of current batch stored in a write ahead log
Huge improvement over Spark Streaming checkpoints
• Offsets saved as JSON, no binary saved
• Can restart after app code change
end-to-end exactly-once guarantees
[Diagram: at t=1, t=2, t=3, new files -> process -> write ahead log]
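How a write ahead log of offsets allows a restart can be sketched in plain Scala. This is a simplified model under stated assumptions, not Spark's actual checkpoint format or code: the `Entry` fields, the set of committed batch ids, and the names `OffsetLogSketch`, `toJson`, and `nextBatch` are all hypothetical.

```scala
// Hypothetical, simplified model of an offset write ahead log; not Spark's implementation.
object OffsetLogSketch {
  // One log entry: a batch id plus the [start, end) offset range planned for that batch.
  final case class Entry(batchId: Long, start: Long, end: Long)

  // Render an entry as one JSON line (hand-rolled; a real system would use a JSON library).
  def toJson(e: Entry): String =
    s"""{"batchId":${e.batchId},"start":${e.start},"end":${e.end}}"""

  // Decide what to run next, e.g. after a restart:
  // if the last logged batch was never committed, re-run exactly that offset range;
  // otherwise plan a new batch starting where the last one ended.
  def nextBatch(log: Seq[Entry], committed: Set[Long], latestOffset: Long): Entry =
    log.lastOption match {
      case Some(last) if !committed(last.batchId) => last
      case Some(last)                             => Entry(last.batchId + 1, last.end, latestOffset)
      case None                                   => Entry(0L, 0L, latestOffset)
    }
}
```

Because the offset range is logged before the batch is processed, a restart reprocesses the same range rather than a different one, which is what makes replay deterministic in this model. Storing entries as readable JSON rather than serialized objects is also what would let the log survive an application code change.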

21. Dataset/DataFrame
SQL
    spark.sql("""
      SELECT type, sum(signal)
      FROM devices
      GROUP BY type
    """)
Most familiar to BI Analysts. Supports SQL-2003, HiveQL.
DataFrames
    val df: DataFrame =
      spark.table("devices")
        .groupBy("type")
        .sum("signal")
Great for Data Scientists familiar with Pandas, R Dataframes.
Dataset
    val ds: Dataset[(String, Double)] =
      spark.table("devices")
        .as[DeviceData]
        .groupByKey(_.`type`)
        .mapValues(_.signal)
        .reduceGroups(_ + _)
Great for Data Engineers who want compile-time type safety.
You choose your hammer for whatever nail you have!

22. Complex Streaming ETL

23. Traditional ETL
Raw, dirty, un/semi-structured data is dumped as files.
Periodic jobs run every few hours to convert raw data to structured data ready for further analytics.
[Diagram: raw data (10101010) -> file dump (seconds) -> table (hours)]

24. Traditional ETL
Hours of delay before taking decisions on latest data.
Unacceptable when time is of the essence [intrusion detection, anomaly detection, etc.]
[Diagram: raw data (10101010) -> file dump (seconds) -> table (hours)]

25. Streaming ETL w/ Structured Streaming
Structured Streaming enables raw data to be available as structured data as soon as possible.
[Diagram: raw data (10101010) -> table (seconds)]

26. Streaming ETL w/ Structured Streaming
Example
    // Json data being received in Kafka
    val rawData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",...)
      .option("subscribe", "topic")
      .load()
    // Parse nested json and flatten it
    val parsedData = rawData
      .selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")
    // Store in structured Parquet table
    // Get end-to-end failure guarantees
    val query = parsedData.writeStream
      .option("checkpointLocation", "/checkpoint")
      .partitionBy("date")
      .format("parquet")
      .start("/parquetTable")

27. Reading from Kafka
    val rawData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",...)
      .option("subscribe", "topic")
      .load()
Specify options to configure
How?
    kafka.bootstrap.servers => broker1,broker2
What?
    subscribe => topic1,topic2,topic3       // fixed list of topics
    subscribePattern => topic*              // dynamic list of topics
    assign => {"topicA":[0,1] }             // specific partitions
Where?
    startingOffsets => latest(default) / earliest / {"topicA":{"0":23,"1":345} }
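The Kafka source expects the broker list plus exactly one of `subscribe`, `subscribePattern`, or `assign`. A small pre-flight check for that rule might look like the sketch below, written in plain Scala with no Spark dependency. The object name `KafkaSourceOptions` and its `validate` method are hypothetical helpers introduced here for illustration; Spark performs its own validation when the query is loaded.

```scala
// Hypothetical helper in plain Scala; Spark itself validates these options at load time.
object KafkaSourceOptions {
  // The three mutually exclusive ways to say what to read.
  private val subscriptionKeys = Set("subscribe", "subscribePattern", "assign")

  // Returns the options unchanged if they look valid, or a description of the problem.
  def validate(opts: Map[String, String]): Either[String, Map[String, String]] = {
    val chosen = opts.keySet.intersect(subscriptionKeys)
    if (!opts.contains("kafka.bootstrap.servers"))
      Left("missing kafka.bootstrap.servers")
    else if (chosen.size != 1)
      Left(s"expected exactly one of ${subscriptionKeys.toSeq.sorted.mkString(", ")}, got ${chosen.size}")
    else
      Right(opts)
  }
}
```

A validated map could then be passed to the reader in one call with `.options(opts)` instead of chaining individual `.option(...)` calls.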

28. Reading from Kafka
    val rawData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",...)
      .option("subscribe", "topic")
      .load()
rawData dataframe has the following columns:
    key       value     topic     partition  offset  timestamp
    [binary]  [binary]  "topicA"  0          345     1486087873
    [binary]  [binary]  "topicB"  3          2890    1486086721

29. Transforming Data
    val parsedData = rawData
      .selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")
Cast binary value to string
Name it column json