Structured Streaming in Apache Spark: Easy, Fault Tolerant and Scalable Stream Processing



1. Structured Streaming in Apache Spark: Easy, Fault Tolerant and Scalable Stream Processing. Juliusz Sompolski, 10th Extremely Large Databases Conference (XLDB), October 11th 2017, Clermont-Ferrand, France.

2. About Databricks. Team: started the Spark project (now Apache Spark) at UC Berkeley in 2009. Mission: making big data simple. Product: the Unified Analytics Platform.

3. About me: software engineer in the new Databricks engineering office in Amsterdam, opened in January 2017 and so far grown to 11 people (and growing!).

4. Building robust stream processing apps is hard.

5. Complexities in stream processing. Complex data: diverse data formats (JSON, Avro, binary, …); data can be dirty, late, out-of-order. Complex workloads: event-time processing; combining streaming with interactive queries and machine learning. Complex systems: diverse storage systems and formats (SQL, NoSQL, Parquet, …); system failures.

6. You should not have to reason about streaming.

7. You should write simple queries, and Spark should continuously update the answer.

8. Structured Streaming: stream processing on the Spark SQL engine. Fast, scalable, fault-tolerant. Rich, unified, high-level APIs that deal with complex data and complex workloads. A rich ecosystem of data sources that integrates with many storage systems.

9. Treat streams as unbounded tables: new data in the data stream = new rows appended to an unbounded input table.

10. Conceptual model (trigger: every 1 sec). Treat the input stream as an input table. Every trigger interval (t = 1, t = 2, t = 3, …), the input table is effectively growing.

11. Conceptual model. If you apply a query to the input table, the result table changes as the input grows. Every trigger interval, we can output the changes in the result.

12. Conceptual model. The full input does not need to be processed at every trigger: Spark does not materialize the full input table.

13. Conceptual model. Spark converts the query to an incremental query that operates only on the new data to generate output.
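To make the unbounded-table model concrete, here is a minimal streaming word count; it is a sketch, and the socket source, host and port are illustrative assumptions rather than part of the talk.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StreamingWordCount").getOrCreate()
import spark.implicits._

// Each new line arriving on the socket becomes a new row of the unbounded input table.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// The query is written exactly as it would be for a static table.
val counts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

// At every trigger, Spark incrementally updates the result table and emits it.
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```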

14. Anatomy of a streaming query: source.

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy($"value".cast("string"))
      .count()
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger("1 minute")
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()

The source specifies one or more locations to read data from. There is built-in support for Files, Kafka and Socket sources, and the interface is pluggable; additional connectors, e.g. Amazon Kinesis, are available on the Databricks platform. Multiple sources can be combined with union().

15. Anatomy of a streaming query: transformation.

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger("1 minute")
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()

Transformations are expressed using DataFrames, Datasets and/or SQL. Catalyst figures out how to execute the transformation incrementally. Internal processing is always exactly-once.

16. Spark automatically streamifies!

    val input = spark.readStream
      .format("json")
      .load("subscribe")
    val result = input
      .select("device", "signal")
      .where("signal > 15")
    result.writeStream
      .format("parquet")
      .start("dest-path")

The pipeline reads from a JSON source, projects the device and signal columns, filters on signal > 15, and writes to a Parquet sink. Spark SQL takes the DataFrame/Dataset/SQL query through a logical plan and an optimized physical plan (optimized operators, code generation, off-heap memory, etc.), and then converts this batch-like query into a series of incremental execution plans, each trigger processing only the new files that have arrived.

17. Anatomy of a streaming query: sink (the same query as on slide 15). The sink accepts the output of each batch. When sinks are transactional, we get exactly-once semantics. Use foreach to execute arbitrary code on the output.

18. Anatomy of a streaming query: output mode and trigger (the same query as on slide 15, now with .outputMode("update")). The output mode defines what is output: Complete outputs the whole answer every time, Update outputs only the changed rows, and Append outputs new rows only. The trigger defines when to output: it is specified as a time interval (eventually data size will also be supported); no trigger means run as fast as possible.
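In the DataStreamWriter API the trigger is typically given as a Trigger object rather than a bare string, and the output mode can be a string or an OutputMode value. A minimal sketch, reusing the counts DataFrame from the word-count sketch above:

```scala
import org.apache.spark.sql.streaming.{OutputMode, Trigger}

counts.writeStream
  .format("console")
  .outputMode(OutputMode.Update())               // emit only the rows that changed in this trigger
  .trigger(Trigger.ProcessingTime("1 minute"))   // fire a micro-batch every minute
  .start()
```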

19. Anatomy of a streaming query: checkpoint, set with .option("checkpointLocation", …) on the same query. The checkpoint tracks the progress of the query in persistent storage and can be used to restart the query if there is a failure.

20. Fault tolerance with checkpointing. Checkpointing stores the metadata (e.g. offsets) of the current batch in a write-ahead log. This is a huge improvement over Spark Streaming checkpoints: offsets are saved as JSON and nothing binary is saved, so the query can be restarted even after an application code change, with end-to-end exactly-once guarantees.
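What restart-from-checkpoint looks like in practice, as a sketch: a simple Kafka-to-Parquet query whose broker, topic and paths are assumptions for illustration. Because the offsets live in the write-ahead log under checkpointLocation, starting the same sink again with the same location resumes where the failed run stopped.

```scala
// A raw pass-through query: Kafka in, Parquet out (default append mode).
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS json", "timestamp")

val query = events.writeStream
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/events")  // offsets recorded here every trigger
  .start("/tables/events")

// After a failure (or an application code change), running the same writeStream
// with the same checkpointLocation resumes from the recorded offsets, preserving
// exactly-once output to the Parquet table.
```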

21. Dataset / DataFrame / SQL: you choose your hammer for whatever nail you have!

SQL, most familiar to BI analysts (supports SQL-2003 and HiveQL):

    spark.sql("SELECT type, sum(signal) FROM devices GROUP BY type")

DataFrames, great for data scientists familiar with Pandas and R dataframes:

    val df: DataFrame = spark.table("devices")
      .groupBy("type")
      .sum("signal")

Datasets, great for data engineers who want compile-time type safety:

    val ds: Dataset[(String, Double)] = spark.table("devices")
      .as[DeviceData]
      .groupByKey(_.type)
      .mapValues(_.signal)
      .reduceGroups(_ + _)

22. Complex Streaming ETL

23. Traditional ETL: raw, dirty, un/semi-structured data is dumped as files, and periodic jobs run every few hours to convert the raw data into structured data ready for further analytics.

24. Traditional ETL: hours of delay before decisions can be taken on the latest data. This is unacceptable when time is of the essence (intrusion detection, anomaly detection, etc.).

25. Streaming ETL with Structured Streaming: raw data becomes available as structured data within seconds, as soon as possible after it arrives.

26. Streaming ETL with Structured Streaming, an example: JSON data is received in Kafka, the nested JSON is parsed and flattened, and the result is stored in a structured Parquet table with end-to-end failure guarantees.

    val rawData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .option("subscribe", "topic")
      .load()

    val parsedData = rawData
      .selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

    val query = parsedData.writeStream
      .option("checkpointLocation", "/checkpoint")
      .partitionBy("date")
      .format("parquet")
      .start("/parquetTable")

27. Reading from Kafka: specify options to configure the source (the rawData query above). How to connect: kafka.bootstrap.servers => broker1,broker2. What to read: subscribe => topic1,topic2,topic3 (a fixed list of topics), subscribePattern => topic* (a dynamic list of topics), or assign => {"topicA":[0,1]} (specific partitions). Where to start: startingOffsets => latest (the default), earliest, or explicit per-partition offsets such as {"topicA":{"0":23,"1":345}}.
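The same options written out as one sketch; the broker addresses and topic pattern are illustrative.

```scala
val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")  // how: where the cluster lives
  .option("subscribePattern", "topic.*")                           // what: a dynamic list of topics
  .option("startingOffsets", "earliest")                           // where: start from the beginning
  .load()
```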

28. Reading from Kafka: the rawData DataFrame has the following columns.

    key      | value    | topic    | partition | offset | timestamp
    [binary] | [binary] | "topicA" | 0         | 345    | 1486087873
    [binary] | [binary] | "topicB" | 3         | 2890   | 1486086721

29. Transforming data: cast the binary value to a string and name the column json.

    val parsedData = rawData
      .selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema).as("data"))
      .select("data.*")

30. Transforming data: parse the json string and expand it into nested columns, naming the struct data. For example, the json values {"timestamp": 1486087873, "device": "devA", …} and {"timestamp": 1486086721, "device": "devX", …} become rows of a nested data column with fields timestamp, device, …

31. Transforming data: flatten the nested columns with select("data.*"), so the nested data struct (timestamp, device, …) becomes top-level, no longer nested, columns timestamp, device, …

32. Transforming data: powerful built-in APIs perform complex data transformations: from_json, to_json, explode and hundreds of other functions (see the Databricks blog post).
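As a small illustration of those built-in functions; the readings array column and the output column names are assumptions, not part of the talk.

```scala
import org.apache.spark.sql.functions.{explode, struct, to_json}
import spark.implicits._

// One output row per element of a (hypothetical) "readings" array column.
val exploded = parsedData.select($"device", explode($"readings").as("reading"))

// Re-serialize selected columns back into a JSON string, e.g. to publish to Kafka.
val asJson = parsedData.select(to_json(struct($"device", $"signal")).as("value"))
```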

33. Writing to Parquet: save the parsed data as a Parquet table at the given path.

    val query = parsedData.writeStream
      .format("parquet")
      .partitionBy("date")
      .option("checkpointLocation", ...)
      .start("/parquetTable")

Partition the files by date so that future queries on time slices of the data are fast, e.g. a query on the last 48 hours of data.
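Such a 48-hour slice query might look like the following batch read; the two-day date filter is an assumption, while the path and partition column come from the example above.

```scala
import org.apache.spark.sql.functions.{current_date, date_sub}
import spark.implicits._

// Ad-hoc batch query over the streaming-updated table: partition pruning on the
// "date" column keeps a query over roughly the last 48 hours fast.
val recent = spark.read
  .parquet("/parquetTable")
  .where($"date" >= date_sub(current_date(), 2))

recent.groupBy($"device").count().show()
```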

34. Fault tolerance: enable checkpointing by setting the checkpoint location (.option("checkpointLocation", ...)) in the query above. start() actually starts a continuously running StreamingQuery in the Spark cluster.

35. Streaming query: the returned query is a handle to the continuously running StreamingQuery, which processes the newly arrived data at every trigger. It is used to monitor and manage the execution.

36. Data consistency on ad-hoc queries: the data is available for complex, ad-hoc analytics within seconds. The Parquet table is updated atomically, which ensures prefix integrity: even if distributed, ad-hoc queries will see either all updates from the streaming query or none. Read more in the blog post: https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html

37. Working With Time

38. Event time: many use cases require aggregate statistics by event time, e.g. what is the number of errors in each system in one-hour windows? There are many challenges: extracting the event time from the data and handling late, out-of-order data. The DStream APIs were insufficient for event-time processing.

39. Event-time aggregations: windowing is just another type of grouping in Structured Streaming, and UDAFs are supported.

    // number of records every hour
    parsedData
      .groupBy(window($"timestamp", "1 hour"))
      .count()

    // average signal strength of each device in 10-minute windows, sliding every 5 minutes
    parsedData
      .groupBy($"device", window($"timestamp", "10 mins", "5 mins"))
      .avg("signal")

40. Stateful processing for aggregations: aggregates have to be saved as distributed state between triggers. Each trigger reads the previous state and writes an updated state. The state is kept in memory, backed by a write-ahead log in HDFS/S3, and state updates are written to the write-ahead log for checkpointing. Fault-tolerant, with exactly-once guarantees!

41. Automatically handling late data: keeping the state around allows late data to update the counts of old windows (in the example, a record for the 12:00 window arriving hours later still updates that window's count). But the size of the state grows indefinitely if old windows are not dropped.

42. Watermarking to limit state: a watermark is a moving threshold of how late data is expected to be and when to drop old state.

    parsedData
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "5 minutes"))
      .count()

43. Watermarking to limit state: the watermark trails behind the maximum event time seen so far by a configurable gap. For example, with a trailing gap of 10 minutes and a maximum event time of 12:30, the watermark is 12:20, and data older than the watermark is not expected.

44. Watermarking to limit state: data newer than the watermark may be late but is still allowed to aggregate; data older than the watermark is considered "too late" and is dropped. Windows older than the watermark are automatically deleted to limit the amount of intermediate state.

45. Watermarking to limit state, worked through on the query from slide 42: the system tracks the maximum observed event time. With a maximum observed event time of 12:14 and an allowed lateness of 10 minutes, the watermark for the next trigger is 12:14 - 10 min = 12:04. A late record with event time 12:08 is still counted, but records older than 12:04 are too late and are ignored, and state for windows before 12:04 is deleted. More details in the blog post.
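A sketch of how the watermark is typically combined with the append output mode and a file sink (the paths are illustrative): each window is written exactly once, after the watermark passes its end, and its state is then dropped.

```scala
import org.apache.spark.sql.functions.window
import spark.implicits._

parsedData
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()
  .writeStream
  .outputMode("append")                         // a window is emitted once it is finalized
  .format("parquet")
  .option("checkpointLocation", "/checkpoints/windowCounts")
  .start("/tables/windowCounts")
```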

46. Clean separation of concerns: query semantics are separated from processing details.

    parsedData
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "5 minutes"))
      .count()
      .writeStream
      .trigger("10 seconds")
      .start()

47. Clean separation of concerns: the groupBy on a window and the count express the query semantics, i.e. how to group data by time, and are the same for batch and streaming.

48. Clean separation of concerns: the watermark is a processing detail, answering how late data can be.

49. Clean separation of concerns: the watermark (how late can data be?) and the trigger (how often to emit updates?) are processing details, kept apart from the query semantics.

50. Arbitrary stateful operations [Spark 2.2]: (flat)mapGroupsWithState allows any user-defined stateful function to be applied to user-defined state, with direct support for per-key timeouts in event time or processing time. Scala and Java are supported.

    ds.groupByKey(_.id)
      .mapGroupsWithState(timeoutConf)(mappingWithStateFunc)

    def mappingWithStateFunc(
        key: K,
        values: Iterator[V],
        state: GroupState[S]): U = {
      // update or remove state
      // set timeouts
      // return mapped value
    }

51. Alerting: monitor a stream using custom stateful logic with timeouts.

    val alerts = stream
      .as[Event]
      .groupByKey(_.id)
      .flatMapGroupsWithState(Append, GST.ProcessingTimeTimeout) {
        (id: Int, events: Iterator[Event], state: GroupState[…]) => ...
      }
      .writeStream
      .queryName("alerts")
      .foreach(new PagerdutySink(credentials))

52. Sessionization: analyze sessions of user/system behavior.

    val sessions = stream
      .as[Event]
      .groupByKey(_.session_id)
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout) {
        (id: Int, events: Iterator[Event], state: GroupState[…]) => ...
      }
      .writeStream
      .parquet("/user/sessions")
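Fleshing that skeleton out, a self-contained sessionization sketch could look like the following; the Event, SessionState and SessionSummary types, the 30-minute timeout and the choice of a processing-time timeout are all illustrative assumptions rather than the speaker's code.

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

// Hypothetical record, state and output types.
case class Event(sessionId: String, timestamp: java.sql.Timestamp, action: String)
case class SessionState(numEvents: Int)
case class SessionSummary(sessionId: String, numEvents: Int)

// events is assumed to be a streaming Dataset[Event] parsed from some source.
val sessionSummaries = events
  .groupByKey(_.sessionId)
  .flatMapGroupsWithState[SessionState, SessionSummary](
      OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout) {
    (sessionId: String, newEvents: Iterator[Event], state: GroupState[SessionState]) =>
      if (state.hasTimedOut) {
        // No events for 30 minutes: the session is over. Emit a summary and drop the state.
        val finished = state.get
        state.remove()
        Iterator(SessionSummary(sessionId, finished.numEvents))
      } else {
        // Fold the new events into the per-session state and push the timeout forward.
        val current = state.getOption.getOrElse(SessionState(0))
        state.update(SessionState(current.numEvents + newEvents.size))
        state.setTimeoutDuration("30 minutes")
        Iterator.empty
      }
  }
```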

53. Sneak peek into the future

54. Stream-stream joins [Spark 2.3]: two streams can be joined together, but the state of such an operation would grow indefinitely…

    val clickStream = spark.readStream
      ...
      .select('clickImpressionId, 'timestamp as "clickTS", ...)

    val impressionsStream = spark.readStream
      ...
      .select('impressionId, 'timestamp as "impressionTS", ...)

    impressionsStream.join(clickStream,
      expr("clickImpressionId = impressionId"))

55. Stream-stream joins [Spark 2.3]: watermarking limits how late the data can come, and the join condition limits how late we expect a click to happen after an impression.

    val clickStream = spark.readStream
      ...
      .select('clickImpressionId, 'timestamp as "clickTS", ...)
      .withWatermark("clickTS", "10 minutes")

    val impressionsStream = spark.readStream
      ...
      .select('impressionId, 'timestamp as "impressionTS", ...)
      .withWatermark("impressionTS", "10 minutes")

    impressionsStream.join(clickStream,
      expr("clickImpressionId = impressionId AND " +
           "clickTS BETWEEN impressionTS AND " +
           "impressionTS + interval 10 minutes"))

56. Stream-stream joins [Spark 2.3]: with watermarking and a join condition limiting when a match can arrive, outer joins become possible (same clickStream and impressionsStream as on slide 55).

    impressionsStream.join(clickStream,
      expr("clickImpressionId = impressionId AND " +
           "clickTS BETWEEN impressionTS AND " +
           "impressionTS + interval 10 minutes"),
      "leftOuter")
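Put together and made self-contained, the join might look like the sketch below; the Kafka topics, the broker address and the way the ID and timestamp columns are derived are assumptions for illustration.

```scala
import org.apache.spark.sql.functions.expr
import spark.implicits._

val impressions = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "impressions")
  .load()
  .select($"value".cast("string").as("impressionId"), $"timestamp".as("impressionTS"))
  .withWatermark("impressionTS", "10 minutes")

val clicks = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "clicks")
  .load()
  .select($"value".cast("string").as("clickImpressionId"), $"timestamp".as("clickTS"))
  .withWatermark("clickTS", "10 minutes")

// Event-time-bounded left outer join: impressions with no click within 10 minutes
// are eventually emitted with NULL click columns.
val joined = impressions.join(
  clicks,
  expr("""
    clickImpressionId = impressionId AND
    clickTS BETWEEN impressionTS AND impressionTS + interval 10 minutes
  """),
  "leftOuter")
```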

57. Continuous processing [Spark 2.3]: a new execution mode that allows fully pipelined execution, i.e. streaming execution without micro-batches, with support for asynchronous checkpointing and ~1 ms latency, and no changes required to user code. Tracked in SPARK-20928.
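A sketch of opting a simple query into this mode: only the trigger changes (the Kafka topics, broker and checkpoint path are assumptions).

```scala
import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "output")
  .option("checkpointLocation", "/checkpoints/continuous")
  .trigger(Trigger.Continuous("1 second"))   // a checkpoint interval, not a micro-batch interval
  .start()
```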

58. More info: the Structured Streaming Programming Guide, http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html, and an anthology of Databricks blog posts and talks about Structured Streaming, https://databricks.com/blog/2017/08/24/anthology-of-technical-assets-on-apache-sparks-structured-streaming.html

59. Try Apache Spark in Databricks! The Unified Analytics Platform offers a collaborative cloud environment and a free version (community edition). The Databricks Runtime provides Apache Spark optimized for the cloud, a caching and optimization layer (DBIO), and enterprise security (DBES). Try it for free today at databricks.com.

60. Spark Summit EU 2017: https://spark-summit.org/eu-2017/ (discount code: Databricks)

61. Pre-Spark Summit, Dublin
