Migrating to Apache Spark at Netflix
1. Migrating to Spark at Netflix (Ryan Blue, Spark Summit 2019)
2. Spark at Netflix
3. Long ago . . . ● ETL was mostly written in Pig, with some in Hive ● Pipelines required data engineering ● Data engineers had to understand the processing engine
4. Today: Job executions (chart)
5. Today: Cluster runtime (chart)
6. Today: S3 bytes read, S3 bytes written (charts)
7. Today ● Spark is > 90% of job executions – high tens-of-thousands daily ● Data platform is easier to use and more efficient ● Customers from all parts of the business
8. How did we get there?
9. Not included ● High-profile Spark features: DataFrames, codegen, etc. ● S3 optimizations and committers ● Parquet filtering, tuning, and compression ● Notebook environment
10. Spark deployments
11. Following upstream Spark ● Rebase ○ Pull in a new version ○ Easy to get new features ○ Easy to break things ● Backport ○ Pick only what’s needed ○ Time consuming ○ Safe?
12. Netflix: Parallel branches ● Maintain supported versions in parallel using backports ● Periodic rebase to add new minor versions: 1.6, 2.0, 2.1, 2.3 ● Recommend version based on actual use and experience ● Requires patching job submission
13. Benefits of parallel branches ● Easily test a job on another branch before spending migration time ● Avoids coordinating versions across major applications ● Fast iteration: deploy changes several times per week
14. Testing ● Unstable branches ● Nightly canaries for stable and unstable ● CI runs unit tests for unstable ● Integration tests validate every deployment
15. Supported versions ● 1.6 – scale problems ● 2.0 – a little too unpolished ● 2.1 – solid, with some additional love ● 2.3 – slow migration, faster in some cases
16. Challenges
17. Stability ● 1.6 is unstable above 500 executors ○ Use of the Actor model caused coarse locking ○ RPC dependencies make lock issues worse ○ Runaway retry storms ● Spark needs distributed tracing
18. Stability ● Much better in 2.1, plus patches ○ Remove block status data from heartbeats (SPARK-20084) ○ Multi-threaded listener bus (SPARK-18838) ○ Unstable executor requests (SPARK-20540) ● 2.1 and 2.3 still have problems with 100,000+ tasks ○ Applications hang after shutdown ○ Increase job maxPartitionBytes or coalesce
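For the last mitigation, a minimal sketch (assuming an existing SparkSession named spark and hypothetical S3 paths): raising spark.sql.files.maxPartitionBytes makes input splits larger so a scan produces fewer tasks, and coalesce reduces the partition count without a full shuffle. Values are illustrative, not recommendations.

    // Illustrative values only: larger input splits mean fewer scan tasks.
    spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)  // default is 128 MB

    // Or reduce the partition count before a wide stage or a write, without a full shuffle.
    val df = spark.read.parquet("s3://bucket/huge-table")   // hypothetical path
    df.coalesce(2000).write.parquet("s3://bucket/output")   // hypothetical path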
19. Unlikely problems ● Happen all the time at scale ● Scale in several dimensions ○ Large clusters, lots of disks to fail ○ High tens-of-thousands of executions ○ Many executors, many tasks, diverse workloads
20. Unlikely problems ● Fix CommitCoordinator and OutputCommitter problems ● Turn off YARN preemption in production ● Use cgroups to contain greedy apps ● Use general-purpose features ○ Blacklisting to avoid cascading failure ○ Speculative execution to tolerate slow nodes ○ Adaptive execution reduces risk
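A minimal sketch of turning on those general-purpose features, using standard Spark 2.x property names; the app name is hypothetical and the values are placeholders rather than Netflix's settings:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("resilient-etl")                        // hypothetical name
      .config("spark.blacklist.enabled", "true")       // stop scheduling onto nodes that keep failing tasks
      .config("spark.speculation", "true")             // re-launch straggler tasks on other executors
      .config("spark.sql.adaptive.enabled", "true")    // let Spark adjust reducer counts at runtime
      .getOrCreate()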
21. Memory management ● Fix persistent OOM causes ○ Use less driver memory for broadcast joins (SPARK-22170) ○ Add PySpark memory region and limits (SPARK-25004) ○ Base stats on row count, not size on disk
22. Memory management ● Educate users about memory regions ○ Spark memory vs JVM memory vs overhead ○ Know what region fixes your problem (e.g., spilling) ○ Never set spark.executor.memory without also setting spark.memory.fraction
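To make the regions concrete, a sketch with illustrative numbers (not recommendations), using the upstream property names; spark.executor.pyspark.memory is the PySpark limit added by SPARK-25004 (upstream in 2.4, or via backport):

    import org.apache.spark.sql.SparkSession

    // The YARN container holds more than the JVM heap:
    //   spark.executor.memory          JVM heap (Spark execution/storage plus user memory)
    //   spark.executor.memoryOverhead  native and off-heap memory outside the JVM
    //   spark.executor.pyspark.memory  separate limit for PySpark workers (SPARK-25004)
    val spark = SparkSession.builder()
      .config("spark.executor.memory", "8g")            // JVM heap
      .config("spark.memory.fraction", "0.6")           // share of heap for Spark execution and storage
      .config("spark.executor.memoryOverhead", "2g")    // off-heap and native memory
      .config("spark.executor.pyspark.memory", "2g")    // PySpark worker limit
      .getOrCreate()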
23. Best practices
24. Basics ● Avoid RDDs ○ Kryo problems plagued 1.6 apps ○ Let the optimizer improve jobs over time ● Aggressively broadcast ○ Remove the broadcast timeout ○ Set broadcast threshold much higher
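A sketch of the broadcast settings, assuming an existing SparkSession named spark; values are placeholders, and facts and dims are hypothetical DataFrames:

    import org.apache.spark.sql.functions.broadcast

    // Broadcast much more aggressively than the defaults.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 512L * 1024 * 1024)  // default is 10 MB
    spark.conf.set("spark.sql.broadcastTimeout", 3600L)                         // default is 300 seconds

    // Force a broadcast when the optimizer's size estimate is wrong.
    val joined = facts.join(broadcast(dims), "key")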
25. Configuration ● 3 rules: ○ Don’t copy configuration ○ If you don’t know what it does, don’t change it ○ Never change timeouts ● Document defaults and recommendations
26. Parallelism ● Know how to control parallelism ○ spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes ○ repartition vs coalesce ● Use the least-intrusive option ○ Set shuffle parallelism high and use adaptive execution ○ Allow Spark to improve
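A sketch of the least-intrusive approach, assuming an existing SparkSession named spark and a hypothetical events table; set shuffle parallelism high, let adaptive execution merge small partitions, and prefer coalesce when only reducing partition count:

    spark.conf.set("spark.sql.shuffle.partitions", 2000L)                    // high default for wide stages
    spark.conf.set("spark.sql.adaptive.enabled", "true")                     // merge small shuffle partitions at runtime
    spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024)  // controls input split size

    val df = spark.table("events")
    val fewer = df.coalesce(200)        // narrow: merges partitions without a shuffle
    val spread = df.repartition(2000)   // wide: full shuffle, only when data must be redistributed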
27. Avoid wide stages ● Keep tasks in low tens-of-thousands ○ Too many tasks and the driver can’t handle heartbeats ○ Jobs hang for 10+ minutes after shutdown ● Reduce pressure on shuffle service ○ map tasks * reduce tasks = shuffle shards
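To see why that math matters, take illustrative numbers: 50,000 map tasks feeding 20,000 reduce tasks creates 50,000 × 20,000 = 1,000,000,000 shuffle shards for the shuffle service to track and serve, which is why both sides of a wide stage should stay in the low tens-of-thousands.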
28. Dynamic Allocation ● Fixed --num-executors accidents (SPARK-13723) ● Use materialize instead of caching ○ Materialize: convert to RDD, back to DF, and count ○ Stores cache data in shuffle servers ○ Also avoids over-optimization
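A sketch of the materialize trick as a hypothetical helper (not a Spark API): converting to an RDD and back pins the physical plan so the optimizer cannot reach across the boundary, and the count runs the job once so later reads reuse the shuffle files kept by the external shuffle service instead of cached blocks on executors that dynamic allocation would otherwise have to keep alive.

    import org.apache.spark.sql.DataFrame

    // Hypothetical helper: materialize a DataFrame instead of caching it.
    def materialize(df: DataFrame): DataFrame = {
      val spark = df.sparkSession
      val rdd = df.rdd                                  // freeze the physical plan as an RDD
      val out = spark.createDataFrame(rdd, df.schema)   // wrap the same RDD back as a DataFrame
      out.count()                                       // run the job once to produce shuffle files
      out
    }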
29. Sort before writing ● Add ORDER BY ○ Partition columns, filter columns, and one high cardinality column ● Benefits ○ Cluster by partition columns – minimize output files ○ Cluster by common filter columns – faster reads ○ Automatic skew estimation – faster writes (wall time) ● Needs adaptive execution support
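A sketch of the write-side sort in Spark SQL; the table and columns are made up, and the order follows the slide: partition columns first, then common filter columns, then one high-cardinality column so range partitioning can split skewed groups.

    // Hypothetical table and columns, illustrating the recommended sort order.
    spark.sql("""
      INSERT OVERWRITE TABLE warehouse.events_daily
      SELECT *
      FROM staging.events
      ORDER BY dateint,      -- partition column: clusters rows so each output partition writes few files
               event_type,   -- common filter column: tightens min/max stats for faster reads
               event_id      -- high-cardinality column: lets skew estimation split large partitions
      """)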