Productizing Structured Streaming Jobs

“Structured Streaming is the streaming API introduced in Spark 2.0 over two years ago, and was announced GA as of Spark 2.2. Databricks customers have processed over a hundred trillion rows in production using Structured Streaming. We have received dozens of questions on how best to develop, monitor, test, deploy, and upgrade these jobs. In this talk, we aim to share best practices around what has worked and what hasn’t across our customer base. We will tackle questions around how to plan ahead, what kinds of code changes are safe for Structured Streaming jobs, how to architect streaming pipelines that give you the most flexibility without sacrificing performance by using tools like Databricks Delta, how best to monitor your streaming jobs and alert if your streams are falling behind or are actually failing, and how best to test your code.”

1. Productizing Structured Streaming Jobs – Burak Yavuz, April 24, 2019 – SAIS 2019 San Francisco

2. Who am I
● Software Engineer – Databricks: “We make your streams come true”
● Apache Spark Committer
● MS in Management Science & Engineering – Stanford University
● BS in Mechanical Engineering – Bogazici University, Istanbul

3. Writing code is fun… is that all we do?

4. [Figure: software development life cycle diagram] Image from: https://www.smartsheet.com/sites/default/files/IC-Software-Development-Life-Cycle.jpg

5. Let’s look at the operational aspects of data pipelines

6. Agenda: how to test, monitor, deploy, and update Structured Streaming jobs

7. Structured Streaming
Stream processing on the Spark SQL engine
• Fast, scalable, fault-tolerant
• Rich, unified, high-level APIs to deal with complex data and complex workloads
• Rich ecosystem of data sources to integrate with many storage systems

8. Structured Streaming @ Databricks
• 1000s of customer streaming apps in production
• 1000+ trillion rows processed in production

9. Streaming word count

10. Anatomy of a Streaming Query: Source

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy($"value".cast("string"))
      .count()
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()

Source:
• Specify one or more locations to read data from
• Built-in support for Files/Kafka/Socket; pluggable
• Can include multiple sources of different types using union() (see the sketch below)
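Since the slides only show a single Kafka source, here is a minimal sketch of the union() point above; the bootstrap server, topic, and file path are hypothetical, and note that streaming file sources require an explicit schema:

    import org.apache.spark.sql.types.StructType

    // Kafka source, normalized to a single string column
    val kafkaEvents = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092") // hypothetical
      .option("subscribe", "input")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // File source with the same shape
    val fileEvents = spark.readStream
      .schema(new StructType().add("value", "string"))
      .json("/data/events") // hypothetical path

    // Two sources of different types combined into one streaming DataFrame
    val allEvents = kafkaEvents.union(fileEvents)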

11. Anatomy of a Streaming Query: Transformation

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()

Transformation:
• Using DataFrames, Datasets, and/or SQL
• Catalyst figures out how to execute the transformation incrementally
• Internal processing is always exactly-once

12. Anatomy of a Streaming Query: Sink

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "…")
      .start()

Sink:
• Accepts the output of each batch
• Where supported, sinks are transactional and exactly-once (e.g. Files)
• Use foreach to execute arbitrary code (see the sketch below)
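A minimal sketch of the foreach escape hatch mentioned above; `counts` stands for the aggregated streaming DataFrame from the query, and the println body is a placeholder for a real side effect (HTTP call, JDBC write, etc.):

    import org.apache.spark.sql.{ForeachWriter, Row}

    val query = counts.writeStream
      .outputMode("update")
      .foreach(new ForeachWriter[Row] {
        // open a connection per partition/epoch; return false to skip this partition
        def open(partitionId: Long, epochId: Long): Boolean = true
        // called once per output row; replace println with your side effect
        def process(row: Row): Unit = println(row)
        // release resources; errorOrNull is non-null if processing failed
        def close(errorOrNull: Throwable): Unit = ()
      })
      .option("checkpointLocation", "/tmp/checkpoints/foreach-demo") // hypothetical
      .start()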

13. Anatomy of a Streaming Query: Output mode and Trigger

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("update")
      .option("checkpointLocation", "…")
      .start()

Output mode – what’s output:
• Complete – output the whole answer every time
• Update – output changed rows only
• Append – output new rows only
Trigger – when to output:
• Specified as a time; eventually supports data size
• No trigger means as fast as possible
A short sketch contrasting the modes follows.
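To make the output modes concrete, a minimal sketch using Spark’s debug-only memory sink; `counts` again stands for the aggregated streaming DataFrame above, and the table name "wc" is arbitrary:

    val query = counts.writeStream
      .format("memory")       // keeps results in an in-memory table, for debugging only
      .queryName("wc")
      .outputMode("complete") // "update" would emit only the rows changed by each trigger
      .start()

    query.processAllAvailable() // block until all available input is processed
    spark.table("wc").show()    // the whole answer so far, because the mode is complete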

14. Anatomy of a Streaming Query: Checkpoint

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("update")
      .option("checkpointLocation", "…")
      .start()

Checkpoint:
• Tracks the progress of a query in persistent storage
• Can be used to restart the query if there is a failure (see the sketch below)
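A minimal sketch of what the checkpoint buys you (server and paths are hypothetical; `counts` is the aggregated DataFrame from the slide): restarting the same query against the same checkpointLocation resumes from the recorded offsets and state.

    def startQuery() = counts.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")          // hypothetical
      .option("topic", "output")
      .option("checkpointLocation", "/checkpoints/wordcount")  // must stay stable across restarts
      .outputMode("update")
      .start()

    val q1 = startQuery()
    q1.stop()               // or the cluster crashes
    val q2 = startQuery()   // resumes from the offsets recorded in the checkpoint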

15. Reference Architecture

16. Data Pipelines @ Databricks: [diagram] Event Based sources -> Bronze Tables -> Silver Tables -> Gold Tables, feeding Streaming Analytics and Reporting

17. Event Based File Sources
• Launched Structured Streaming connectors:
  • s3-sqs on AWS (DBR 3.5), backed by AWS SQS and S3
  • abs-aqs on Azure (DBR 5.0), backed by Azure Blob Storage, Event Grid, and Queue Storage
• As blobs are generated:
  • Events are published to SQS/AQS
  • Spark reads these events
  • Then reads the original files from the blob storage system
A hedged sketch of the wiring follows.
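A sketch of how the s3-sqs source is wired up, based on the Databricks connector documentation; this connector is Databricks Runtime only, and the queue URL, schema, and option names are assumptions that may differ across DBR versions:

    import org.apache.spark.sql.types._

    val eventSchema = new StructType()
      .add("id", StringType)
      .add("body", StringType)

    val events = spark.readStream
      .format("s3-sqs")                 // Databricks Runtime connector
      .schema(eventSchema)              // schema of the files behind the events
      .option("fileFormat", "json")
      .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/events") // hypothetical
      .option("sqsFetchInterval", "1m")
      .load()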

18. Properties of Bronze/Silver/Gold
• Bronze tables
  • No data processing
  • Deduplication + JSON => Parquet conversion (see the sketch below)
  • Data kept around for a couple of weeks in order to fix mistakes, just in case
• Silver tables
  • Tens/hundreds of tables
  • Directly queryable tables
  • PII masking/redaction
• Gold tables
  • Materialized views of silver tables
  • Curated tables by the Data Science team
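A minimal sketch of a bronze-to-silver job in the spirit of these bullets (paths, column names, and the 1-hour watermark are hypothetical): deduplicate raw JSON events and persist them as a queryable Parquet table.

    import org.apache.spark.sql.types._

    val eventSchema = new StructType()
      .add("eventId", StringType)
      .add("ts", TimestampType)
      .add("payload", StringType)

    val bronze = spark.readStream
      .schema(eventSchema)
      .json("/pipelines/bronze/events")  // hypothetical path

    bronze
      .withWatermark("ts", "1 hour")                // bound the dedup state
      .dropDuplicates("eventId", "ts")              // dedup within the watermark window
      .writeStream
      .format("parquet")                            // or "delta" on Databricks
      .option("path", "/pipelines/silver/events")
      .option("checkpointLocation", "/pipelines/checkpoints/silver_events")
      .start()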

19. Why this Architecture?
• Maximize Flexibility
• Maximize Scalability
• Lower Costs

20. See TD’s talk: “Designing Structured Streaming Pipelines—How to Architect Things Right” April 25 2:40pm – Streaming Track

21. Testing

22. Testing

    spark.readStream
      .format("kafka")
      .option("subscribe", "input")
      .load()
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)
      .writeStream
      .format("kafka")
      .option("topic", "output")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .outputMode("update")
      .option("checkpointLocation", "…")
      .start()

- How do we test this code?
- Do we need to set up Kafka?
- How do we verify result correctness?

23. Testing Strategy 1: Don’t care about sources and sinks. Just test your business logic, using batch DataFrames (see the sketch below):

    .groupBy('value.cast("string") as 'key)
    .agg(count("*") as 'value)

Pros:
- Easy to do in Scala/Python
Cons:
- Not all batch operations are supported in Streaming
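A minimal sketch of Strategy 1 (the input data and assertion style are made up; use whatever test framework you like): run the exact transformation on a small batch DataFrame and check the counts.

    import spark.implicits._
    import org.apache.spark.sql.functions.count

    val input = Seq("a", "b", "a").toDF("value")

    val result = input
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)

    // "a" appears twice, "b" once
    assert(result.collect().map(r => r.getString(0) -> r.getLong(1)).toMap ==
      Map("a" -> 2L, "b" -> 1L))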

24. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    val inputData = MemoryStream[Array[Byte]]
    val stream = inputData.toDS().toDF("value")
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)

    testStream(stream, OutputMode.Update)(
      AddData(inputData, "a".getBytes(), "b".getBytes()),
      CheckAnswer(("a" -> 1), ("b" -> 1))
    )

25. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    val inputData = MemoryStream[Array[Byte]]   // source is in memory
    val stream = inputData.toDS().toDF("value") // schema can be set arbitrarily to mimic the real source
      .groupBy('value.cast("string") as 'key)
      .agg(count("*") as 'value)

    testStream(stream, OutputMode.Update)(
      AddData(inputData, "a".getBytes(), "b".getBytes()),
      CheckAnswer(("a" -> 1), ("b" -> 1))
    )

26. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    val inputData = MemoryStream[Array[Byte]]
    val stream = inputData.toDS().toDF("value")
      .groupBy('value.cast("string") as 'key)  // transformation unchanged
      .agg(count("*") as 'value)

    testStream(stream, OutputMode.Update)(
      AddData(inputData, "a".getBytes(), "b".getBytes()),
      CheckAnswer(("a" -> 1), ("b" -> 1))
    )

27. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    testStream(stream, OutputMode.Update)(  // starts a stream outputting data to a memory sink
      AddData(inputData, ...),
      CheckAnswer(("a" -> 1), ("b" -> 1))
    )

28. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    testStream(stream, OutputMode.Update)(
      AddData(inputData, "a".getBytes(), "b".getBytes()),  // add data to the source
      CheckAnswer(("a" -> 1), ("b" -> 1))
    )

29. Testing Strategy 2: Leverage the StreamTest test harness available in Apache Spark

    testStream(stream, OutputMode.Update)(
      AddData(inputData, "a".getBytes(), "b".getBytes()),
      CheckAnswer(("a" -> 1), ("b" -> 1))  // process all data and check the result
    )
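Note that StreamTest ships in Spark’s test artifacts (the spark-sql test-jar), not in the main distribution. If pulling in that dependency is awkward, here is a sketch of the same idea using MemoryStream with the memory sink directly (the table name "result" is arbitrary):

    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.functions.count
    import spark.implicits._

    implicit val sqlCtx = spark.sqlContext
    val inputData = MemoryStream[String]

    val stream = inputData.toDS().toDF("value")
      .groupBy('value as 'key)
      .agg(count("*") as 'value)

    val query = stream.writeStream
      .format("memory")
      .queryName("result")
      .outputMode("update")
      .start()

    inputData.addData("a", "b")
    query.processAllAvailable()   // wait until the batch is processed
    spark.table("result").show()  // expect (a, 1) and (b, 1)
    query.stop()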