Simplifying Change Data Capture using Databricks Delta

In this talk, we will present recent enhancements to the techniques previously discussed in this blog: https://databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.html. We will start by discussing the different CDC architectures that can be deployed in concert with Databricks Delta. We will then use notebooks to demonstrate updated CDC SQL and look at performance tuning considerations for both batch and streaming CDC pipelines into Delta.

2. Simplifying Change Data Capture Using Delta Lake
Ameet Kini, Databricks
April 24, 2019
#UnifiedAnalytics #SparkAISummit

3. About Me
Current: Regional Manager (Federal) of Resident Architects @ Databricks
…
A while ago
• OS Geo on Spark 0.6
• Almost spoke at Spark Summit 2014
…
A while while ago
• MapReduce @ Google (in 2006, pre-Hadoop)
• Kernel Dev @ Oracle
…

4. Outline
• Journey through the evolution of CDC in Databricks
  – Pretty architecture diagrams
• Understand what goes on behind the scenes
  – “Pretty” SQL query plans :)
• Preview of key upcoming features

5. Change Data Capture
What: Collect and merge changes
From: One or more sources
To: One or more destinations
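To make "collect and merge" concrete, here is a minimal pure-Scala model with no Spark or Delta dependency. It assumes, hypothetically, that every change record carries a sequence number `seq` that orders changes across sources; `CdcModel`, `Change`, `Upsert`, and `Delete` are illustrative names, not part of any Databricks API.

```scala
// Pure-Scala model of CDC "collect and merge" (illustrative, not Spark/Delta code).
// Assumption (hypothetical): each change carries a sequence number `seq` that
// totally orders changes coming from different sources.
object CdcModel {
  sealed trait Op
  case object Upsert extends Op
  case object Delete extends Op

  final case class Change(key: Int, seq: Long, op: Op, value: String)

  // Collect: union the change streams from all sources and keep only the
  // latest change (highest seq) for each key.
  def collect(sources: Seq[Seq[Change]]): Map[Int, Change] =
    sources.flatten.groupBy(_.key).map { case (k, cs) => k -> cs.maxBy(_.seq) }

  // Merge: apply the collected changes to one destination table (key -> value).
  def merge(dest: Map[Int, String], changes: Map[Int, Change]): Map[Int, String] =
    changes.values.foldLeft(dest) { (acc, c) =>
      c.op match {
        case Upsert => acc.updated(c.key, c.value) // insert or overwrite
        case Delete => acc - c.key                 // drop the key if present
      }
    }
}
```

Reducing the collected changes to one per key is a common CDC step; it also matters for Delta's MERGE, which is documented to fail when multiple source rows match the same target row.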

6. Historically…

7. CDC with Databricks circa 2017

8. What worked and what did not?
Did not work
• No $$$ savings or EDW compute offload
• EDW overloaded, which added constraints on when S3 refresh jobs could be scheduled
• Refresh rates are at best nightly due to concurrent read/write limitations of vanilla Parquet
Worked
• Least disruptive: adding Databricks to the existing stack
• Easy to get started with spark.read.jdbc

9. Delta simplifies the stack…

10. With Delta circa 2018
Every refresh period, run these two steps:
1. INSERT into staging table
2. INSERT OVERWRITE modified partitions of final table
Oracle CDC tables captured using database triggers
See https://databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.html
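The circa-2018 refresh can be sketched as a pure-Scala model (not Spark code; the `Row` layout with a `part` partition column and the names in `TwoStepRefresh` are hypothetical). It counts how many rows step 2 writes, which shows why the scheme depends on partitioning: every partition that receives even one change is rewritten in full. Deletes are not modeled.

```scala
// Pure-Scala model of the two-step refresh (illustrative, not Spark/Delta code).
object TwoStepRefresh {
  // Hypothetical row layout: `part` is the partition column.
  final case class Row(part: String, key: Int, value: String)

  // Step 1: INSERT the captured changes into the staging table.
  def stage(staging: Vector[Row], changes: Seq[Row]): Vector[Row] = staging ++ changes

  // Step 2: INSERT OVERWRITE every partition that has at least one staged row.
  // Returns the new final table and how many rows were (re)written.
  // Assumes at most one staged row per key; deletes are not modeled.
  def overwriteModifiedPartitions(finalTable: Vector[Row], staging: Vector[Row]): (Vector[Row], Int) = {
    val modified: Set[String] = staging.map(_.part).toSet
    val untouched = finalTable.filterNot(r => modified(r.part))
    val rebuilt = modified.toVector.flatMap { p =>
      val staged = staging.filter(_.part == p)
      val stagedKeys = staged.map(_.key).toSet
      // Old rows not superseded by a staged row, plus the staged rows themselves.
      finalTable.filter(r => r.part == p && !stagedKeys(r.key)) ++ staged
    }
    (untouched ++ rebuilt, rebuilt.size)
  }
}
```

Updating one row in a two-row partition rewrites both rows; if the table has a single partition, any change rewrites the whole table, which is the slow case for tables without effective partitioning.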

11. What worked and what did not?
Worked
• Delta removed dependency on EDW for CDC
• Refresh rates went from nightly to sub-hourly
• Easy to scale to multiple pipelines using features like notebook workflows and jobs
Did not work
• Scheme relied on effective partitioning to minimize updates, which requires domain-specific knowledge
• Where there is no effective partitioning, Step 2 is effectively overwriting most of the table…S..L..O..W
See https://databricks.com/blog/2018/10/29/simplifying-change-data-capture-with-databricks-delta.html

12. Efficient Upserts in Delta
Expanded syntax of MERGE INTO, introduced in Databricks Runtime 5.1: a single command processes all three action types (deletes, updates, and inserts) from a source table into a target table.

MERGE INTO users                              -- target table
USING changes                                 -- source table
ON users.userId = changes.userId
WHEN MATCHED AND FLAG = 'D'                   -- deletes
  THEN DELETE
WHEN MATCHED AND FLAG <> 'D'                  -- updates
  THEN UPDATE SET address = changes.address
WHEN NOT MATCHED                              -- inserts
  THEN INSERT (userId, address)
  VALUES (changes.userId, changes.address)

See https://databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html
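The semantics of the three MERGE clauses can be checked with a small pure-Scala model (not Delta's implementation). `MergeModel` and its `Change` case class are hypothetical; as in SQL MERGE, each source row is matched against the original target, and the model assumes at most one change per `userId`.

```scala
// Pure-Scala model of the three-clause MERGE on the users table
// (illustrative only; Delta executes MERGE as distributed joins over files).
object MergeModel {
  // Hypothetical source row: FLAG = "D" marks a delete.
  final case class Change(userId: Int, address: String, flag: String)

  def merge(users: Map[Int, String], changes: Seq[Change]): Map[Int, String] = {
    require(changes.map(_.userId).distinct.size == changes.size, "at most one change per userId")
    changes.foldLeft(users) { (acc, c) =>
      // Match against the ORIGINAL target, as SQL MERGE does.
      if (users.contains(c.userId)) {
        if (c.flag == "D") acc - c.userId            // WHEN MATCHED AND FLAG = 'D' THEN DELETE
        else acc.updated(c.userId, c.address)        // WHEN MATCHED AND FLAG <> 'D' THEN UPDATE
      } else acc.updated(c.userId, c.address)        // WHEN NOT MATCHED THEN INSERT
    }
  }
}
```

As written on the slide, the WHEN NOT MATCHED clause also inserts source rows flagged 'D' whose key is absent from the target. Delta's MERGE syntax allows an optional condition on that clause (for example, WHEN NOT MATCHED AND FLAG <> 'D' THEN INSERT ...) if such rows should be skipped.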

13. Works for Streaming and Batch

import org.apache.spark.sql.DataFrame

streamingSessionUpdatesDF.writeStream
  .foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) =>
    // Register each micro-batch as the "updates" view that MERGE reads from
    microBatchOutputDF.createOrReplaceTempView("updates")
    microBatchOutputDF.sparkSession.sql("""
      MERGE INTO users
      USING updates
      ON users.userId = updates.userId
      WHEN MATCHED AND FLAG = 'D' THEN DELETE
      WHEN MATCHED AND FLAG <> 'D' THEN UPDATE SET address = updates.address
      WHEN NOT MATCHED THEN INSERT (userId, address) VALUES (updates.userId, updates.address)
    """)
  }
  .start()

See https://databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html

14. Used by MySQL Replication
What: CDC MySQL tables into Delta
From: MySQL tables in binlog format
To: Delta
Public Preview in DBR 5.3: https://docs.databricks.com/delta/mysql-delta.html#mysql-delta

15. With Delta Now
Before: every refresh period, run these two steps:
1. INSERT into staging table
2. INSERT OVERWRITE modified partitions of final table
Now: every refresh period, MERGE changes into table
Oracle CDC tables captured using database triggers

16. Visually
[Figure: Updates merged into the Users table. Old files 1 to 9 sit in Partitions 1 to 3 (files 1, 2, 3 in Partition 1; 4, 5, 6 in Partition 2; 7, 8, 9 in Partition 3); new files 10, 11, and 12 are written, one per partition. Legend: files with "Insert", "Update", and "Delete" records.]
Delta marks the rewritten old files stale and eligible for vacuum.

18. A deep dive into MERGE

19. A tale of two joins
MERGE runs two joins:
• Inner Join
  – Between Source and Target
  – Goal: find files that need to be modified (e.g., files 2, 4, 8)
• Outer Join
  – Between Source and the subset of files identified by the Inner Join
  – Goal: write out modified and unmodified data together (e.g., "New Files")
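The two phases can be modeled in pure Scala with target data held in immutable files (a simplification, not Delta's code; `TwoJoinMerge`, `DataFile`, and the file layout are hypothetical, and only updates and inserts are modeled). Phase 1 only has to find which files contain a matching key; phase 2 rewrites exactly those files, plus one new file for source keys that matched nothing.

```scala
// Pure-Scala model of MERGE's two phases over immutable files (illustrative only).
object TwoJoinMerge {
  final case class Row(key: Int, value: String)
  final case class DataFile(id: Int, rows: Vector[Row])

  // Phase 1 (the "inner join"): which files hold at least one source key?
  def filesToRewrite(target: Seq[DataFile], sourceKeys: Set[Int]): Seq[DataFile] =
    target.filter(_.rows.exists(r => sourceKeys(r.key)))

  // Phase 2 (the "outer join"): rewrite only the touched files, copying unmodified
  // rows and applying source updates; source keys found in no file become inserts.
  // Returns (new files, ids of old files that are now stale).
  def rewrite(touched: Seq[DataFile], source: Map[Int, String], nextId: Int): (Seq[DataFile], Set[Int]) = {
    val matchedKeys = touched.flatMap(_.rows.map(_.key)).toSet
    val rewritten = touched.zipWithIndex.map { case (f, i) =>
      DataFile(nextId + i, f.rows.map(r => source.get(r.key).fold(r)(v => Row(r.key, v))))
    }
    val inserts = source.collect { case (k, v) if !matchedKeys(k) => Row(k, v) }.toVector
    val insertFile =
      if (inserts.isEmpty) Seq.empty[DataFile] else Seq(DataFile(nextId + touched.size, inserts))
    (rewritten ++ insertFile, touched.map(_.id).toSet)
  }
}
```

With nine two-row files and source keys that fall in files 2, 4, and 8, only those three files are rewritten (as new files 10, 11, 12); the other six are left untouched, and the amount of data phase 2 writes is set by the touched files, not by the size of the source.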

20. Say, if you run this…
Merging a 1000-row source into a 100-million-row target, using TPC-DS

21. The two joins under the hood…
MERGE runs the same two joins (Inner Join to find files, Outer Join to rewrite them); on this workload:
• Inner Join takes 7s
• Outer Join takes 32s

22. Let’s peek inside the inner join
The optimizer picks Broadcast Hash Join, a suitable choice when joining a small table (source) with a large one (target).
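A conceptual, single-machine sketch of the two join strategies compared on these slides (Spark's operators are distributed and add shuffles and spills; `JoinStrategies` is a hypothetical name). Both functions return the same rows; the difference is that the broadcast hash join builds a hash table from the small side and streams the large side once, while sort-merge sorts both inputs first.

```scala
// Conceptual single-machine versions of two physical join strategies.
object JoinStrategies {
  // Broadcast hash join: build a hash table from the SMALL side (the source),
  // then stream the large side once. The large side is never sorted.
  def broadcastHashJoin[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)]): Seq[(K, A, B)] = {
    val built: Map[K, Seq[A]] = small.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
    large.flatMap { case (k, b) => built.getOrElse(k, Nil).map(a => (k, a, b)) }
  }

  // Sort-merge join: sort BOTH sides by key, then advance two cursors.
  def sortMergeJoin[A, B](left: Seq[(Int, A)], right: Seq[(Int, B)]): Seq[(Int, A, B)] = {
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(Int, A, B)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val (lk, rk) = (l(i)._1, r(j)._1)
      if (lk < rk) i += 1
      else if (lk > rk) j += 1
      else {
        // Find the end of the equal-key run on each side and emit their cross product.
        val iEnd = l.indexWhere(_._1 != lk, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(_._1 != rk, j) match { case -1 => r.length; case n => n }
        for (x <- i until iEnd; y <- j until jEnd) out += ((lk, l(x)._2, r(y)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Sorting (and, in Spark, shuffling) the 100-million-row target is the extra work the sort-merge plan has to do before it can emit any rows.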

23. But what if it picks Sort Merge instead?
The same 7s inner join now takes 16s … more than 2x slower!

24. This is what Sort Merge looks like

25. Inner Join Summary
If |source| << |target|, nudge the optimizer into picking broadcast hash join:
• Ensure stats are collected on join columns
• Increase spark.sql.autoBroadcastJoinThreshold appropriately (default: 10MB)
• Use optimizer hints in regular joins, e.g. SELECT /*+ BROADCAST(source) */ ... (hints do not apply to MERGE)
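The size check behind the threshold can be sketched as follows. This is a simplified, hypothetical model (`BroadcastChoice` is not a Spark API): Spark's real join selection also considers join type, hints, and other settings, but its core rule is that a side whose estimated size is at or below spark.sql.autoBroadcastJoinThreshold may be broadcast, and a threshold of -1 disables broadcasting. Here `None` stands in, as a simplification, for a plan without usable statistics.

```scala
// Simplified model of size-based broadcast selection (not Spark's JoinSelection).
object BroadcastChoice {
  // Spark's default for spark.sql.autoBroadcastJoinThreshold is 10MB.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024

  // estimatedSourceBytes = None models "no usable statistics": the size is
  // treated as unknown (effectively huge), so the model never broadcasts it.
  // A negative threshold (e.g. -1) disables broadcasting entirely.
  def chooseJoin(estimatedSourceBytes: Option[Long],
                 thresholdBytes: Long = DefaultThresholdBytes): String =
    estimatedSourceBytes match {
      case Some(size) if thresholdBytes >= 0 && size <= thresholdBytes => "BroadcastHashJoin"
      case _ => "SortMergeJoin"
    }
}
```

In a notebook the threshold can be raised with spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024); the right value depends on available memory, because the broadcast side is collected and shipped to every executor.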

26. Next: Outer Join
Of MERGE's two joins, the Outer Join (which writes the new files to S3) is the slow one:
• Inner Join takes 7s
• Outer Join takes 32s

27. Outer Join latency tied to…
…the amount of data rewritten (gray boxes in the diagram); S3 writes are slow.
Here we’re writing 3 new files.

28. Let’s see the numbers…
Target table store_sales_100m
• 100 million rows
• Compacted into 5 Parquet files of 1 GB each (OPTIMIZE ameet.store_sales_100m)
Source table
• 1000 rows
• Drawn from 1, 3, and all 5 files

29. Outer Join is write-bound
[Chart: Time (seconds) by # of files modified, Inner Join vs. Outer Join]
# of files modified   Inner Join (s)   Outer Join (s)
1 x 1GB               7                32
3 x 1GB               6                66
5 x 1GB               7                84
Key take-aways
• Outer Join time is directly tied to the amount of data written
• Inner Join time is a small proportion of overall time and does not change as the amount of data written increases
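A quick arithmetic check on the chart's measurements (Inner Join 7, 6, 7 s and Outer Join 32, 66, 84 s for 1, 3, and 5 rewritten 1 GB files) makes the take-aways concrete. `MergeTimings` is a hypothetical helper; the numbers are the slide's.

```scala
// Arithmetic on the slide's measured MERGE timings (seconds).
object MergeTimings {
  final case class Run(gbRewritten: Int, innerSec: Int, outerSec: Int)

  // Measurements from the chart: 1, 3, and 5 files of 1 GB rewritten.
  val runs: Seq[Run] = Seq(Run(1, 7, 32), Run(3, 6, 66), Run(5, 7, 84))

  // Share of total MERGE time spent in the inner join, in percent.
  def innerShare(r: Run): Double = 100.0 * r.innerSec / (r.innerSec + r.outerSec)

  // Outer-join seconds per GB rewritten.
  def outerSecPerGb(r: Run): Double = r.outerSec.toDouble / r.gbRewritten
}
```

The inner join's share of total time drops from about 18% to under 8% as more data is rewritten, while outer-join time rises from 32 s to 84 s. Seconds per GB fall from 32 to 16.8, so over these three points outer-join time grows with the data written, though not strictly in proportion.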