Improving Apache Spark’s Reliability with DataSourceV2

DataSourceV2 is Spark’s new API for working with data from tables and streams, but “v2” also includes a set of changes to SQL internals, the addition of a catalog API, and changes to the DataFrame read and write APIs. This talk will cover the context for those additional changes and how “v2” will make Spark more reliable and predictable for building enterprise data pipelines. This talk will include:
* Problem areas where the current behavior is unpredictable or unreliable
* The new standard SQL write plans (and the related SPIP)
* The new table catalog API and a new Scala API for table DDL operations (and the related SPIP)
* Netflix’s use case that motivated these changes

1. Improving Spark’s Reliability with DataSourceV2
Ryan Blue, Spark Summit 2019

2. Data at Netflix

3. Cloud-native data warehouse
● YARN compute clusters are expendable
● Expendable clusters require architectural changes
  ○ GENIE is a job submission service that selects the cluster
  ○ METACAT is a cluster-independent metastore
  ○ S3 is the source of truth for data

4. S3 is eventually consistent
● File list calls may be inaccurate
● Hive tables rely on accurate listing for correctness
● S3 queries may be incorrect, sometimes

5. S3 is eventually consistent
● File list calls may be inaccurate
● Hive tables rely on accurate listing for correctness
● S3 queries may be incorrect, sometimes

6. At Netflix’s scale, sometimes is every day.

7. A reliable S3 warehouse (in 2016)
● Requires consistent listing – S3MPER
● Requires in-place writes – BATCH PATTERN
● Requires atomic metastore changes – METACAT

8. Changes needed in Spark
● Integrate S3 batch pattern committers
● Spark versions
  ○ 1.6 – Hive path only
  ○ 2.0 – DataSource path for reads, not writes
  ○ 2.1+ – Use DataSource path for reads and writes

9. Problems and Roadblocks

10. DataFrameWriter
● Behavior is not defined
● What do save and saveAsTable do differently?
  ○ Create different logical plans . . . that are converted to other logical plans
● When you use “overwrite” mode, what happens?
  ○ Depends on the data source
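As a concrete sketch of the ambiguity (the path and table names below are made up), both calls say mode("overwrite"), but they build different logical plans and what actually gets replaced depends on the source:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dfwriter-ambiguity").getOrCreate()
val df = spark.range(10).toDF("id")

// Path-based write: one logical plan; what "overwrite" replaces is decided by
// the underlying source implementation.
df.write.format("parquet").mode("overwrite").save("/tmp/example/events_by_path")

// Table-based write: a different logical plan that is converted again during
// analysis; "overwrite" may replace data, metadata, or partitions.
df.write.format("parquet").mode("overwrite").saveAsTable("events")
```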

11. SaveMode
● Delegates behavior to the source when tables don’t exist
● Overwrite might mean:
  ○ Replace table – data and metadata (some code paths)
  ○ Replace all table data (some code paths)
  ○ Replace static partitions (DataSource tables)
  ○ Replace dynamic partitions (Hive tables, SPARK-20236)
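A hedged sketch of the partition case (table and column names are illustrative; a spark-shell style `spark` session is assumed): the same overwrite insert replaces different data depending on the partition overwrite mode setting related to SPARK-20236.

```scala
// Assumes a spark-shell style session: `spark: SparkSession` and implicits in scope.
import spark.implicits._

spark.sql(
  "CREATE TABLE IF NOT EXISTS logs (event STRING, day STRING) USING parquet PARTITIONED BY (day)")

val updates = Seq(("click", "2019-04-24")).toDF("event", "day")

// Static mode: with no static partition spec, the overwrite drops all existing
// partitions first, including ones the new data never touches.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "static")
updates.write.mode("overwrite").insertInto("logs")

// Dynamic mode: only the partitions present in the incoming data are replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
updates.write.mode("overwrite").insertInto("logs")
```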

12. Validation
● What is “correct” for CTAS/overwrite when the table exists?
● PreprocessTableCreation vs PreprocessTableInsertion
  ○ Depends on the DataFrameWriter call
● Spark automatically inserts unsafe casts (e.g. string to int)
● Path tables have no schema validation on write
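Two hedged sketches of the validation gaps (names are made up, spark-shell style session assumed): the first insert is analyzed with an implicit string-to-int cast instead of failing, and the second write goes to a path with no schema check at all.

```scala
// Assumes a spark-shell style session: `spark: SparkSession` and implicits in scope.
import spark.implicits._

spark.sql("CREATE TABLE IF NOT EXISTS counts (id INT) USING parquet")

// The column is a string; instead of rejecting the insert, analysis adds an
// unsafe cast, so "not-a-number" silently becomes NULL in the table.
Seq("1", "2", "not-a-number").toDF("id").write.mode("append").insertInto("counts")

// Path-based tables get no schema validation on write: this append succeeds even
// though its schema has nothing to do with what is already at the path.
Seq(("a", true)).toDF("name", "flag").write.mode("append").parquet("/tmp/example/counts_by_path")
```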

13. “[These] should do the same thing, but as we've already published these 2 interfaces and the implementations may have different logic, we have to keep these 2 different commands.”

14. “[These] should do the same thing, but as we've already published these 2 interfaces and the implementations may have different logic, we have to keep these 2 different commands.” 😕

15. Commands
● RunnableCommand wraps a logical plan in a pseudo-physical plan
● Commands created inside run made it worse
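As a rough sketch of the pattern being criticized (a simplification, not Spark’s exact trait): a RunnableCommand-style node is a logical plan whose run method performs the work eagerly, and the planner only wraps it in a thin physical node (ExecutedCommandExec in Spark), so the usual planning and validation machinery never sees what it actually does.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified sketch, not Spark's real interface: the "command" is a logical plan
// that does its own execution. Because run is a black box, analysis rules cannot
// inspect or rewrite the work it does, and commands that build and run further
// commands inside run make the behavior even harder to follow.
trait SketchRunnableCommand extends LogicalPlan {
  def run(session: SparkSession): Seq[Row]
}
```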

16. Community Roadblocks
● Substantial behavior changes for 2.0
  ○ Committed with no time to review . . . to the 2.0 release branch
● Behavior not up for discussion
● Parts of PRs merged without attribution

17. Iceberg and DataSourceV2

18. A reliable S3 warehouse (in 2019)
● Iceberg: tables without unpleasant surprises
● Fix tables, not the file system
● While fixing reliability and scale, fix usability:
  ○ Reliable schema evolution
  ○ Automatic partitioning
  ○ Configure tables, not jobs

19. Last year
● Need a way to plug in Iceberg cleanly
● Maintaining a separate write path takes time
● Spark’s write path had solidified
● DataSourceV2 was proposed . . .

20. Why DataSourceV2?
● Isn’t v2 just an update to the read/write API?
● Existing design problems also affect v2
  ○ No write validation – yet another logical plan
  ○ SaveMode passed to sources
● Opportunity: avoid needing v3 to fix behavior

21. What’s different in DSv2
● Define a set of common logical plans
  ○ CTAS, RTAS, Append, OverwriteByExpression, etc.
  ○ Document user expectations and behavior
  ○ Implement consistent behavior in Spark for all v2 sources
● SPIP: Standardize SQL logical plans https://issues.apache.org/jira/browse/SPARK-23521
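A hedged sketch of what “a set of common logical plans” means (the case classes below are simplified shapes, not Spark’s actual classes): each write is described by a plan that names the target table and the query producing the data, so expectations can be documented and enforced once for every v2 source.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified, illustrative shapes for the standardized v2 write plans.
case class SketchTarget(catalog: String, table: String)

case class SketchAppendData(target: SketchTarget, query: LogicalPlan)
case class SketchOverwriteByExpression(
    target: SketchTarget, deleteFilter: Expression, query: LogicalPlan)
case class SketchCreateTableAsSelect(
    target: SketchTarget, query: LogicalPlan, ignoreIfExists: Boolean)
case class SketchReplaceTableAsSelect(target: SketchTarget, query: LogicalPlan)
```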

22. Standard Logical Plans
● Specialize physical plans, not logical plans
  ○ No more InsertIntoDataSourceTable and InsertIntoHiveTable
  ○ No forgetting to apply rules to a new logical plan
● Apply validation rules universally
  ○ Same rules for Append and Overwrite
● Avoid using RunnableCommand
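As a sketch of what this buys (table names are made up; the writeTo call is from the DataFrameWriterV2 API that later shipped alongside DSv2 in Spark 3.0): both writes below resolve to the same standardized append plan, so the same validation rules apply to each, and the source only supplies the physical write.

```scala
// Assumes a spark-shell style session and that the tables below already exist.
// SQL front end:
spark.sql("INSERT INTO db.events SELECT * FROM staged_events")

// DataFrame front end: same logical plan, same validation, explicit behavior
// instead of a SaveMode.
spark.table("staged_events").writeTo("db.events").append()
```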

23. Consistent behavior
● Create, alter, and drop tables in Spark, not sources
  ○ CTAS when table exists: fail the query in Spark
  ○ Requires a catalog plugin API
● SPIP: Spark API for Table Metadata https://issues.apache.org/jira/browse/SPARK-27067
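A hedged sketch of the kind of plugin interface this implies (method names and shapes are illustrative, not the final Spark API): Spark decides when a table should be created, altered, or dropped, and the catalog plugin carries the operation out against the external source of truth.

```scala
import org.apache.spark.sql.types.StructType

// Illustrative shapes only; the SPIP defines the real interface.
case class SketchCatalogTable(name: String, schema: StructType, properties: Map[String, String])

trait SketchTableCatalog {
  def tableExists(ident: String): Boolean
  def loadTable(ident: String): SketchCatalogTable
  def createTable(ident: String, schema: StructType, properties: Map[String, String]): SketchCatalogTable
  def dropTable(ident: String): Boolean
}
```

With something like this in place, CTAS against an existing table can fail consistently in Spark before the source is ever asked to write.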

24. Catalog API
● Multi-catalog support
  ○ Create tables in the source of truth
  ○ Avoiding this caused strange Spark behavior
● SPIP: Identifiers for multi-catalog support https://issues.apache.org/jira/browse/SPARK-27066
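A hedged sketch of how multi-catalog identifiers are used (the catalog name and plugin class are made up; the spark.sql.catalog.&lt;name&gt; configuration pattern is how catalog plugins are registered in the DSv2 work): tables are addressed as catalog.namespace.table, so DDL lands in the source of truth rather than being emulated inside Spark.

```scala
// Register a catalog plugin under the name "prod" (plugin class is illustrative).
spark.conf.set("spark.sql.catalog.prod", "com.example.ProdCatalogPlugin")

// Three-part identifiers route DDL and DML to that catalog.
spark.sql("CREATE TABLE prod.db.events (id BIGINT, ts TIMESTAMP) USING parquet")
spark.sql("INSERT INTO prod.db.events SELECT id, ts FROM staged_events")
```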

25. Status
● Goal: working DSv2 in Spark 3.0
  ○ Independent of the v1 path
  ○ Default behavior stays on the v1 path
● SPIPs have been adopted by community votes
● Append and overwrite plans are added and working
● Waiting on the catalog API to add CTAS and DDL

26. Thank you! Questions?
Up next: Migrating to Spark at Netflix, at 11:50 today in Room 2006