Migrating to Apache Spark at Netflix
1. Migrating to Spark at Netflix
Ryan Blue, Spark Summit 2019
2. Spark at Netflix
3. Long ago...
● ETL was mostly written in Pig, with some in Hive
● Pipelines required data engineering
● Data engineers had to understand the processing engine
4. Today: job executions
5. Today: cluster runtime
6. Today: S3 bytes read, S3 bytes written
7. Today
● Spark is >90% of job executions – high tens of thousands daily
● Data platform is easier to use and more efficient
● Customers from all parts of the business
8. How did we get there?
9. Not included
● High-profile Spark features: DataFrames, codegen, etc.
● S3 optimizations and committers
● Parquet filtering, tuning, and compression
● Notebook environment
10. Spark deployments
11. Following upstream Spark
● Rebase
  ○ Pull in a new version
  ○ Easy to get new features
  ○ Easy to break things
● Backport
  ○ Pick only what's needed
  ○ Time consuming
  ○ Safe?
12. Netflix: Parallel branches
● Maintain supported versions in parallel using backports
● Periodic rebase to add new minor versions: 1.6, 2.0, 2.1, 2.3
● Recommend a version based on actual use and experience
● Requires patching job submission
13. Benefits of parallel branches
● Easily test another branch before investing time in it
● Avoids coordinating versions across major applications
● Fast iteration: deploy changes several times per week
14. Testing
● Unstable branches
● Nightly canaries for stable and unstable branches
● CI runs unit tests for unstable branches
● Integration tests validate every deployment
15. Supported versions
● 1.6 – scale problems
● 2.0 – a little too unpolished
● 2.1 – solid, with some additional love
● 2.3 – slow migration, faster in some cases
16. Challenges
17. Stability
● 1.6 is unstable above 500 executors
  ○ Use of the Actor model caused coarse locking
  ○ RPC dependencies make lock issues worse
  ○ Runaway retry storms
● Spark needs distributed tracing
18. Stability
● Much better in 2.1, plus patches
  ○ Remove block status data from heartbeats (SPARK-20084)
  ○ Multi-threaded listener bus (SPARK-18838)
  ○ Unstable executor requests (SPARK-20540)
● 2.1 and 2.3 still have problems with 100,000+ tasks
  ○ Applications hang after shutdown
  ○ Increase the job's maxPartitionBytes or coalesce to cut the task count (see the sketch below)
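A minimal sketch of the last point, assuming a spark-shell session; the split size, path, and partition count are illustrative, not settings from the talk:

```scala
// Two ways to keep the task count down. Values and paths are illustrative.

// 1) Read larger splits so the scan produces fewer map tasks.
spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)  // 512 MB splits

// 2) Or merge the partitions of an already-loaded DataFrame without a shuffle.
val events = spark.read.parquet("s3://bucket/events")  // hypothetical input
val fewerTasks = events.coalesce(10000)                // at most 10,000 downstream tasks
```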
19. Unlikely problems
● Happen all the time at scale
● Scale in several dimensions
  ○ Large clusters, lots of disks to fail
  ○ High tens of thousands of executions
  ○ Many executors, many tasks, diverse workloads
20. Unlikely problems
● Fix CommitCoordinator and OutputCommitter problems
● Turn off YARN preemption in production
● Use cgroups to contain greedy apps
● Use general-purpose features (see the sketch below)
  ○ Blacklisting to avoid cascading failure
  ○ Speculative execution to tolerate slow nodes
  ○ Adaptive execution reduces risk
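A minimal sketch of enabling those three features, assuming Spark 2.x property names; the values are illustrative choices, not the configuration from the talk:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative defaults for the safety nets listed above (Spark 2.x names).
val spark = SparkSession.builder()
  .appName("resilient-job")
  .config("spark.blacklist.enabled", "true")     // stop scheduling on repeatedly failing executors/nodes
  .config("spark.speculation", "true")           // re-launch suspiciously slow tasks on other nodes
  .config("spark.sql.adaptive.enabled", "true")  // let the planner adjust shuffle parallelism at runtime
  .getOrCreate()
```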
21. Memory management
● Fix persistent OOM causes
  ○ Use less driver memory for broadcast joins (SPARK-22170)
  ○ Add PySpark memory region and limits (SPARK-25004)
  ○ Base stats on row count, not size on disk
22. Memory management
● Educate users about memory regions (see the sketch below)
  ○ Spark memory vs JVM memory vs overhead
  ○ Know which region fixes your problem (e.g., spilling)
  ○ Never set spark.executor.memory without also setting spark.memory.fraction
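A minimal sketch of how those regions map to configuration, assuming Spark 2.3+ property names; the sizes are illustrative, not recommendations:

```scala
import org.apache.spark.sql.SparkSession

// The executor memory regions discussed above. Sizes are illustrative only.
val spark = SparkSession.builder()
  .appName("memory-regions")
  .config("spark.executor.memory", "8g")          // JVM heap
  .config("spark.memory.fraction", "0.6")         // share of the heap Spark manages (execution + storage)
  .config("spark.executor.memoryOverhead", "2g")  // off-heap headroom in the YARN container
  // spark.executor.pyspark.memory (SPARK-25004) additionally caps Python worker memory in newer versions
  .getOrCreate()
```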
23. Best practices
24. Basics
● Avoid RDDs
  ○ Kryo problems plagued 1.6 apps
  ○ Let the optimizer improve jobs over time
● Aggressively broadcast (see the sketch below)
  ○ Remove the broadcast timeout
  ○ Set the broadcast threshold much higher
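A minimal sketch of aggressive broadcasting; the timeout, threshold, and table names are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.broadcast

// Favor broadcast joins. Values and table names are illustrative.
spark.conf.set("spark.sql.broadcastTimeout", 36000L)                        // effectively removes the 300 s default
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 256L * 1024 * 1024)  // raise the 10 MB default

val facts = spark.table("events")          // hypothetical fact table
val dims  = spark.table("device_lookup")   // hypothetical dimension table

// Force the hint explicitly when statistics are misleading.
val joined = facts.join(broadcast(dims), Seq("device_id"))
```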
25. Configuration
● 3 rules:
  ○ Don't copy configuration
  ○ If you don't know what it does, don't change it
  ○ Never change timeouts
● Document defaults and recommendations
26. Parallelism
● Know how to control parallelism (see the sketch below)
  ○ spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes
  ○ repartition vs coalesce
● Use the least intrusive option
  ○ Set shuffle parallelism high and use adaptive execution
  ○ Allow Spark to improve the job
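A minimal sketch of the knobs above; the numbers are illustrative, and the repartition/coalesce contrast is standard Spark behavior rather than something specific to the talk:

```scala
// The main parallelism controls. Numbers are illustrative.
spark.conf.set("spark.sql.shuffle.partitions", 5000L)                     // reducer count for shuffles
spark.conf.set("spark.sql.files.maxPartitionBytes", 256L * 1024 * 1024)   // split size for file scans

val df = spark.table("events")          // hypothetical table
val wider    = df.repartition(2000)     // full shuffle: can increase or decrease partition count
val narrower = df.coalesce(200)         // no shuffle: only merges existing partitions
```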
27. Avoid wide stages
● Keep task counts in the low tens of thousands
  ○ With too many tasks, the driver can't handle heartbeats
  ○ Jobs hang for 10+ minutes after shutdown
● Reduce pressure on the shuffle service (see the example below)
  ○ map tasks * reduce tasks = shuffle shards
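To make the shard formula concrete with hypothetical numbers: a stage of 50,000 map tasks feeding 20,000 reduce tasks produces 50,000 × 20,000 = 1,000,000,000 shuffle shards for the shuffle service to track and serve, which is why both sides of a wide shuffle need to stay small.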
28. Dynamic Allocation
● Fixed --num-executors accidents (SPARK-13723)
● Use materialize instead of caching (see the sketch below)
  ○ Materialize: convert to RDD, back to DF, and count
  ○ Stores the data in shuffle servers rather than executor caches
  ○ Also avoids over-optimization
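A minimal sketch of the materialize trick as I read the slide, assuming the round-trip through an RDD cuts the logical plan so later stages reuse the shuffle output instead of recomputing; the helper name is mine, not from the talk:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: run a DataFrame's plan once and reuse its shuffle output downstream.
def materialize(df: DataFrame): DataFrame = {
  val spark = df.sparkSession
  // Round-trip through an RDD so downstream plans start from this point.
  val cut = spark.createDataFrame(df.rdd, df.schema)
  cut.count()  // runs the job; shuffle files remain on the shuffle servers for reuse
  cut
}
```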
29. Sort before writing
● Add ORDER BY (see the sketch below)
  ○ Partition columns, filter columns, and one high-cardinality column
● Benefits
  ○ Cluster by partition columns – minimize output files
  ○ Cluster by common filter columns – faster reads
  ○ Automatic skew estimation – faster writes (wall time)
● Needs adaptive execution support
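A minimal sketch of the write pattern; the tables and the choice of ds (partition column), account_id (common filter column), and event_id (high-cardinality column) are hypothetical:

```scala
// Global sort before insert. Table and column names are hypothetical,
// and a partitioned target assumes dynamic partition overwrite is configured.
spark.sql("""
  INSERT OVERWRITE TABLE warehouse.events
  SELECT *
  FROM staging_events
  ORDER BY ds,          -- partition column: each task writes only a few partitions/files
           account_id,  -- common filter column: clusters rows for faster reads
           event_id     -- high-cardinality column: lets range partitioning split skewed keys
""")
```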