Apache Spark Core – Deep Dive – Proper Optimization

Optimizing Spark jobs through a true understanding of Spark Core. Learn: What is a partition? What is the difference between read, shuffle, and write partitions? How do you increase parallelism and decrease output files? Where does shuffle data go between stages? What is the "right" size for your Spark partitions and files? Why does a job slow down with only a few tasks left and never finish? Why doesn't adding nodes decrease my compute time?

2. Spark Core – Proper Optimization – Daniel Tomes, Databricks

3. Me • Norman, OK • Undergrad: OU (SOONER); Masters: OK State • ConocoPhillips • Raleigh, NC • Cloudera • Databricks • /in/tomes

4. Talking Points
• Spark Hierarchy
• The Spark UI
• Rightsizing & Optimizing
• Advanced Optimizations

5. Spark Hierarchy

6. Spark Hierarchy
• Actions are eager
  – Made of transformations (lazy)
    • narrow
    • wide (requires a shuffle)
  – Spawn jobs
    • Jobs spawn stages
      – Stages spawn tasks
        » Tasks do the work & utilize the hardware
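
A minimal Scala sketch of this hierarchy, assuming a hypothetical /data/events source and a status column:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder.appName("hierarchy-demo").getOrCreate()

    val df       = spark.read.parquet("/data/events")      // lazy: nothing runs yet
    val filtered = df.filter(col("status") === "ok")       // narrow transformation (lazy)
    val counts   = filtered.groupBy(col("status")).count() // wide transformation (adds a shuffle)

    counts.show() // eager: the action spawns a job, which spawns stages, which spawn tasks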

7. Navigating The Spark UI – DEMO

8. Understand Your Hardware
• Core count & speed
• Memory per core (working & storage)
• Local disk type, count, size, & speed
• Network speed & topology
• Data lake properties (rate limits)
• Cost / core / hour
  – Financial for cloud
  – Opportunity cost for shared & on-prem

9. Get A Baseline
• Goal: is your action efficient?
  – Long stages, spills, laggard tasks, etc.?
• CPU utilization
  – Ganglia / YARN / etc.
  – Tails

10. Minimize Data Scans (Lazy Load)
• Data skipping
  – HIVE partitions
  – Bucketing
    • Only for experts – nearly impossible to maintain
  – Databricks Delta Z-Ordering
    • What it is
    • How to do it
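
A hedged Scala sketch of both skipping techniques; the events table and the date and userId columns are assumptions, and OPTIMIZE ... ZORDER BY is Databricks Delta SQL:

    import org.apache.spark.sql.functions.col

    // Partition pruning: a filter on the HIVE-partitioned column `date`
    // lets Spark read only the matching directories instead of the full table.
    val day = spark.read.table("events").filter(col("date") === "2019-04-24")

    // Delta Z-Ordering co-locates related values so file-level statistics
    // can skip files that cannot match a `userId` predicate.
    spark.sql("OPTIMIZE events ZORDER BY (userId)")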

12. [Figure: query plans compared – "No Lazy Loading" vs. "With Lazy Loading"; the lazy-loaded plan is simple apart from extra shuffle partitions]

13. [Figure: shrinking the partition range using a filter on a HIVE-partitioned column – scan without vs. with the partition filter]

14. Partitions – Definition: each of a number of portions into which some operating systems divide memory or storage. HIVE PARTITION ≠ SPARK PARTITION – don't conflate the two.

15. Spark Partitions – Types
• Input
  – Control: size
    • spark.default.parallelism (don't use)
    • spark.sql.files.maxPartitionBytes (mutable) – assuming the source has sufficient partitions
• Shuffle
  – Control: count
    • spark.sql.shuffle.partitions
• Output
  – Control: size
    • coalesce(n) to shrink
    • repartition(n) to increase and/or balance (shuffle)
    • df.write.option("maxRecordsPerFile", n)
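
A hedged sketch of the three controls side by side; the values and the output path are illustrative assumptions, not recommendations:

    // Input: bytes packed into a single input partition (default 128 MB)
    spark.conf.set("spark.sql.files.maxPartitionBytes", 16 * 1024 * 1024) // 16 MB

    // Shuffle: partition count produced by wide transformations
    spark.conf.set("spark.sql.shuffle.partitions", 480)

    // Output: rebalance before writing and cap records per file
    df.repartition(480)
      .write
      .option("maxRecordsPerFile", 1000000)
      .parquet("/out/events")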

16. Partitions – Shuffle – Default: 200 shuffle partitions

17. Partitions – Right Sizing – Shuffle – Master Equation
• Largest shuffle stage
  – Target size <= 200 MB per partition
• Partition Count = Stage Input Data / Target Size
  – Solve for partition count
EXAMPLE: shuffle stage input = 210 GB
x = 210000 MB / 200 MB = 1050 partitions
spark.conf.set("spark.sql.shuffle.partitions", 1050)
BUT -> if the cluster has 2000 cores:
spark.conf.set("spark.sql.shuffle.partitions", 2000)
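
The same equation as a short sketch; the stage input comes from the Spark UI and the core count from the cluster spec:

    val stageInputMB = 210000L // largest shuffle stage input: 210 GB
    val targetSizeMB = 200L    // target <= 200 MB per shuffle partition
    val clusterCores = 2000L

    // Solve for partition count, but never set fewer partitions than cores
    val partitions = math.max(stageInputMB / targetSizeMB, clusterCores)
    spark.conf.set("spark.sql.shuffle.partitions", partitions)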

18. Cluster spec: 96 cores @ 7.625 GB/core, thus 3.8125 GB working memory and 3.8125 GB storage memory per core.
Stage 21's shuffle is fed by stages 19 & 20: shuffle input = 45.4 GB + 8.6 GB = 54 GB.
With the default 200 shuffle partitions: 54000 MB / 200 partitions ≈ 270 MB per shuffle partition → spills.

19. Cluster spec: 96 cores @ 7.625 GB/core (3.8125 GB working memory, 3.8125 GB storage memory per core).
480 shuffle partitions – why? Target shuffle partition size ≈ 100 MB, so p = 54 GB / 100 MB = 540.
540 partitions / 96 cores = 5.625 waves: if p = 540, another 60 partitions would have to be loaded and processed after the first 5 full cycles complete.
96 * 5 = 480 partitions → exactly 5 full waves, all cores busy, and still NO SPILL.
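
The wave arithmetic above as a hedged sketch, using the slide's numbers:

    val shuffleInputMB = 54000L // 54 GB shuffle input (stages 19 + 20)
    val targetPartMB   = 100L   // ~100 MB target per shuffle partition
    val cores          = 96L

    val naive = shuffleInputMB / targetPartMB // 540 partitions
    // Round down to whole task waves so no partial wave leaves cores idle:
    val waves      = naive / cores            // 5 full waves
    val partitions = waves * cores            // 480 -> all cores busy, no spill
    spark.conf.set("spark.sql.shuffle.partitions", partitions)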

20. Input Partitions – Right Sizing
• Use the Spark default (128 MB) unless you need to:
  – Increase parallelism
  – Handle heavily nested / repetitive data
  – Generate data – i.e. explode
  – Work around a source structure that is not optimal (upstream)
  – Run UDFs
spark.conf.set("spark.sql.files.maxPartitionBytes", 16777216) // 16 MB

21. [Figure: stage parallelism compared at 128 MB vs. 16 MB input partitions]

23. Output Partitions – Right Sizing
• Write once -> read many
  – More time to write, but faster to read
• Perfect writes limit parallelism
  – Compactions (minor & major)
EXAMPLE: write data size = 14.7 GB, desired file size = 1500 MB
→ max write-stage parallelism = 10
→ 96 - 10 = 86 cores idle during the write
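
A hedged sketch of that arithmetic; df and the output path are hypothetical:

    val writeSizeMB   = 14700.0 // 14.7 GB of output data
    val desiredFileMB = 1500.0  // desired file size
    val files = math.ceil(writeSizeMB / desiredFileMB).toInt // ~10 files

    // Only `files` tasks participate in the write; on 96 cores, 86 sit idle.
    df.repartition(files).write.parquet("/out/compact")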

24. [Figure: write stages compared – only 10 cores used with average file size ≈ 1.5 GB vs. all 96 cores used with average file size ≈ 0.16 GB]

25. Output Partitions – Composition
• df.write.option("maxRecordsPerFile", n)
• df.coalesce(n).write…
• df.repartition(n).write…
• df.repartition(n, [colA, …]).write…
• spark.sql.shuffle.partitions(n)
• df.localCheckpoint(…).repartition(n).write…
• df.localCheckpoint(…).coalesce(n).write…
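
A hedged sketch of the last pattern in the list: localCheckpoint() truncates the lineage so the repartition added for the write does not re-run the upstream plan. The path and date column are assumptions:

    import org.apache.spark.sql.functions.col

    df.localCheckpoint()                   // materialize on executors, cut lineage
      .repartition(480, col("date"))       // balance output by the write-partition column
      .write
      .option("maxRecordsPerFile", 500000) // cap file sizes within each partition
      .partitionBy("date")
      .parquet("/out/balanced")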

26. Partitions – Why So Serious?
• Avoid the spill
• Maximize parallelism
  – Utilize all cores
  – Provision only the cores you need

27. Advanced Optimizations
• Finding imbalances
• Persisting
• Join optimizations
• Handling skew
• Expensive operations
• UDFs
• Multi-dimensional parallelism

28. Balance
• Maximizing resources requires balance across input partitions, shuffle partitions, and output files
  – Task duration
  – Partition size
• Watch for: spills, GC times, straggling tasks
• SKEW – when some partitions are significantly larger than most
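
A hedged sketch for spotting such imbalances: count records per Spark partition and compare the largest against the rest:

    import org.apache.spark.sql.functions.{col, count, spark_partition_id}

    val perPartition = df
      .groupBy(spark_partition_id().as("pid"))
      .agg(count("*").as("records"))

    perPartition.orderBy(col("records").desc).show(10) // biggest partitions first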

29. [Figure: record counts per task under skew – 75th percentile ≈ 2M records, max ≈ 45M records; stragglers take > 22x longer even with no spillage, and with spillage up to 100x longer]