Solving the sessionization problem with Apache Spark batch and streaming processing


1. Solving the sessionization problem with Apache Spark batch and streaming processing. Bartosz Konieczny @waitingforcode 1

2.About me Bartosz Konieczny #dataEngineer #ApacheSparkEnthusiast #AWSuser #waitingforcode.com #becomedataengineer.com #@waitingforcode #github.com/bartosz25 #canalplus #Paris 2

3.3

4. Sessions: "user activity followed by a closing action or a period of inactivity" 4
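That definition can be sketched as a small, Spark-free function that cuts a sorted list of a user's event timestamps into sessions whenever the inactivity gap exceeds a timeout; `sessionize` and its parameters are illustrative names, not code from the talk:

```scala
// Illustrative sketch (not from the talk): a session is consecutive activity
// where each event follows the previous one within `timeoutMs`; a larger gap
// (the "period of inactivity") closes the session and opens a new one.
def sessionize(eventTimesMs: Seq[Long], timeoutMs: Long): Seq[Seq[Long]] =
  eventTimesMs.sorted.foldLeft(List.empty[List[Long]]) {
    case (Nil, t) => List(List(t))
    case (current :: closed, t) if t - current.head <= timeoutMs =>
      (t :: current) :: closed            // gap small enough: same session
    case (sessions, t) => List(t) :: sessions // inactivity gap: new session
  }.map(_.reverse).reverse
```

With a 5-minute timeout, events at 0 ms, 1 s and 10 min fall into two sessions: the 10-minute event arrives after more than 5 minutes of inactivity.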

5.© https://pixabay.com/users/maxmann-665103/ from https://pixabay.com 5

6. Batch architecture

[diagram] An orchestrator <triggers> the sessions generator. The data producer writes input logs to a streaming broker; a raw sync consumer copies them into the input logs window (DFS). The sessions generator joins the previous output sessions (DFS) with that window and writes the new sessions (DFS).

6

7. Streaming architecture

[diagram] The data producer writes input logs to a streaming broker; the sessions generator consumes them directly and writes the output sessions (DFS). The generator <uses> a checkpoint location holding metadata and state.

7

8.Batch implementation

9. The code

val previousSessions = loadPreviousWindowSessions(sparkSession, previousSessionsDir)
val sessionsInWindow = sparkSession.read.schema(Visit.Schema)
  .json(inputDir)
val joinedData = previousSessions.join(sessionsInWindow,
    sessionsInWindow("user_id") === previousSessions("userId"), "fullouter")
  .groupByKey(log => SessionGeneration.resolveGroupByKey(log))
  .flatMapGroups(SessionGeneration.generate(TimeUnit.MINUTES.toMillis(5), windowUpperBound)).cache()

joinedData.filter("isActive = true").write.mode(SaveMode.Overwrite).json(outputDir)
joinedData.filter(state => !state.isActive)
  .flatMap(state => state.toSessionOutputState)
  .coalesce(50).write.mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .json(outputDir)

9

10. Full outer join

val previousSessions = loadPreviousWindowSessions(sparkSession, previousSessionsDir)  // previous window active sessions
val sessionsInWindow = sparkSession.read.schema(Visit.Schema)
  .json(inputDir)                                                                     // new input logs
val joinedData = previousSessions.join(sessionsInWindow,
    sessionsInWindow("user_id") === previousSessions("userId"), "fullouter")          // full outer join
  .groupByKey(log => SessionGeneration.resolveGroupByKey(log))                        // processing logic
  .flatMapGroups(SessionGeneration.generate(TimeUnit.MINUTES.toMillis(5), windowUpperBound))

joinedData.filter("isActive = true").write.mode(SaveMode.Overwrite).json(outputDir)
joinedData.filter(state => !state.isActive)
  .flatMap(state => state.toSessionOutputState)
  .coalesce(50).write.mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .json(outputDir)

10
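Why "fullouter"? A user id can sit only in the previous window's sessions (candidate for expiration), only in the new logs (brand-new session), or in both (continuation), and all three cases must survive the join. A Spark-free sketch over in-memory maps, with illustrative names:

```scala
// Hypothetical stand-in for the DataFrame full outer join: every key from
// either side survives, with None marking the missing side.
def fullOuterJoin[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (Option[A], Option[B])] =
  (left.keySet ++ right.keySet).map(k => k -> (left.get(k), right.get(k))).toMap
```

An inner join would silently drop both the expiring sessions and the first-time users.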

11. Watermark simulation

case class SessionIntermediaryState(userId: Long, … expirationTimeMillisUtc: Long, isActive: Boolean)

val previousSessions = loadPreviousWindowSessions(sparkSession, previousSessionsDir)
val sessionsInWindow = sparkSession.read.schema(Visit.Schema)
  .json(inputDir)
val joinedData = previousSessions.join(sessionsInWindow,
    sessionsInWindow("user_id") === previousSessions("userId"), "fullouter")
  .groupByKey(log => SessionGeneration.resolveGroupByKey(log))
  .flatMapGroups(SessionGeneration.generate(TimeUnit.MINUTES.toMillis(5), windowUpperBound))

joinedData.filter("isActive = true").write.mode(SaveMode.Overwrite).json(outputDir)
joinedData.filter(state => !state.isActive)
  .flatMap(state => state.toSessionOutputState)
  .coalesce(50).write.mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .json(outputDir)

11

12. Save modes

SaveMode.Append ⇒ duplicates & invalid results (e.g. multiplied revenue!)
SaveMode.ErrorIfExists ⇒ failures & maintenance burden
SaveMode.Ignore ⇒ no data & old data present in case of reprocessing
SaveMode.Overwrite ⇒ always fresh data & easy maintenance

val previousSessions = loadPreviousWindowSessions(sparkSession, previousSessionsDir)
val sessionsInWindow = sparkSession.read.schema(Visit.Schema)
  .json(inputDir)
val joinedData = previousSessions.join(sessionsInWindow,
    sessionsInWindow("user_id") === previousSessions("userId"), "fullouter")
  .groupByKey(log => SessionGeneration.resolveGroupByKey(log))
  .flatMapGroups(SessionGeneration.generate(TimeUnit.MINUTES.toMillis(5), windowUpperBound))

joinedData.filter("isActive = true").write.mode(SaveMode.Overwrite).json(outputDir)
joinedData.filter(state => !state.isActive)
  .flatMap(state => state.toSessionOutputState)
  .coalesce(50).write.mode(SaveMode.Overwrite)
  .option("compression", "gzip")
  .json(outputDir)

12
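The four trade-offs above can be made concrete with a toy model of what each mode does when the job re-runs over an output that already exists. This is an illustration of the listed behaviours, not Spark's DataFrameWriter itself:

```scala
// Toy model (not the Spark API): what each save mode leaves in the output
// directory when a reprocessing run writes over an existing output.
sealed trait Mode
case object Append extends Mode        // duplicates on reprocessing
case object ErrorIfExists extends Mode // fails on reprocessing
case object Ignore extends Mode        // silently keeps stale data
case object Overwrite extends Mode     // idempotent: always fresh data

def rewrite(existing: Option[Seq[String]], fresh: Seq[String], mode: Mode): Seq[String] =
  (existing, mode) match {
    case (Some(old), Append)      => old ++ fresh
    case (Some(_), ErrorIfExists) => throw new IllegalStateException("output exists")
    case (Some(old), Ignore)      => old
    case (_, Overwrite)           => fresh
    case (None, _)                => fresh
  }
```

Only Overwrite makes a reprocessing run idempotent, which is why the batch job above uses SaveMode.Overwrite.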

13.Streaming implementation

14. The code

val dataFrame = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaConfiguration.broker).option(...)
  .load()
val query = dataFrame.selectExpr("CAST(value AS STRING)")
  .select(functions.from_json($"value", Visit.Schema).as("data"))
  .select($"data.*").withWatermark("event_time", "3 minutes")  // watermark - late events & state expiration
  .groupByKey(row => row.getAs[Long]("user_id"))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())    // stateful processing - sessions generation
  (mapStreamingLogsToSessions(sessionTimeout))
val writeQuery = query.writeStream.outputMode(OutputMode.Update())
  .option("checkpointLocation", s"s3://my-checkpoint-bucket")  // checkpoint - fault-tolerance
  .foreachBatch((dataset: Dataset[SessionIntermediaryState], batchId: Long) => {
    BatchWriter.writeDataset(dataset, s"${outputDir}/${batchId}")
  })

14

15. Checkpoint - fault-tolerance

val writeQuery = query.writeStream.outputMode(OutputMode.Update())
  .option("checkpointLocation", s"s3://sessionization-demo/checkpoint")
  .foreachBatch((dataset: Dataset[SessionIntermediaryState], batchId: Long) => {
    BatchWriter.writeDataset(dataset, s"${outputDir}/${batchId}")
  })
  .start()

[diagram] For the t1 query: load the offsets to process & the state written for the t0 query, process the data, then write the processed offsets and state for the t1 query. The checkpoint location holds the state store, the offset log and the commit log.

15
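The offset log and commit log together give the restart rule: a micro-batch whose offsets were written but never committed was interrupted mid-flight and must be re-executed. A minimal sketch of that decision, with illustrative names rather than Spark internals:

```scala
// Hypothetical sketch: batch ids present in the offset log but absent from
// the commit log were started but never finished, so a restarted query
// replays exactly those batches before moving on.
def batchesToReprocess(offsetLog: Set[Long], commitLog: Set[Long]): Set[Long] =
  offsetLog.diff(commitLog)
```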

16. Checkpoint - fault-tolerance

[diagram] Still for the t1 query: after loading the offsets to process & the state, processing the data and writing them for the t1 query, the engine confirms the processed offsets & commits the state, then computes the next watermark and starts the t2 query. The state store is partition-based; state store, offset log and commit log all live in the checkpoint location.

16

17. Stateful processing

def mapStreamingLogsToSessions(timeoutDurationMs: Long)(key: Long, logs: Iterator[Row],
    currentState: GroupState[SessionIntermediaryState]): SessionIntermediaryState = {
  if (currentState.hasTimedOut) {
    val expiredState = currentState.get.expire
    currentState.remove()
    expiredState
  } else {
    val newState = currentState.getOption.map(state => state.updateWithNewLogs(logs, timeoutDurationMs))
      .getOrElse(SessionIntermediaryState.createNew(logs, timeoutDurationMs))
    currentState.update(newState)
    currentState.setTimeoutTimestamp(currentState.getCurrentWatermarkMs() + timeoutDurationMs)
    currentState.get
  }
}

17
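The two branches of mapStreamingLogsToSessions can be mirrored by a pure state machine: on timeout the session is expired and its state dropped, otherwise the state is created or extended and its timeout pushed past the current watermark. The types below are simplified stand-ins, not Spark's GroupState API:

```scala
// Simplified stand-in for SessionIntermediaryState and the GroupState calls.
case class Session(eventCount: Long, timeoutAtMs: Long, isActive: Boolean = true) {
  def expire: Session = copy(isActive = false)
}

// One invocation of the stateful function for a single user key.
def step(state: Option[Session], newEvents: Long, watermarkMs: Long,
         timeoutMs: Long, hasTimedOut: Boolean): Session =
  if (hasTimedOut) state.get.expire // no new logs: emit the closed session
  else {
    val updated = state
      .map(s => s.copy(eventCount = s.eventCount + newEvents)) // updateWithNewLogs
      .getOrElse(Session(newEvents, 0L))                       // createNew
    // like setTimeoutTimestamp(getCurrentWatermarkMs() + timeoutDurationMs)
    updated.copy(timeoutAtMs = watermarkMs + timeoutMs)
  }
```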

18. Stateful processing

[diagram] .mapGroupsWithState(...) reads and writes the state store with get, put, remove and update. The store keeps the most recent versions in memory as a TreeMap[Long, ConcurrentHashMap[UnsafeRow, UnsafeRow]] and, in the checkpoint location, writes updates to delta files (1.delta, 2.delta), finalizes them, makes snapshots (3.snapshot) and recovers state from them.

18

19. Watermark

val sessionTimeout = TimeUnit.MINUTES.toMillis(5)
val query = dataFrame.selectExpr("CAST(value AS STRING)")
  .select(functions.from_json($"value", Visit.Schema).as("data"))
  .select($"data.*")
  .withWatermark("event_time", "3 minutes")
  .groupByKey(row => row.getAs[Long]("user_id"))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())
  (Mapping.mapStreamingLogsToSessions(sessionTimeout))

19
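The "3 minutes" above is the allowed lateness: the watermark trails the maximum event time seen so far by that delay, and events stamped before the watermark count as late. A sketch of that arithmetic (illustrative names, times in milliseconds):

```scala
// watermark = max event time observed so far - allowed delay
def watermarkMs(maxEventTimeMs: Long, delayMs: Long): Long =
  maxEventTimeMs - delayMs

// an event is late when its timestamp is behind the current watermark
def isLate(eventTimeMs: Long, currentWatermarkMs: Long): Boolean =
  eventTimeMs < currentWatermarkMs
```

With a 3-minute delay and a max event time of 10:00, the watermark sits at 09:57, so an event stamped 09:55 is late.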

20. Watermark - late events

[diagram: an on-time event and a late event (behind the watermark) arriving at .mapGroupsWithState(...)]

20

21. Watermark - expired state

State representation [simplified]: {value, TTL configuration}

Algorithm:
1. Update all states with new data → eventually extend TTL
2. Retrieve TTL configuration for the query → here: watermark
3. Retrieve all states that expired → no new data in this query & TTL expired
4. Call mapGroupsWithState on it with hasTimedOut param = true & no new data (Iterator.empty)

// full implementation: org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.InputProcessor

21
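Step 3 of the algorithm can be sketched against a toy state store keyed by user id, with the watermark as the TTL reference; names are illustrative, not Spark's InputProcessor:

```scala
// A state expires when it got no new data in this micro-batch AND its
// timeout timestamp fell behind the watermark. For each returned key,
// step 4 would call the user function with hasTimedOut = true and an
// empty iterator of new logs.
def expiredKeys(timeoutAtMs: Map[Long, Long], updatedKeys: Set[Long],
                watermarkMs: Long): Set[Long] =
  timeoutAtMs.collect {
    case (key, ttl) if !updatedKeys.contains(key) && ttl < watermarkMs => key
  }.toSet
```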

22.Data reprocessing

23.Batch

24.reschedule your job © https://pics.me.me/just-one-click-and-the-zoo-is-mine-8769663.png

25.Streaming

26.

27. State store

1. Restored state is the most recent snapshot: 1.delta, 2.delta, 3.snapshot
2. Restored state is not the most recent snapshot but a snapshot exists: 1.delta, 2.delta, 3.snapshot, 4.delta
3. Restored state is not the most recent snapshot and a snapshot doesn't exist: 1.delta, 2.delta, 3.delta, 4.delta

27
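The three scenarios above share one rule: restore from the latest snapshot at or below the wanted version, then replay every later delta file. A sketch of that file selection, using the slide's <n>.delta / <n>.snapshot naming (the function itself is illustrative):

```scala
// Pick the files needed to rebuild state version `version`: the newest
// snapshot not newer than it (if any), then every delta file after it.
def filesToRestore(version: Long, snapshotVersions: Set[Long]): Seq[String] = {
  val base = snapshotVersions.filter(_ <= version).maxOption
  base.map(v => s"$v.snapshot").toSeq ++
    ((base.getOrElse(0L) + 1L) to version).map(v => s"$v.delta")
}
```

Scenario 1 loads just 3.snapshot, scenario 2 loads 3.snapshot plus 4.delta, and scenario 3 replays every delta from the start.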

28. State store configuration spark.sql.streaming.stateStore: → .minDeltasForSnapshot → .maintenanceInterval spark.sql.streaming: → .maxBatchesToRetainInMemory 28

29. Checkpoint configuration spark.sql.streaming.minBatchesToRetain 29