Migrating to Apache Spark at Netflix

In the last two years, Netflix has seen a mass migration to Spark from Pig and other MapReduce engines. This talk focuses on the challenges of that migration and the work that has made it possible, including contributions Netflix has made to Spark to enable wider adoption and ongoing projects to make Spark appeal to a broader range of analysts, beyond data and ML engineers.

1. Migrating to Spark at Netflix – Ryan Blue, Spark Summit 2019

2. Spark at Netflix

3. Long ago . . . ● ETL was mostly written in Pig, with some in Hive ● Pipelines required data engineering ● Data engineers had to understand the processing engine

4. Today: Job executions

5. Today: Cluster runtime

6. Today: S3 bytes read, S3 bytes written

7. Today ● Spark is > 90% of job executions – high tens-of-thousands daily ● Data platform is easier to use and more efficient ● Customers from all parts of the business

8. How did we get there?

9. Not included ● High-profile Spark features: DataFrames, codegen, etc. ● S3 optimizations and committers ● Parquet filtering, tuning, and compression ● Notebook environment

10. Spark deployments

11. Following upstream Spark ● Rebase ○ Pull in a new version ○ Easy to get new features ○ Easy to break things ● Backport ○ Pick only what’s needed ○ Time consuming ○ Safe?

12. Netflix: Parallel branches ● Maintain supported versions in parallel using backports ● Periodic rebase to add new minor versions: 1.6, 2.0, 2.1, 2.3 ● Recommend version based on actual use and experience ● Requires patching job submission

13. Benefits of parallel branches ● Easily test another branch before spending time ● Avoids coordinating versions across major applications ● Fast iteration: deploy changes several times per week

14. Testing ● Unstable branches ● Nightly canaries for stable and unstable ● CI runs unit tests for unstable ● Integration tests validate every deployment

15. Supported versions ● 1.6 – scale problems ● 2.0 – a little too unpolished ● 2.1 – solid, with some additional love ● 2.3 – slow migration, faster in some cases


17. Stability ● 1.6 is unstable above 500 executors ○ Use of the Actor model caused coarse locking ○ RPC dependencies make lock issues worse ○ Runaway retry storms ● Spark needs distributed tracing

18. Stability ● Much better in 2.1, plus patches ○ Remove block status data from heartbeats (SPARK-20084) ○ Multi-threaded listener bus (SPARK-18838) ○ Unstable executor requests (SPARK-20540) ● 2.1 and 2.3 still have problems with 100,000+ tasks ○ Applications hang after shutdown ○ Increase job maxPartitionBytes or coalesce

19. Unlikely problems ● Happen all the time at scale ● Scale in several dimensions ○ Large clusters, lots of disks to fail ○ High tens-of-thousands of executions ○ Many executors, many tasks, diverse workloads

20. Unlikely problems ● Fix CommitCoordinator and OutputCommitter problems ● Turn off YARN preemption in production ● Use cgroups to contain greedy apps ● Use general-purpose features ○ Blacklisting to avoid cascading failure ○ Speculative execution to tolerate slow nodes ○ Adaptive execution reduces risk
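
The general-purpose features in the last bullet are enabled through ordinary Spark properties. The sketch below assumes a Spark 2.x deployment where `spark.blacklist.enabled`, `spark.speculation`, and `spark.sql.adaptive.enabled` are available; the slides give no exact settings, so the values are illustrative, and `to_submit_args` is a hypothetical helper that only renders them as `spark-submit --conf` flags.

```python
# Illustrative settings only; the talk does not list exact values.
RESILIENCE_CONF = {
    "spark.blacklist.enabled": "true",     # avoid re-scheduling on bad nodes
    "spark.speculation": "true",           # re-launch slow tasks elsewhere
    "spark.sql.adaptive.enabled": "true",  # adaptive execution for SQL shuffles
}


def to_submit_args(conf):
    """Render a dict of Spark properties as spark-submit --conf arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", f"{key}={conf[key]}"]
    return args


if __name__ == "__main__":
    print(" ".join(to_submit_args(RESILIENCE_CONF)))
```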

21. Memory management ● Fix persistent OOM causes ○ Use less driver memory for broadcast joins (SPARK-22170) ○ Add PySpark memory region and limits (SPARK-25004) ○ Base stats on row count, not size on disk

22. Memory management ● Educate users about memory regions ○ Spark memory vs JVM memory vs overhead ○ Know what region fixes your problem (e.g., spilling) ○ Never set spark.executor.memory without also setting spark.memory.fraction
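
To make the memory regions concrete, here is a rough calculator. It assumes the documented Spark 2.x defaults on YARN: 300 MB of reserved heap, `spark.memory.fraction` of 0.6, `spark.memory.storageFraction` of 0.5, and container overhead of max(384 MB, 10% of executor memory). The function name and the returned keys are hypothetical.

```python
RESERVED_MB = 300  # heap the unified memory manager sets aside (Spark 2.x)


def executor_memory_regions(executor_memory_mb,
                            memory_fraction=0.6,
                            storage_fraction=0.5,
                            overhead_factor=0.10,
                            min_overhead_mb=384):
    """Estimate how one executor's memory is divided, in MB."""
    usable_mb = executor_memory_mb - RESERVED_MB
    # Spark memory: execution + storage, sized by spark.memory.fraction.
    spark_mb = usable_mb * memory_fraction
    # Off-heap allowance that YARN adds on top of the JVM heap.
    overhead_mb = max(min_overhead_mb, executor_memory_mb * overhead_factor)
    return {
        "spark_mb": spark_mb,
        "storage_protected_mb": spark_mb * storage_fraction,
        "user_mb": usable_mb - spark_mb,  # remaining JVM heap for user objects
        "overhead_mb": overhead_mb,
        "container_mb": executor_memory_mb + overhead_mb,
    }


if __name__ == "__main__":
    for name, value in executor_memory_regions(4096).items():
        print(f"{name}: {value:.1f}")
```

Because the Spark region is a fraction of the heap, raising `spark.executor.memory` alone grows every heap region proportionally, which is one reason to decide on both settings together.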

23. Best practices

24. Basics ● Avoid RDDs ○ Kryo problems plagued 1.6 apps ○ Let the optimizer improve jobs over time ● Aggressively broadcast ○ Remove the broadcast timeout ○ Set broadcast threshold much higher
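
The two broadcast bullets map onto `spark.sql.autoBroadcastJoinThreshold` (in bytes) and `spark.sql.broadcastTimeout` (in seconds). The slides do not say how the timeout was removed or what threshold was used; as an assumption, this sketch approximates "remove" with a very large timeout, and the numbers in the usage line are placeholders. `broadcast_conf` is a hypothetical helper.

```python
def broadcast_conf(threshold_mb, timeout_seconds):
    """Build Spark SQL properties for more aggressive broadcast joins."""
    return {
        # Tables estimated below this size (bytes) are broadcast automatically.
        "spark.sql.autoBroadcastJoinThreshold": str(threshold_mb * 1024 * 1024),
        # How long a broadcast join waits for the broadcast to finish (seconds).
        "spark.sql.broadcastTimeout": str(timeout_seconds),
    }


if __name__ == "__main__":
    # Placeholder values: 100 MB threshold, 10 hour timeout.
    for key, value in broadcast_conf(100, 36000).items():
        print(f"{key}={value}")
```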

25. Configuration ● 3 rules: ○ Don’t copy configuration ○ If you don’t know what it does, don’t change it ○ Never change timeouts ● Document defaults and recommendations

26. Parallelism ● Know how to control parallelism ○ spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes ○ repartition vs coalesce ● Use the least-intrusive option ○ Set shuffle parallelism high and use adaptive execution ○ Allow Spark to improve
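
As a rough illustration of how `spark.sql.files.maxPartitionBytes` drives scan parallelism, the number of read tasks is approximately the input size divided by the partition size. This is a simplification: Spark's actual split planning also accounts for file boundaries, file open cost, and available cores. Both function names are hypothetical.

```python
DEFAULT_MAX_PARTITION_BYTES = 128 * 1024 * 1024  # Spark's documented default


def estimate_scan_tasks(input_bytes, max_partition_bytes=DEFAULT_MAX_PARTITION_BYTES):
    """Approximate read tasks as ceil(input size / partition size)."""
    return (input_bytes + max_partition_bytes - 1) // max_partition_bytes


def max_partition_bytes_for(input_bytes, target_tasks):
    """Smallest partition size that keeps the estimate at or under target_tasks."""
    return (input_bytes + target_tasks - 1) // target_tasks


if __name__ == "__main__":
    ten_tib = 10 * 1024 ** 4
    print(estimate_scan_tasks(ten_tib))             # tasks at the default size
    print(max_partition_bytes_for(ten_tib, 20000))  # size for about 20,000 tasks
```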

27. Avoid wide stages ● Keep tasks in low tens-of-thousands ○ Too many tasks and the driver can’t handle heartbeats ○ Jobs hang for 10+ minutes after shutdown ● Reduce pressure on shuffle service ○ map tasks * reduce tasks = shuffle shards
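
The shuffle-shard formula on this slide grows multiplicatively, which is why wide stages on both sides of a shuffle are costly. A small worked example, with task counts chosen only for illustration:

```python
def shuffle_shards(map_tasks, reduce_tasks):
    """Shards the shuffle service may serve: one per (map task, reduce task) pair."""
    return map_tasks * reduce_tasks


if __name__ == "__main__":
    # Illustrative task counts, not figures from the talk.
    for maps, reduces in [(2000, 2000), (20000, 2000), (100000, 10000)]:
        print(f"{maps} x {reduces} = {shuffle_shards(maps, reduces)} shards")
```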

28. Dynamic Allocation ● Fixed --num-executors accidents (SPARK-13723) ● Use materialize instead of caching ○ Materialize: convert to RDD, back to DF, and count ○ Stores cache data in shuffle servers ○ Also avoids over-optimization
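
The materialize steps listed above (convert to RDD, back to DF, and count) can be sketched in PySpark as follows. This is not Netflix's actual helper, which the slides do not show; it assumes `spark` is a `SparkSession` and `df` is a `DataFrame`, and uses only `DataFrame.rdd`, `DataFrame.schema`, `SparkSession.createDataFrame`, and `DataFrame.count`.

```python
def materialize(spark, df):
    """Round-trip df through an RDD and force evaluation with count().

    Rebuilding the DataFrame from the RDD cuts the logical plan, so the
    optimizer treats the result as an opaque source. The count() runs the
    upstream stages once, leaving their shuffle output available for reuse.
    """
    materialized = spark.createDataFrame(df.rdd, df.schema)
    materialized.count()
    return materialized
```

A caller would use the returned DataFrame in place of `df.cache()`, for example `stage = materialize(spark, expensive_df)`, and then run several downstream queries against `stage`.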

29. Sort before writing ● Add ORDER BY ○ Partition columns, filter columns, and one high cardinality column ● Benefits ○ Cluster by partition columns – minimize output files ○ Cluster by common filter columns – faster reads ○ Automatic skew estimation – faster writes (wall time) ● Needs adaptive execution support
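
One way to apply the column ordering from this slide is to generate the `ORDER BY` clause from the three groups of columns. The helpers, table name, and column names below are hypothetical; the resulting query would typically be the `SELECT` feeding a table write.

```python
def sort_columns(partition_cols, filter_cols, high_cardinality_col):
    """Partition columns first, then filter columns, then one high-cardinality column."""
    ordered = []
    for col in [*partition_cols, *filter_cols, high_cardinality_col]:
        if col not in ordered:  # skip columns that appear in more than one group
            ordered.append(col)
    return ordered


def sorted_select_sql(source, partition_cols, filter_cols, high_cardinality_col):
    """Build a SELECT that globally sorts rows before they are written."""
    cols = ", ".join(sort_columns(partition_cols, filter_cols, high_cardinality_col))
    return f"SELECT * FROM {source} ORDER BY {cols}"


if __name__ == "__main__":
    # Hypothetical table and columns.
    print(sorted_select_sql("events_staging", ["day"], ["country"], "user_id"))
```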