The Internals of Stateful Stream Processing in Spark Structured Streaming

Let’s talk about state management in Spark Structured Streaming. In this talk you will learn the streaming concepts that are particularly relevant for stateful stream processing in Structured Streaming, e.g. watermarks and output modes, but also GroupState and GroupStateTimeout. We will explore simple stateful processing (with the groupBy operator) and more advanced use cases with KeyValueGroupedDataset.mapGroupsWithState and the most advanced KeyValueGroupedDataset.flatMapGroupsWithState operator. In other words, you will learn how to use the stateful streaming API and understand its internals.


1. The Internals of Stateful Stream Processing in Spark Structured Streaming
   © Jacek Laskowski / @JacekLaskowski / jacek@japila.pl

2. Jacek Laskowski
   ● Freelance IT consultant
   ● Specializing in Spark, Kafka, Kafka Streams, Scala
   ● Development | Consulting | Training | Speaking
   ● "The Internals Of" online books
   ● One of the contributors to Apache Spark
   ● One of the Confluent Community Catalysts (Class of 2019-2020)
   ● Contact me at jacek@japila.pl
   ● Follow @JacekLaskowski on Twitter for more #ApacheSpark #ApacheKafka #KafkaStreams

3. Friendly reminder
   Pictures... take a lot of pictures! 📷

4. Why Should You Care?
   1. In case of trouble in production, everything counts 😎

5. Stateful Stream Processing
   1. Stateful stream processing is stream processing with state
   2. State is simply a collection of keys and their current values
   3. State can be explicit (available to a developer) or implicit (internal)
   4. In Spark Structured Streaming, a streaming query is stateful when it is one of the following:
      a. Streaming Aggregation
      b. Arbitrary Stateful Streaming Aggregation
      c. Stream-Stream Join
      d. Streaming Deduplication
      e. Streaming Limit
   5. Read up on Stateful Stream Processing in the online book The Internals of Spark Structured Streaming
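To make "state is a collection of keys and their current values" concrete, here is a minimal Scala sketch that does not use Spark at all. It assumes the state is a running count per key and models each micro-batch as a plain Seq of keys; the RunningCounts object and its methods are hypothetical names for illustration, not part of Spark's API.

```scala
// Toy model, not Spark code: the state is a map from each key to its current
// value (here, a running count), carried from one micro-batch to the next.
object RunningCounts {
  type State = Map[String, Long]

  // Fold one micro-batch of keys into the previous state.
  def step(state: State, batch: Seq[String]): State =
    batch.foldLeft(state) { (acc, key) =>
      acc.updated(key, acc.getOrElse(key, 0L) + 1L)
    }

  // Process micro-batches in order, starting from empty state.
  def run(batches: Seq[Seq[String]]): State =
    batches.foldLeft(Map.empty[String, Long])(step)
}
```

With this sketch, RunningCounts.run(Seq(Seq("a", "b", "a"), Seq("b", "c"))) ends with counts a = 2, b = 2, c = 1: the second batch can only produce b = 2 because the first batch's counts were kept as state.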

6. StateStore
   1. StateStore is the abstraction of key-value stores for managing state in Stateful Stream Processing
      a. abort
      b. commit
      c. get
      d. getRange
      e. id
      f. iterator
      g. metrics
      h. put
      i. remove
      j. version
   2. Identified by operator and partition IDs
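The following is a toy, in-memory Scala sketch of a few operations from the StateStore contract (get, put, remove, iterator, commit, abort, version). ToyStateStore is a hypothetical class, not Spark's StateStore; it assumes String keys and Long values. Spark's commit returns the new version; this toy's commit also returns the resulting map, because there is no provider here to keep it.

```scala
// Toy, in-memory stand-in for a few StateStore operations (NOT Spark's class).
// One instance works on top of one committed version of the state.
class ToyStateStore(val version: Long, base: Map[String, Long]) {
  private var working: Map[String, Long] = base // uncommitted view
  private var finished = false                  // set by commit() or abort()

  def get(key: String): Option[Long] = working.get(key)
  def iterator: Iterator[(String, Long)] = working.iterator

  def put(key: String, value: Long): Unit = {
    ensureOpen()
    working = working.updated(key, value)
  }

  def remove(key: String): Unit = {
    ensureOpen()
    working = working - key
  }

  // Makes the updates permanent: returns the new version and its contents.
  def commit(): (Long, Map[String, Long]) = {
    ensureOpen()
    finished = true
    (version + 1, working)
  }

  // Discards all uncommitted updates.
  def abort(): Unit = {
    finished = true
    working = base
  }

  private def ensureOpen(): Unit =
    require(!finished, s"store for version $version was already committed or aborted")
}
```

A caller would keep the returned map and open the next ToyStateStore on the new version, loosely mirroring how a store is requested for a given version.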

7. HDFSBackedStateStore
   1. HDFSBackedStateStore is a concrete StateStore that uses a Hadoop DFS-compatible file system for versioned state persistence
   2. The default and only known implementation of StateStore
   3. Created when the StateStore utility is requested to retrieve the StateStore for a given ID and version (via HDFSBackedStateStoreProvider)

8. StateStoreProvider
   1. StateStoreProvider is the abstraction of StateStore providers that manage StateStores in Stateful Stream Processing
      a. getStore(version: Long): StateStore
      b. init
      c. stateStoreId
      d. supportedCustomMetrics
   2. spark.sql.streaming.stateStore.providerClass internal configuration property
      a. Fully-qualified class name of a StateStoreProvider
      b. Default: HDFSBackedStateStoreProvider
   3. Identified by operator and partition IDs
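Since spark.sql.streaming.stateStore.providerClass is a Spark SQL configuration property (an internal one), it can be set like any other. The Scala snippet below assumes Spark is on the classpath (e.g. in spark-shell) and simply sets the property to its default value, the fully-qualified name of HDFSBackedStateStoreProvider; a custom StateStoreProvider class name would go in the same place. The application name and local master are arbitrary choices for the sketch.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: sets the (internal) state store provider property explicitly.
// The value below is the default provider class.
val spark = SparkSession.builder()
  .appName("state-store-provider-demo") // arbitrary name
  .master("local[*]")                   // local mode, just for the sketch
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider")
  .getOrCreate()

// Prints the configured provider class name.
println(spark.conf.get("spark.sql.streaming.stateStore.providerClass"))
spark.stop()
```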

9. HDFSBackedStateStoreProvider
   1. HDFSBackedStateStoreProvider is a concrete StateStoreProvider that uses a Hadoop DFS-compatible file system for versioned state checkpointing
   2. The default and only known implementation of StateStoreProvider
      a. See the spark.sql.streaming.stateStore.providerClass internal configuration property
   3. HDFSBackedStateStoreProvider uses HDFSBackedStateStores to manage state (one per state version)
   4. Manages versioned, compressed state in delta and snapshot files
      a. Uses a cache internally for faster access to state versions
      b. Periodically “compresses” delta files into snapshots
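The delta-plus-snapshot idea can be sketched as a toy Scala model that keeps everything in memory instead of a Hadoop DFS-compatible file system. It assumes a delta records puts (Some(value)) and removals (None) for one version, that a full snapshot is taken every snapshotEvery versions, and that loading a version replays the deltas after the newest snapshot at or below it. ToyVersionedState and its members are hypothetical names; this is not HDFSBackedStateStoreProvider's actual code or file format.

```scala
import scala.collection.mutable

// Toy model (NOT Spark's implementation or file format) of versioned state
// kept as one delta per version plus periodic full snapshots, all in memory.
// A delta maps each changed key to Some(newValue) for a put or None for a removal.
class ToyVersionedState(snapshotEvery: Int) {
  require(snapshotEvery > 0, "snapshotEvery must be positive")

  private val deltas = mutable.Map.empty[Long, Map[String, Option[Long]]]
  private val snapshots = mutable.Map[Long, Map[String, Long]](0L -> Map.empty[String, Long])
  private val cache = mutable.Map.empty[Long, Map[String, Long]] // already-loaded versions
  private var latest = 0L

  def latestVersion: Long = latest
  def snapshotVersions: Set[Long] = snapshots.keySet.toSet

  // Records a delta on top of the latest version and returns the new version.
  def commit(delta: Map[String, Option[Long]]): Long = {
    latest += 1
    deltas(latest) = delta
    // Periodically "compress" the deltas so far into a full snapshot.
    if (latest % snapshotEvery == 0) snapshots(latest) = load(latest)
    latest
  }

  // Rebuilds a version from the newest snapshot at or below it plus the later deltas.
  def load(version: Long): Map[String, Long] = {
    require(version >= 0 && version <= latest, s"unknown version $version")
    cache.getOrElseUpdate(version, {
      val base = snapshots.keys.filter(_ <= version).max
      ((base + 1) to version).foldLeft(snapshots(base)) { (state, v) =>
        deltas(v).foldLeft(state) {
          case (s, (key, Some(value))) => s.updated(key, value)
          case (s, (key, None))        => s - key
        }
      }
    })
  }
}
```

The cache here is unbounded and snapshots are taken synchronously on commit; both are simplifications of the toy, not a description of Spark's behavior.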

10. StateStoreCoordinator
   1. StateStoreCoordinator keeps track of state stores on Spark executors (per host and executor ID)
   2. A ThreadSafeRpcEndpoint (RPC endpoint) that handles messages such as:
      a. ReportActiveInstance
      b. GetLocation
      c. DeactivateInstances
   3. Used by StateStoreRDD for the location preferences of partitions (based on the location of the stores)
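Here is a toy Scala model of the bookkeeping a state store coordinator does: remembering which executor last loaded the state store for an operator and partition of a given query run. Message handling is reduced to plain method calls named after ReportActiveInstance, GetLocation and DeactivateInstances. ToyStoreId, ToyCoordinator, the runId field and the "host/executorId" location string are hypothetical simplifications, not Spark's RPC protocol.

```scala
import scala.collection.mutable

// Hypothetical identifier for one state store: query run, operator, partition.
case class ToyStoreId(runId: String, operatorId: Long, partitionId: Int)

// Toy model of the coordinator's bookkeeping (NOT Spark's RPC endpoint).
class ToyCoordinator {
  // store id -> "host/executorId" of the executor that last reported it
  private val instances = mutable.HashMap.empty[ToyStoreId, String]

  // Like ReportActiveInstance: an executor reports that it has loaded this store.
  def reportActiveInstance(id: ToyStoreId, host: String, executorId: String): Unit =
    instances(id) = s"$host/$executorId"

  // Like GetLocation: the last known location of this store, if any.
  def getLocation(id: ToyStoreId): Option[String] = instances.get(id)

  // Like DeactivateInstances: forget all stores that belong to a query run.
  def deactivateInstances(runId: String): Unit = {
    val stale = instances.keys.filter(_.runId == runId).toList
    instances --= stale
  }

  // What a StateStoreRDD-like consumer would use as preferred locations.
  def preferredLocations(id: ToyStoreId): Seq[String] = getLocation(id).toList
}
```

Scheduling a partition's task where its store was last loaded lets the executor reuse state it already has in memory instead of reloading it.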

11. StateStoreRDD
   1. StateStoreRDD is an RDD that represents (part of) a stateful streaming query
      a. Actually, a single micro-batch
   2. Executes storeUpdateFunction with the StateStore (identified by the stateful physical operator and partition IDs) and the data of a partition. Used by:
      a. FlatMapGroupsWithStateExec
      b. StateStoreRestoreExec
      c. StateStoreSaveExec
      d. StreamingDeduplicateExec
      e. StreamingGlobalLimitExec
   3. Use StreamingQuery.explain to see the stateful physical operators of a query
   4. Uses StateStoreCoordinator for the preferred locations of a partition for job scheduling
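The following toy Scala sketch models one micro-batch of a StateStoreRDD-like computation without Spark: each partition's rows are handed, together with that partition's state, to a storeUpdateFunction that returns the updated state and the output rows. ToyStateStoreRDD, runMicroBatch and countUpdate are hypothetical names; the state is a plain immutable Map per partition ID, and versioning, commits and preferred locations are left out.

```scala
// Toy model of one micro-batch of a StateStoreRDD-like computation (NOT Spark code).
object ToyStateStoreRDD {
  type PartitionState = Map[String, Long]
  // (this partition's state, this partition's rows) => (new state, output rows)
  type StoreUpdateFunction =
    (PartitionState, Seq[String]) => (PartitionState, Seq[(String, Long)])

  // Applies f to every partition of the batch, paired with that partition's state.
  def runMicroBatch(
      states: Map[Int, PartitionState],   // partition ID -> state from the previous batch
      partitions: Map[Int, Seq[String]], // partition ID -> rows of this batch
      f: StoreUpdateFunction
  ): (Map[Int, PartitionState], Seq[(String, Long)]) = {
    val results = partitions.toSeq.sortBy(_._1).map { case (pid, rows) =>
      val (newState, output) = f(states.getOrElse(pid, Map.empty[String, Long]), rows)
      (pid -> newState, output)
    }
    (states ++ results.map(_._1), results.flatMap(_._2))
  }

  // Example storeUpdateFunction: running count per key, emitting the updated counts.
  val countUpdate: StoreUpdateFunction = (state, rows) => {
    val newState = rows.foldLeft(state)((s, k) => s.updated(k, s.getOrElse(k, 0L) + 1L))
    (newState, rows.distinct.map(k => k -> newState(k)))
  }
}
```

Partitions without new rows keep their previous state untouched, which is why the result merges the updated states into the old ones.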

12. Streaming Aggregation (1 of 2)
   1. In Spark Structured Streaming, a streaming aggregation is a streaming query that was described (built) using the following high-level streaming operators:
      a. Dataset.groupBy, Dataset.rollup, Dataset.cube (RelationalGroupedDataset)
      b. Dataset.groupByKey (KeyValueGroupedDataset)
      c. SQL’s GROUP BY clause (including WITH CUBE and WITH ROLLUP)
   2. High-level operators create a logical plan with one or more Aggregate logical operators
      a. Similarly to good ol’ aggregations in Spark SQL
   3. IncrementalExecution uses the StatefulAggregationStrategy execution planning strategy for planning streaming aggregations
      a. StateStoreRestoreExec and StateStoreSaveExec physical operators
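Here is a toy Scala model (no Spark involved) of what a streaming aggregation has to do on every micro-batch, loosely following the StateStoreRestoreExec and StateStoreSaveExec roles: aggregate the new rows, merge them with the aggregates restored from state, and save the result back. It assumes a running sum per String key and emits only the keys updated in the batch, similar in spirit to the update output mode. ToyStreamingAggregation is a hypothetical name, and watermark-based eviction of old keys is not modeled.

```scala
// Toy model (NOT Spark code) of a streaming aggregation across micro-batches,
// loosely following the restore -> aggregate -> save steps.
object ToyStreamingAggregation {
  type State = Map[String, Long] // key -> running sum

  // Returns the new state and the rows an "update"-style sink would receive.
  def microBatch(state: State, batch: Seq[(String, Long)]): (State, Map[String, Long]) = {
    // 1. Partial aggregation of this micro-batch only.
    val partial: Map[String, Long] =
      batch.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2).sum }
    // 2. "Restore": merge with the previous aggregate for each key, if any.
    val merged = partial.map { case (k, v) => k -> (state.getOrElse(k, 0L) + v) }
    // 3. "Save": write the merged aggregates back into the state.
    (state ++ merged, merged)
  }
}
```

Keys that do not appear in a batch stay in the state unchanged and are not emitted for that batch.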

13. Streaming Aggregation (2 of 2)
   👉 IncrementalExecution: QueryExecution of Streaming Queries

14. Stream-Stream Join (1 of 2)
   1. In Spark Structured Streaming, a streaming join is a streaming query that was described (built) using the following high-level streaming operators:
      a. Dataset.join
      b. SQL’s JOIN clause
   2. High-level operators create a logical plan with one or more Join logical operators
      a. Similarly to good ol’ joins in Spark SQL
   3. IncrementalExecution uses the StreamingJoinStrategy execution planning strategy for planning streaming joins
      a. StreamingSymmetricHashJoinExec physical operator
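The idea behind a symmetric hash join can be sketched in Scala without Spark: each side buffers its rows in state keyed by the join key, and every new row probes the other side's state. This toy handles an inner equi-join only, processes the left input before the right one within a micro-batch so that each matching pair is emitted exactly once, and never evicts state (Spark can drop old rows when watermarks and event-time constraints are defined). ToySymmetricHashJoin is a hypothetical class, not StreamingSymmetricHashJoinExec.

```scala
import scala.collection.mutable

// Toy model (NOT Spark code) of a symmetric hash join for an inner
// stream-stream equi-join: both sides keep their past rows in state,
// keyed by the join key, and new rows probe the other side's state.
class ToySymmetricHashJoin[K, L, R] {
  private val leftState = mutable.HashMap.empty[K, Vector[L]]
  private val rightState = mutable.HashMap.empty[K, Vector[R]]

  def microBatch(left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(K, L, R)] = {
    val out = Vector.newBuilder[(K, L, R)]
    // Left input first: probe right state (earlier batches only), then buffer the row.
    for ((k, l) <- left) {
      rightState.getOrElse(k, Vector.empty).foreach(r => out += ((k, l, r)))
      leftState(k) = leftState.getOrElse(k, Vector.empty) :+ l
    }
    // Right input next: probe left state (now including this batch), then buffer the row.
    for ((k, r) <- right) {
      leftState.getOrElse(k, Vector.empty).foreach(l => out += ((k, l, r)))
      rightState(k) = rightState.getOrElse(k, Vector.empty) :+ r
    }
    out.result()
  }
}
```

A row arriving in one micro-batch can still match a row from the other stream that arrives in a later micro-batch, which is exactly why both sides must be kept as state.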

15. Stream-Stream Join (2 of 2)

16. “The Internals Of” Online Books
   1. The Internals of Spark SQL
   2. The Internals of Spark Structured Streaming
   3. The Internals of Apache Spark

17. Questions?
   1. Follow @jaceklaskowski on Twitter (DMs open)
   2. Upvote my questions and answers on Stack Overflow
   3. Contact me at jacek@japila.pl
   4. Connect with me on LinkedIn