Optimizing Delta/Parquet Data Lakes for Apache Spark

This talk outlines data lake design patterns that can yield massive performance gains for all downstream consumers. We will talk about how to optimize Parquet data lakes and the awesome additional features provided by Databricks Delta:

* Optimal file sizes in a data lake
* File compaction to fix the small file problem
* Why Spark hates globbing S3 files
* Partitioning data lakes with partitionBy
* Parquet predicate pushdown filtering
* Limitations of Parquet data lakes (files aren't mutable!)
* Mutating Delta lakes
* Data skipping with Delta ZORDER indexes

1. WIFI SSID: SparkAISummit | Password: UnifiedAnalytics

2. Optimizing data lakes for Apache Spark
Matthew Powers, Prognos

3. About: But what about those poor data scientists who work with gzipped CSV lakes 😱

4. What you will get from this talk…
• Motivation to write Spark open source code
• Practical knowledge to build better data lakes

5. Agenda
• Community goals
• Spark open source
• Modern Scala libs
• Parquet lakes
• Incremental updates & small files
• Partitioned lakes
• Delta lakes

6. Loved by most, dreaded by some (source: 2019 Stack Overflow survey)

7. Community goals
• Passionate about community unification (standardization of method signatures)
• Need to find optimal scalafmt settings
• Strongly dislike UDFs
• Spark tooling?

8. Spark helper libraries
• spark-daria (Scala)
• quinn (PySpark)

9. spark-fast-tests / chispa
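A minimal sketch of a spark-fast-tests comparison, assuming the library's DataFrameComparer trait in the com.github.mrpowers.spark.fast.tests package (chispa is the PySpark counterpart); the object name and data are made up for illustration:

    import org.apache.spark.sql.SparkSession
    import com.github.mrpowers.spark.fast.tests.DataFrameComparer

    object ComparisonExample extends DataFrameComparer {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._

        val actualDF   = Seq("Russia", "China").toDF("country")
        val expectedDF = Seq("Russia", "China").toDF("country")

        // Throws a descriptive error if the schemas or rows differ
        assertSmallDataFrameEquality(actualDF, expectedDF)
      }
    }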

10. spark-style-guide

11. Modern Scala libs
• uTest
• Mill Build Tool

12. Prognos data lakes: the Prognos AI platform to predict disease runs Apache Spark (plus other tech) on top of several data lakes (diagram: Data lake 1, Data lake 2, Data lake 3)

13. TL;DR
• 1 GB files
• No nested directories

14. Small file problem
• Incrementally updating a lake will create a lot of small files
• We can lay out the data so that it's easy to compact

15. Suppose we have a CSV data lake
• CSV data lake is constantly being updated
• Want to convert it to a Parquet data lake
• Want incremental updates every hour

16. CSV => Parquet
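A minimal sketch of what one hourly CSV => Parquet batch might look like; the bucket name, directory layout, and header option are assumptions, not from the slides:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("csv-to-parquet").getOrCreate()

    // Read the CSV files that landed in the last hour (hypothetical layout)
    val csvDF = spark.read
      .option("header", "true")
      .csv("s3a://some-bucket/csv-lake/2019/04/24/10/")

    // Append the new rows to the Parquet lake
    csvDF.write
      .mode("append")
      .parquet("s3a://some-bucket/parquet-lake/")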

17. Compacting small files: 10,000 incremental files and 166 GB of data
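A hedged sketch of the compaction step: with 166 GB of data and the ~1 GB file-size target from the TL;DR slide, repartitioning to roughly 166 output files does the job (paths are hypothetical):

    // Reuse the SparkSession from the previous sketch
    val df = spark.read.parquet("s3a://some-bucket/parquet-lake/")

    // 166 GB of data / ~1 GB per file => ~166 output files
    df.repartition(166)
      .write
      .mode("overwrite")
      .parquet("s3a://some-bucket/parquet-lake-compacted/")

Writing to a fresh location keeps the original lake queryable while the compacted copy is built.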

18. Access data lake
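Accessing the lake afterwards is a plain Parquet read (continuing the hypothetical paths above):

    val df = spark.read.parquet("s3a://some-bucket/parquet-lake-compacted/")
    df.printSchema()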

20. Why partition data lakes?
• Data skipping
• Massively improve query performance
• I've seen queries run 50-100 times faster on partitioned lakes

21. Sample data
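The physical plans on the following slides reference a people.csv with first_name, last_name, and country columns; illustrative rows along these lines (the rows themselves are made up):

    first_name | last_name | country
    -----------+-----------+---------
    Mikhail    | Ivanov    | Russia
    Maria      | Silva     | Brazil
    Miguel     | Cordoba   | Colombia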

22. Filtering unpartitioned lake

== Physical Plan ==
Project [first_name#12, last_name#13, country#14]
+- Filter (((isnotnull(country#14) && isnotnull(first_name#12)) && (country#14 = Russia)) && StartsWith(first_name#12, M))
   +- FileScan csv [first_name#12,last_name#13,country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/powers/Documents/tmp/blog_data/people.csv], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(first_name), EqualTo(country,Russia), StringStartsWith(first_name,M)], ReadSchema: struct<first_name:string,last_name:string,country:string>
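A sketch of the query that produces a plan like this, assuming an active SparkSession named spark and the people.csv path shown in the plan:

    import org.apache.spark.sql.functions.col

    val df = spark.read
      .option("header", "true")
      .csv("/Users/powers/Documents/tmp/blog_data/people.csv")

    df.select("first_name", "last_name", "country")
      .filter(col("country") === "Russia" && col("first_name").startsWith("M"))
      .explain()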

23. Partitioning the data lake
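A minimal sketch, reusing df from the previous snippet and the partitioned_lake path that appears in the partitioned plan:

    df.write
      .option("header", "true")
      .partitionBy("country")
      .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")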

24. Partitioned lake on disk
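On disk, partitionBy yields one directory per country value, along these lines (listing illustrative; only country=Russia is confirmed by the plans):

    partitioned_lake/
      country=Brazil/
        part-00000-….csv
      country=Colombia/
        part-00000-….csv
      country=Russia/
        part-00000-….csv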

25. Filtering partitioned data lake

== Physical Plan ==
Project [first_name#74, last_name#75, country#76]
+- Filter (isnotnull(first_name#74) && StartsWith(first_name#74, M))
   +- FileScan csv [first_name#74,last_name#75,country#76] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/powers/Documents/tmp/blog_data/partitioned_lake], PartitionCount: 1, PartitionFilters: [isnotnull(country#76), (country#76 = Russia)], PushedFilters: [IsNotNull(first_name), StringStartsWith(first_name,M)], ReadSchema: struct<first_name:string,last_name:string>

26. Comparing physical plans

Unpartitioned:
Project [first_name#12, last_name#13, country#14]
+- Filter (((isnotnull(country#14) && isnotnull(first_name#12)) && (country#14 = Russia)) && StartsWith(first_name#12, M))
   +- FileScan csv [first_name#12,last_name#13,country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[…], PartitionFilters: [], PushedFilters: [IsNotNull(country), IsNotNull(first_name), EqualTo(country,Russia), StringStartsWith(first_name,M)], ReadSchema: struct<first_name:string,last_name:string,country:string>

Partitioned:
Project [first_name#74, last_name#75, country#76]
+- Filter (isnotnull(first_name#74) && StartsWith(first_name#74, M))
   +- FileScan csv [first_name#74,last_name#75,country#76] Batched: false, Format: CSV, Location: InMemoryFileIndex[…], PartitionCount: 1, PartitionFilters: [isnotnull(country#76), (country#76 = Russia)], PushedFilters: [IsNotNull(first_name), StringStartsWith(first_name,M)], ReadSchema: struct<first_name:string,last_name:string>

27. Directly grabbing the partitions is faster
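A sketch: pointing the reader at a single partition directory avoids listing every other partition (the country=Russia path follows from the layout above):

    val russiaDF = spark.read
      .option("header", "true")
      .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake/country=Russia")

    // Note: when a leaf directory is read directly, the country column is
    // not recovered from the path, so it won't appear in russiaDF
    russiaDF.show()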

28. Real partitioned data lake
• Updates every 3 hours
• Has 5 million files
• 15,000 files are being added every day
• Still great for a lot of queries

29. Creating partitioned lakes (1/3)
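A hedged sketch of one way to create a partitioned lake (an assumption about this step, since the slide's code is an image): repartition on the partition key before writing, so each output folder gets one file per memory partition:

    import org.apache.spark.sql.functions.col

    df.repartition(col("country"))
      .write
      .option("header", "true")
      .partitionBy("country")
      .csv("/Users/powers/Documents/tmp/blog_data/partitioned_lake")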