Apache Spark At Scale in the Cloud

Using Apache Spark to analyze large datasets in the cloud presents a range of challenges. Different stages of your pipeline may be constrained by CPU, memory, disk and/or network IO. But what if all those stages have to run on the same cluster? In the cloud, you have limited control over the hardware your cluster runs on.

You may have even less control over the size and format of your raw input files. Performance tuning is an iterative and experimental process. It’s frustrating with very large datasets: what worked great with 30 billion rows may not work at all with 400 billion rows. But with strategic optimizations and compromises, 50+ TiB datasets can be no big deal.

Using the Spark UI and simple metrics, this talk explores how to diagnose and remedy issues in jobs:

Sizing the cluster based on your dataset (shuffle partitions)
Ingestion challenges – well begun is half done (globbing S3, small files)
Managing memory (sorting out GC – when to go parallel, when to go G1, when off-heap can help you)
Shuffle (give a little to get a lot – configs for better out-of-the-box shuffle) – Spill (partitioning for the win)
Scheduling (FAIR vs FIFO, is there a difference for your pipeline?)
Caching and persistence (it’s the cost of doing business, so what are your options?)
Fault tolerance (blacklisting, speculation, task reaping)
Making the best of a bad deal (skew joins, windowing, UDFs, very large query plans)
Writing to S3 (dealing with write partitions, HDFS and S3DistCp vs writing directly to S3)



2. Spark At Scale In the Cloud. Rose Toomey, Coatue Management. #UnifiedDataAnalytics #SparkAISummit

3. About me. NYC. Finance. Technology. Code. • In each job I wrote code, but found that the data challenges just kept growing – Lead API Developer at Gemini Trust – Director at Novus Partners • Now: coding and working with data full time – Software Engineer at Coatue Management

4. How do you process this… Numbers are approximate. • Dataset is 35+ TiB raw • Input is 80k+ files in an unsplittable, compressed, row-based format, with heavy skew and a deeply nested directory structure • Processing results in 275+ billion rows cached to disk • Lots of data written back out to S3 – including stages ending in sustained writes of tens of TiB
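
The abstract lists sizing the cluster from the dataset via shuffle partitions. As a rough sketch only: assume the shuffled data is about as large as the raw input, and pick a hypothetical target of ~200 MiB per shuffle partition. A value for spark.sql.shuffle.partitions (default 200) then follows from arithmetic. The target size and the "round up to whole task waves" rule are assumptions for illustration, not the author's numbers.

```python
import math
from typing import Optional

MIB = 1024 ** 2
TIB = 1024 ** 4


def shuffle_partitions(shuffle_bytes: int,
                       target_partition_mib: int = 200,
                       total_cores: Optional[int] = None) -> int:
    """Estimate a value for spark.sql.shuffle.partitions.

    Assumptions (hypothetical, tune for your job): each shuffle partition
    should hold about `target_partition_mib` MiB and, if `total_cores` is
    given, the count is rounded up to a whole number of task waves.
    """
    partitions = math.ceil(shuffle_bytes / (target_partition_mib * MIB))
    if total_cores:
        waves = math.ceil(partitions / total_cores)
        partitions = waves * total_cores
    return partitions


if __name__ == "__main__":
    # ~35 TiB shuffled; hypothetical 2,000-core cluster
    print(shuffle_partitions(35 * TIB))                    # 183501
    print(shuffle_partitions(35 * TIB, total_cores=2000))  # 184000
```

Real shuffle volume can be larger or smaller than the raw input after decompression, filtering, and explosion of nested rows, so the Spark UI's shuffle read/write metrics are the better input once a first run exists.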

5. On a very big Spark cluster… Sometimes you just need to bring the entire dataset into memory. The more nodes a Spark cluster has, the more important configuration tuning becomes. This is even more true in the cloud, where you will regularly experience I/O variance and unreliable nodes.

6. In the cloud? • Infrastructure management is hard – Scaling resources and bandwidth in a datacenter is not instant – Spark/Hadoop clusters are not islands: you’re managing an entire ecosystem of supporting players • Optimizing Spark jobs is hard. Let’s limit the number of hard things we’re going to tackle at once.

7. Things going wrong at scale. Everything is relative: in smaller clusters, these configurations worked fine. • Everything is waiting on everything else because Netty doesn't have enough firepower to shuffle faster • Speculation meets skew and relaunches the very slowest parts of a join, leaving most of the cluster idle • An external service rate-limits requests, which causes blacklisting to sideline most of a perfectly good cluster

8. Spark at scale in the cloud. Building: Composition, Structure. Scaling: Memory, Networking, S3. Scheduling: Speculation, Blacklisting. Tuning: Patience, Tolerance, Acceptance.

9. Putting together a big Spark cluster • What kind of nodes should the cluster have? Big? Small? Medium? • What's your resource limitation for the number of executors? – Just memory (standalone) – Both memory and vCPUs (YARN) • How much memory and how many virtual CPUs should each executor have? (Image: Galactic Wreckage in Stephan's Quintet)

10. One Very Big Standalone Node. One mega instance configured with many "just right" executors, each provisioned with • < 32 GiB heap (sweet spot for GC; below 32 GiB the JVM can also use compressed object pointers) • 5 cores (for good throughput) • Minimizes shuffle overhead • Like the pony, not offered by your cloud provider. Also, poor fault tolerance.
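
The questions on the previous two slides reduce to packing "just right" executors onto a node. A minimal sketch, following the slide's framing that standalone is limited only by memory while YARN counts both memory and vCPUs. The 720 GiB / 96 vCPU node shape is hypothetical. The overhead formula is Spark's default for spark.executor.memoryOverhead on YARN (max of 384 MiB and 10% of executor memory).

```python
import math
from dataclasses import dataclass


@dataclass
class Node:
    memory_gib: float  # memory the cluster manager can hand out on this node
    vcpus: int


def yarn_overhead_gib(heap_gib: float) -> float:
    """Default spark.executor.memoryOverhead: max(384 MiB, 10% of executor memory)."""
    return max(384 / 1024, 0.10 * heap_gib)


def executors_per_node(node: Node, heap_gib: float = 30, cores: int = 5,
                       manager: str = "yarn") -> int:
    """How many identical executors fit on one node.

    "standalone": only memory limits the count (per the slide).
    "yarn": container memory (heap + overhead) and vCPUs both limit it.
    YARN may only enforce vCPUs when its scheduler uses the
    DominantResourceCalculator.
    """
    if manager == "standalone":
        return math.floor(node.memory_gib / heap_gib)
    if manager == "yarn":
        container_gib = heap_gib + yarn_overhead_gib(heap_gib)
        by_memory = math.floor(node.memory_gib / container_gib)
        by_cpu = node.vcpus // cores
        return min(by_memory, by_cpu)
    raise ValueError(f"unknown cluster manager: {manager}")


if __name__ == "__main__":
    big = Node(memory_gib=720, vcpus=96)  # hypothetical instance shape
    print(executors_per_node(big, manager="yarn"))        # 19 (CPU-bound)
    print(executors_per_node(big, manager="standalone"))  # 24 (memory-bound)
```

The same node yields a different executor count depending on which resource the cluster manager enforces. That is why the slide asks about the resource limitation before asking about executor size.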

11. Multiple Medium-sized Nodes. When looking at medium-sized nodes, we have a choice: • Just one executor • Multiple executors. But a single executor might not be the best use of resources: • More cores on a single executor are not necessarily better • When using a cluster manager like YARN, more executors could be a more efficient use of CPU and memory

12. Many Small Nodes. “Desperate affairs require desperate measures.” (Vice Admiral Horatio Nelson) • 500+ small nodes • Each node over-provisioned relative to configurations with multiple executors per node • Single executor per node • Most fault tolerant, but big communications overhead
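
Here is one simplified way to see the communications overhead. In an all-to-all shuffle, assume every executor fetches map output from the shuffle service on every other node. The model is an assumption for illustration. It ignores host-local reads, connection reuse, and how much data each fetch moves, so treat it as an order-of-magnitude sketch, not a measurement.

```python
def remote_fetch_pairs(nodes: int, executors_per_node: int) -> int:
    """Count (executor, remote node) pairs in an all-to-all shuffle.

    Simplified model: with an external shuffle service, every executor may
    fetch map output from the service on every other node.
    """
    executors = nodes * executors_per_node
    return executors * (nodes - 1)


if __name__ == "__main__":
    # The same 500 executors, packed two different ways (hypothetical shapes)
    print(remote_fetch_pairs(nodes=500, executors_per_node=1))  # 249500
    print(remote_fetch_pairs(nodes=50, executors_per_node=10))  # 24500
```

Under this model, spreading the same executors over ten times as many nodes multiplies the remote fetch pairs by roughly ten.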

13. Why ever choose the worst solution? A single executor per small (or medium) node is the worst configuration for cost, provisioning, and resource usage. So why not recommend against it? • Resilient to node degradation and loss • Quick transition to production: relative over-provisioning of resources to each executor behaves more like a notebook • Awkward instance sizes may provision more quickly than larger instances

14. Onward! Now that you have your cluster composition in mind, you’ll need to scale up your base infrastructure to support the number of nodes: • Memory and garbage collection • Tune RPC for cluster communications • Where do you put very large datasets? • How do you get them off the cluster? • No task left behind: scheduling in difficult times


16. Spark memory management. SPARK-10000: Consolidate storage and execution memory management. • NewRatio controls the Young/Old proportion (Young Generation 1/3, Old Generation 2/3) • spark.memory.fraction sets storage and execution space to ~60% of tenured space. (Diagram: after 300m of reserved space, the heap is split into ~60% Spark storage and execution memory (spark.memory.fraction) and ~40% for metadata, user data structures, and OOM safety. Spark memory is in turn split into 50% execution, which is dynamic and will take more, and 50% storage (spark.memory.storageFraction).)
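
The diagram's proportions can be reproduced with a little arithmetic. A minimal sketch, assuming Spark's defaults (300 MiB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5) and the JVM's default NewRatio of 2. Spark actually starts from the heap size the JVM reports, which is somewhat below -Xmx, so the numbers are approximate. The 30 GiB heap is a hypothetical example under the 32 GiB guideline.

```python
RESERVED_MIB = 300  # fixed reserved memory in Spark's unified memory manager


def unified_memory_regions(heap_mib: float, memory_fraction: float = 0.6,
                           storage_fraction: float = 0.5) -> dict:
    """Approximate the regions shown on the slide, in MiB."""
    usable = heap_mib - RESERVED_MIB
    spark_memory = usable * memory_fraction    # spark.memory.fraction
    storage = spark_memory * storage_fraction  # spark.memory.storageFraction
    return {
        "reserved": RESERVED_MIB,
        "execution": spark_memory - storage,   # can grow into unused storage space
        "storage": storage,                    # cached blocks here are not evicted by execution
        "user": usable - spark_memory,         # metadata, user data structures, OOM safety
    }


def old_gen_mib(heap_mib: float, new_ratio: int = 2) -> float:
    """Old Generation size when -XX:NewRatio=new_ratio (Old:Young = new_ratio:1)."""
    return heap_mib * new_ratio / (new_ratio + 1)


if __name__ == "__main__":
    heap = 30 * 1024  # hypothetical 30 GiB executor heap
    regions = unified_memory_regions(heap)
    print(regions)
    spark_memory = regions["execution"] + regions["storage"]
    print(spark_memory <= old_gen_mib(heap))  # True: ~18252 MiB fits in ~20480 MiB
```

The second check is the point of the "~60% of tenured" bullet: Spark's storage and execution memory should fit within the Old Generation, or long-lived cached data will keep triggering major collections.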


18. Field guide to Spark GC tuning • Lots of minor GC: easy fix – Increase Eden space (high allocation rate) • Lots of major GC: need to diagnose the trigger – Triggered by promotion: increase Eden space – Triggered by Old Generation filling up: increase Old Generation space or decrease spark.memory.fraction • Full GC before stage completes – Trigger minor GC earlier and more often
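
For the "increase Eden space" fixes, Spark's own tuning guide gives a rule of thumb. Estimate Eden as the working space for the tasks running at once, assuming each decompressed input block is about 2 to 3 times its on-disk size. Then size the Young Generation with -Xmn at 4/3 of that estimate, which leaves room for the survivor spaces. Below is a minimal sketch of that arithmetic. The block size, decompression factor, and task count are inputs you would have to measure. A fixed -Xmn is more meaningful under ParallelGC than under G1, which prefers to size the young generation itself.

```python
def young_gen_option(concurrent_tasks: int, block_mib: int = 128,
                     decompression_factor: int = 3) -> str:
    """Build an -Xmn option from the Spark tuning guide's Eden estimate."""
    eden_mib = concurrent_tasks * decompression_factor * block_mib
    # Young Generation = 4/3 * Eden (Eden plus survivor spaces), integer ceiling
    young_mib = (eden_mib * 4 + 2) // 3
    return f"-Xmn{young_mib}m"


if __name__ == "__main__":
    # 4 tasks' worth of space, 128 MiB blocks that decompress to ~3x
    print(young_gen_option(4))  # -Xmn2048m
    # 5 cores per executor, as on the "just right" executor slide
    print(young_gen_option(5))  # -Xmn2560m
```

The resulting string would go into spark.executor.extraJavaOptions.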

19. Full GC tailspin. Balance sizing up against tuning code: • Switch to bigger and/or more nodes • Look for slow-running stages caused by avoidable shuffle; tune joins and aggregation operations • Checkpoint both to preserve work at strategic points and to truncate DAG lineage • Cache to disk only • Trade CPU for memory by compressing data in memory using spark.rdd.compress

20. Which garbage collector? Throughput or latency? • ParallelGC favors throughput • G1GC favors low latency – Shiny new things like string deduplication – Vulnerable to wide rows. Whichever you choose, collect early and often.
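
The collector choice ends up as JVM flags in spark.executor.extraJavaOptions. A minimal sketch of building those flags. The G1 InitiatingHeapOccupancyPercent value of 35 is a hypothetical starting point, not a number from the talk. Lowering it below the JVM default of 45 starts G1's concurrent marking earlier, which is one way to "collect early".

```python
def gc_java_options(collector: str, g1_ihop: int = 35) -> str:
    """Build spark.executor.extraJavaOptions for the chosen collector.

    `g1_ihop` is a hypothetical starting value: lowering
    InitiatingHeapOccupancyPercent (JVM default 45) starts G1's concurrent
    marking at a lower heap occupancy.
    """
    if collector == "parallel":
        # throughput-oriented collector
        return "-XX:+UseParallelGC"
    if collector == "g1":
        return " ".join([
            "-XX:+UseG1GC",
            "-XX:+UseStringDeduplication",  # G1-only feature (JDK 8u20+)
            f"-XX:InitiatingHeapOccupancyPercent={g1_ihop}",
        ])
    raise ValueError(f"unsupported collector: {collector}")


if __name__ == "__main__":
    conf = {"spark.executor.extraJavaOptions": gc_java_options("g1")}
    print(conf["spark.executor.extraJavaOptions"])
```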

21. Where to cache big datasets • To disk, which is slow • But it frees up as much tenured space as possible for execution and for storing things which must be in memory – internal metadata – user data structures – broadcasting the skew side of joins


23. Perils of caching to disk. 19/04/13 01:27:33 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_48_27005 ! When you lose an executor, you lose all the cached blocks stored by that executor, even if the node is still running. • If lineage is gone, the entire job will fail • If lineage is present, RDD#getOrCompute tries to compensate for the missing blocks by re-ingesting the source data. While this keeps your job from failing, it could introduce enormous slowdowns if the source data is skewed, your ingestion process is complex, etc.

24. Self-healing block management
// use this with replication >= 2 when caching to disk in a non-distributed filesystem
spark.storage.replication.proactive = true
Pro-active block replenishment in case of node/executor failures
https://issues.apache.org/jira/browse/SPARK-15355
https://github.com/apache/spark/pull/14412
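
Why replication >= 2 matters (for example, caching with StorageLevel.DISK_ONLY_2) can be shown with a simplified model. Assume each cached block's replicas sit on distinct executors chosen uniformly at random, and some executors disappear at once. The 500 executors and 100,000 cached blocks below are hypothetical. With one replica, every executor loss drops blocks. With two, a single loss drops none, but it leaves some blocks with only one copy. Proactive replication restores the second copy so the next failure again meets two replicas.

```python
from math import comb


def p_block_lost(executors: int, lost: int, replication: int) -> float:
    """Probability that every replica of one cached block was on a lost executor.

    Simplified model: replicas sit on `replication` distinct executors chosen
    uniformly at random, and `lost` executors disappear at once.
    """
    if not 1 <= replication <= executors:
        raise ValueError("replication must be between 1 and executors")
    # comb(lost, replication) is 0 when fewer executors are lost than replicas exist
    return comb(lost, replication) / comb(executors, replication)


if __name__ == "__main__":
    blocks = 100_000  # hypothetical number of cached blocks
    for replication in (1, 2):
        for lost in (1, 2):
            expected = blocks * p_block_lost(500, lost, replication)
            print(f"replication={replication} lost={lost} expected blocks lost={expected:.1f}")
```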


26. Tune RPC for cluster communications. The Netty server processing RPC requests is the backbone of both authentication and shuffle services. Insufficient RPC resources cause slow-speed mayhem: clients disassociate, operations time out. org.apache.spark.network.util.TransportConf is the shared config for both shuffle and authentication services. (Image: Ruth Teitelbaum and Marlyn Meltzer reprogramming ENIAC, 1946)

27. Scaling RPC
// used for auth
spark.rpc.io.serverThreads = coresPerDriver * rpcThreadMultiplier
// used for shuffle
spark.shuffle.io.serverThreads = coresPerDriver * rpcThreadMultiplier
Where "RPC thread multiplier" is a scaling factor to increase the service's thread pool: • 8 is aggressive, might cause issues • 4 is moderately aggressive • 2 is recommended (start here, benchmark, then increase) • 1 (number of vCPU cores) is the default but is too small for a large cluster
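
The slide's formula can be turned into concrete settings. A minimal sketch: the helper name and the 16-core driver are hypothetical. The two config keys and the multiplier guidance come from the slide.

```python
from typing import Dict


def rpc_thread_conf(cores_per_driver: int, rpc_thread_multiplier: int = 2) -> Dict[str, str]:
    """Server thread settings following the slide's formula.

    Multiplier guidance from the slide: 1 is the default (too small for a
    large cluster), 2 is the recommended starting point, 4 is moderately
    aggressive, 8 is aggressive and might cause issues.
    """
    if rpc_thread_multiplier < 1:
        raise ValueError("multiplier must be at least 1")
    threads = str(cores_per_driver * rpc_thread_multiplier)
    return {
        "spark.rpc.io.serverThreads": threads,      # used for auth
        "spark.shuffle.io.serverThreads": threads,  # used for shuffle
    }


if __name__ == "__main__":
    # hypothetical 16-core driver, recommended starting multiplier
    for key, value in rpc_thread_conf(16).items():
        print(f"--conf {key}={value}")
```

Keeping the multiplier as a single parameter makes the slide's "start at 2, benchmark, then increase" loop a one-value change.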

28. Shuffle. The definitive presentation on shuffle tuning: Tuning Apache Spark for Large-Scale Workloads (Gaoxiang Liu and Sital Kedia). So this section focuses on • Some differences from the configurations presented in Liu and Kedia's presentation, as well as • Configurations that weren't shown in that presentation

29. Strategy for lots of shuffle clients
1. Scale the server way up
// mentioned in Liu/Kedia presentation but now deprecated
// spark.shuffle.service.index.cache.entries = 2048
// default: 100 MiB
spark.shuffle.service.index.cache.size = 256m
// length of accept queue. default: 64
spark.shuffle.io.backLog = 8192
// default (not increased by spark.network.timeout)
spark.rpc.lookupTimeout = 120s
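
To keep the slide's settings in one place, a small sketch can render them either as spark-defaults.conf lines or as spark-submit --conf arguments. The helper names are hypothetical. One hedged caveat: settings consumed by the external shuffle service, such as spark.shuffle.service.index.cache.size, may need to be set in the configuration of the process that runs that service (for YARN, the NodeManager), not only in the application's conf.

```python
from typing import Dict, List

# Values from the slide; the deprecated index.cache.entries key is left out.
SHUFFLE_SERVER_CONF: Dict[str, str] = {
    "spark.shuffle.service.index.cache.size": "256m",  # default 100m
    "spark.shuffle.io.backLog": "8192",                # accept queue length
    "spark.rpc.lookupTimeout": "120s",
}


def to_spark_defaults(conf: Dict[str, str]) -> str:
    """Render as spark-defaults.conf lines ("key value", whitespace-separated)."""
    return "\n".join(f"{key} {value}" for key, value in sorted(conf.items()))


def to_submit_args(conf: Dict[str, str]) -> List[str]:
    """Render as spark-submit arguments: --conf key=value."""
    args: List[str] = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    print(to_spark_defaults(SHUFFLE_SERVER_CONF))
    print(" ".join(to_submit_args(SHUFFLE_SERVER_CONF)))
```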