Fault-Tolerant Streaming Computation at Scale



1. Discretized Streams: Fault-Tolerant Streaming Computation at Scale. Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, Ion Stoica. University of California, Berkeley.

2. Stream processing and the problem. Processing real-time data as it arrives. Examples: site activity statistics (Facebook users clicking on ads); computer cluster monitoring (mining real-time logs); spam detection and prevention (detecting spam tweets before it’s too late). Problems: scalability to hundreds of nodes; fault tolerance; stragglers; being generic enough to model a wide variety of systems.

3. Goals: scalability to hundreds of nodes; minimal cost beyond base processing (little overhead); second-scale latency; second-scale recovery from faults and stragglers.

4. Existing systems. Mostly stateful continuous operators, which make faults and stragglers difficult to handle because of internal state and non-determinism (messages may arrive out of order). Replication (copies of each node): relatively quick recovery, but costly (2x hardware) and needs synchronization (special protocols, e.g. Flux), which can add delays.

5. Upstream backup (buffer messages to replay). Each node buffers its outgoing messages since some checkpoint so they can be replayed. Upon a node failure, a standby node steps in, and the parents replay their buffered messages to it until it reaches the desired state. This incurs delay: the system waits until the standby node has rebuilt its state before it can process new data. Used by Storm*, MapReduce Online, and TimeStream.

6. Stragglers? Neither approach handles stragglers. With replication, the replicas would be delayed by the slow node (synchronization). Upstream backup would treat a straggler as a failure, so the recovery mechanism kicks in again and incurs more delay.

7. Discretized Streams (D-Streams). Computations are structured as a set of short, stateless, deterministic tasks. Finer-granularity dependencies allow parallel recovery and speculation. State is stored in memory as Resilient Distributed Datasets (RDDs). Unification with batch processing.

8. D-Streams. A stream of deterministic batch computations on small time intervals. Operations: map, filter, reduce, groupBy, etc. Each produces either intermediate state (RDDs) or program outputs (e.g. pushed to external storage).

9. RDDs. An immutable (read-only) dataset, partitioned across workers. Keeps the lineage of how the RDD was computed, e.g. by doing a map on another RDD (a dependency). Avoids replication by recomputing lost partitions from their dependencies using lineage. Usually in memory, but RDD partitions can be persisted to disk.

10. Example: a running count of page views per URL.
    pageViews = readStream("http://...", "1s")
    ones = pageViews.map(event => (event.url, 1))
    counts = ones.runningReduce((a, b) => a + b)
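The running count above can be imitated outside Spark with a few lines of plain Python. This is only a sketch of the semantics, not the D-Streams implementation: `running_counts` and the sample `batches` are hypothetical names, and each inner list stands for the URLs seen in one 1-second interval.

```python
from collections import Counter

def running_counts(batches):
    """Yield the cumulative per-URL view count after each interval."""
    totals = Counter()
    for urls in batches:
        # map + per-interval reduce: count each URL seen in this interval
        interval_counts = Counter(urls)
        # runningReduce: merge the interval's counts into the running state
        totals = totals + interval_counts
        yield dict(totals)

# Three 1-second intervals of page-view URLs (made-up data).
batches = [["/a", "/b", "/a"], ["/b"], []]
results = list(running_counts(batches))
```

Each element of `results` plays the role of one RDD in the `counts` D-Stream: a deterministic function of the previous state and the new interval's data.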

11. RDDs cont’d. Periodic checkpoints: the system replicates every, say, tenth RDD, which prevents unbounded recomputation. Recovery is often fast: lost partitions are recomputed in parallel on separate nodes. Recovery is parallel with respect to both partitions and time intervals, so different partitions of different time intervals are computed at the same time. This gives more parallelism than upstream backup, even when running multiple continuous operators per node.
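A minimal sketch of lineage-based recovery with a checkpoint, in plain Python. The `Dataset` class and its methods are hypothetical stand-ins for RDDs, and only one-to-one (map) dependencies are modelled; real RDDs support wider dependencies and distributed storage.

```python
class Dataset:
    """Toy stand-in for an RDD: partitions plus the lineage that produced them."""

    def __init__(self, partitions, parent=None, fn=None):
        self.partitions = [list(p) for p in partitions]
        self.parent = parent      # dataset this one was derived from
        self.fn = fn              # per-record function applied to the parent
        self.checkpointed = None  # reliable copy, if a checkpoint was taken

    def map(self, fn):
        out = [[fn(x) for x in p] for p in self.partitions]
        return Dataset(out, parent=self, fn=fn)

    def checkpoint(self):
        # Save a reliable copy and cut the lineage: nothing upstream is needed.
        self.checkpointed = [list(p) for p in self.partitions]
        self.parent, self.fn = None, None

    def lose(self, i):
        # Simulate losing partition i when a worker fails.
        self.partitions[i] = None

    def get(self, i):
        if self.partitions[i] is None:
            if self.checkpointed is not None:
                self.partitions[i] = list(self.checkpointed[i])
            elif self.parent is not None:
                # Recompute only the lost partition from its parent partition.
                self.partitions[i] = [self.fn(x) for x in self.parent.get(i)]
            else:
                raise RuntimeError("partition lost with no lineage or checkpoint")
        return self.partitions[i]

base = Dataset([[1, 2], [3, 4]])
base.checkpoint()
doubled = base.map(lambda x: x * 2)
shifted = doubled.map(lambda x: x + 1)

# A failure wipes partition 1 of both derived datasets and of the base.
base.lose(1)
doubled.lose(1)
shifted.lose(1)
recovered = shifted.get(1)
```

Recovery walks the lineage back only as far as the checkpoint, which is why periodic checkpoints bound the amount of recomputation.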

12. Timing? The system assumes that data arriving in [t, t+1) are actually [t, t+1) records. What about late records (according to an embedded timestamp)? Two possible solutions. (1) Slack time: the [t, t+1) batch waits until, say, t+5 before it packages the RDD and starts processing, so it can catch late [t, t+1) records that arrive in [t+1, t+5). Output is delayed (here by 4 time units). (2) Application-level updates: compute for [t, t+1) and output the result, catch any late [t, t+1) records arriving up until, say, t+5, then aggregate, reduce, and output an updated value for [t, t+1) based on the records caught in [t, t+5).
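The slack-time option can be sketched as follows. This is an illustration under stated assumptions, not the system's code: `assign_with_slack` is a hypothetical helper, records are `(arrival_time, event_time, value)` tuples, and a record that misses its batch deadline is simply set aside.

```python
import math
from collections import defaultdict

def assign_with_slack(records, slack):
    """Group records into unit-length batches by event time.

    The batch for [t, t+1) closes at t + 1 + slack; a record that arrives
    at or after that deadline is returned in `late` instead.
    """
    batches = defaultdict(list)
    late = []
    for arrival, event_time, value in records:
        t = math.floor(event_time)  # interval the record belongs to
        if arrival < t + 1 + slack:
            batches[t].append(value)
        else:
            late.append(value)
    return dict(batches), late

# Slack of 4 time units: the [0, 1) batch is packaged at t = 5.
records = [
    (0.2, 0.1, "a"),  # on time
    (4.9, 0.5, "b"),  # late, but inside the slack window
    (5.0, 0.9, "c"),  # misses the deadline for [0, 1)
    (1.5, 1.2, "d"),  # on time for [1, 2)
]
batches, late = assign_with_slack(records, slack=4)
```

The application-level alternative would instead emit a result for [0, 1) immediately and later emit a corrected value that includes "b".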

13. D-Streams API. Connect to streams: external streams (e.g. listening on a port) or loading periodically from a storage system (e.g. HDFS). D-Streams support transformations, which create a new D-Stream from one or more D-Streams (map, reduce, groupBy, join, etc.), and output operations, which write data to external systems (save, foreachRDD).

14. D-Stream API cont’d: windowing. Group records from a sliding window into one RDD, e.g. words.window("5s"), where words is the D-Stream of words as they arrive. The result is a D-Stream of 5-second intervals, each an RDD holding the words that arrived in that period, partitioned over workers, i.e. words in [0,5), [1,6), [2,7), etc. Diagram labels: DStream of data, window length, sliding interval.
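A small sketch of what windowing produces, assuming 1-second intervals and a 5-interval window. The function name `window` mirrors the slide, but its signature here is hypothetical.

```python
def window(intervals, length, slide=1):
    """Merge each run of `length` consecutive intervals into one window.

    `intervals` is a list of per-interval record lists; the result has one
    merged list per window position, advancing by `slide` intervals.
    """
    out = []
    for start in range(0, len(intervals) - length + 1, slide):
        merged = [r for batch in intervals[start:start + length] for r in batch]
        out.append(merged)
    return out

# Seven 1-second intervals, one word each (made-up data).
words = [["a"], ["b"], ["c"], ["d"], ["e"], ["f"], ["g"]]
windows = window(words, length=5)
```

The three windows correspond to [0,5), [1,6), and [2,7).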

15. D-Stream API cont’d: incremental aggregation. Computing an aggregate (max, count, etc.) over a sliding window, e.g. pairs.reduceByWindow("5s", _ + _), where pairs is the (word, 1) D-Stream. Computes an aggregate per interval, then aggregates over the last 5 seconds. The function has to be associative.

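16. D-Stream API cont’d: incremental aggregation with an associative and invertible function. More efficient, because each window can be updated by adding the newest interval and subtracting the oldest, e.g. pairs.reduceByWindow("5s", _ + _, _ - _).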
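The difference between the associative-only and the invertible form can be sketched in plain Python. `reduce_by_window` is a hypothetical function operating on one pre-computed aggregate per interval; it is meant to illustrate the two update rules, not to reproduce the Spark Streaming operator.

```python
from functools import reduce
import operator

def reduce_by_window(per_interval, length, add, sub=None):
    """Aggregate each full window of `length` per-interval aggregates.

    With only `add` (associative), every window re-merges `length` values.
    With `sub` (the inverse of `add`), each window is derived from the
    previous one: add the newest interval, subtract the one that slid out.
    """
    results = []
    for end in range(length, len(per_interval) + 1):
        if sub is None or not results:
            results.append(reduce(add, per_interval[end - length:end]))
        else:
            newest = per_interval[end - 1]
            oldest = per_interval[end - length - 1]
            results.append(sub(add(results[-1], newest), oldest))
    return results

counts = [1, 2, 3, 4, 5, 6, 7]  # one count per 1-second interval
associative = reduce_by_window(counts, 5, operator.add)
invertible = reduce_by_window(counts, 5, operator.add, operator.sub)
```

Both calls give the same answers; the invertible version does two operations per window regardless of the window length.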

17. D-Stream API cont’d: state tracking. Transforms (Key, Event) records into (Key, State) records. Initialize: create state from an event. Update: (State, Event) -> NewState. Timeout: drop old states. E.g.:
    sessions = events.track((key, ev) => 1, (key, state, ev) => ev == Exit ? null : 1, "30s")
    // sessions holds (ClientID, 1) for each active session; sessions.count() gives the number of active sessions
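A rough Python model of the three callbacks (initialize, update, timeout). The function `track`, the `EXIT` marker, and the sample events are hypothetical; the sketch assumes a key's state is dropped when `update` returns `None` or when no event for that key has been seen for `timeout` time units.

```python
EXIT = "exit"

def track(batches, initialize, update, timeout):
    """Yield the key -> state table after each interval.

    `batches` is a list of (time, [(key, event), ...]) pairs.
    """
    state, last_seen = {}, {}
    for now, events in batches:
        for key, ev in events:
            if key in state:
                new = update(key, state[key], ev)
            else:
                new = initialize(key, ev)
            if new is None:
                # update() asked for this key's state to be dropped
                state.pop(key, None)
                last_seen.pop(key, None)
            else:
                state[key] = new
                last_seen[key] = now
        # Timeout: forget keys with no events for `timeout` time units.
        for key in [k for k, t in last_seen.items() if now - t >= timeout]:
            del state[key]
            del last_seen[key]
        yield dict(state)

batches = [
    (0, [("c1", "play"), ("c2", "play")]),
    (1, [("c1", EXIT)]),
    (31, [("c3", "play")]),
]
sessions = list(track(
    batches,
    initialize=lambda key, ev: 1,
    update=lambda key, st, ev: None if ev == EXIT else 1,
    timeout=30,
))
active_counts = [len(s) for s in sessions]
```

Here `c1` leaves explicitly, while `c2` is dropped by the timeout at time 31.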

18. D-Streams consistency? No chance of out-of-sync outputs: data is micro-batched by time and state is explicit (determinism). It’s all RDDs, so streams play well with offline data loaded into RDDs. Ad-hoc queries: attach a console shell and query the available RDDs, i.e. query the state without needing to output it (useful for debugging).

19. Summary
Aspect | D-Streams | Continuous processing systems
Latency | 0.5-2 s | 1-100 ms unless micro-batched for consistency
Consistency | Records processed atomically with the interval they arrive in | May wait a short time to sync operators before proceeding
Late records | Slack time or application-level correction | Slack time or out-of-order processing
Fault recovery | Fast parallel recovery | Replication or serial recovery on one node
Straggler recovery | Possible via speculative execution | Typically not handled
Mixing with batching | Simple unification through RDD APIs | Some DBs do, but not in message queueing systems

20. System architecture. Implemented in Spark Streaming, based on a modified Spark processing engine. A master tracks the D-Stream lineage graph and schedules tasks to compute new RDD partitions. Workers receive data, store the partitions of input and computed RDDs, and execute tasks. A client library is used to send data into the system.

21. Application execution. The application defines one or more stream inputs. The system ensures that input data is replicated across two worker nodes before acknowledging the source; if a worker fails, the source sends unacknowledged data to another worker. A block store on each worker manages the data and is tracked by the master, so that every node can find dependency partitions. Each block has a unique ID, and any node that has a particular block ID can serve it (if multiple nodes computed it). Blocks are dropped in LRU fashion. Nodes have their clocks synchronized via NTP. Each node sends the master a list of the block IDs it has. The master then launches tasks; no further synchronization is needed because of determinism. Tasks run as soon as their parents finish.

22. Execution-level optimization. Operators that can be grouped are pipelined into a single task (map, map, filter, etc.: narrow dependencies). Tasks are placed based on data locality. The system controls the partitioning of RDDs to keep reused data as local as possible across intervals.
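Pipelining of narrow dependencies can be pictured as fusing several per-record operators into one pass over a partition, with no intermediate dataset materialized between them. The `pipeline` helper below is a hypothetical illustration in plain Python, not Spark's scheduler logic.

```python
def pipeline(*stages):
    """Fuse ("map", fn) and ("filter", fn) stages into one task.

    The returned task makes a single pass over a partition and applies
    every stage to each record in order.
    """
    def run(partition):
        out = []
        for record in partition:
            keep = True
            for kind, fn in stages:
                if kind == "map":
                    record = fn(record)
                elif kind == "filter":
                    if not fn(record):
                        keep = False
                        break
                else:
                    raise ValueError(f"unknown stage kind: {kind}")
            if keep:
                out.append(record)
        return out
    return run

# map, filter, map executed as one task on one partition
task = pipeline(
    ("map", lambda x: x + 1),
    ("filter", lambda x: x % 2 == 0),
    ("map", lambda x: x * 10),
)
fused_output = task([1, 2, 3, 4])
```

This only works because each output partition depends on exactly one input partition; an operator like reduce needs data from many partitions and therefore starts a new task.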

23. Optimizations for stream processing. Network communication: now uses async I/O to let tasks with remote inputs, such as reduce, fetch them faster. Timestep pipelining: allows tasks from overlapping timesteps to run so that all nodes are utilized. Task scheduling: multiple optimizations, e.g. hand-tuning the size of control messages, to be able to launch parallel jobs of hundreds of tasks every few hundred milliseconds. Storage layer: async checkpointing of RDDs, zero-copy when possible. Lineage cutoff: forget lineage after an RDD has been checkpointed.

24. Optimizations cont’d. Master recovery: mitigates the single point of failure. These optimizations also benefited Spark batch operations and showed a 2x improvement.

25. Memory management. Least-recently-used (LRU) dropping of partitions; data is spilled to disk if there is not enough memory. Configurable maximum history timeout: the system forgets timed-out blocks (think garbage collection), with history timeout > checkpoint interval. In many applications memory use is not onerous, because the state within a computation is much smaller than the input data.
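A toy block store showing LRU eviction with spill to disk. `BlockStore` is a hypothetical class: "disk" is just a second dictionary, and capacity is counted in blocks rather than bytes.

```python
from collections import OrderedDict

class BlockStore:
    """Keep at most `capacity` blocks in memory; spill the rest to 'disk'."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = OrderedDict()  # ordered oldest-used first
        self.disk = {}

    def put(self, block_id, data):
        self.memory[block_id] = data
        self.memory.move_to_end(block_id)
        while len(self.memory) > self.capacity:
            # Evict the least recently used block and spill it.
            old_id, old_data = self.memory.popitem(last=False)
            self.disk[old_id] = old_data

    def get(self, block_id):
        if block_id in self.memory:
            self.memory.move_to_end(block_id)  # mark as recently used
            return self.memory[block_id]
        # Served from disk without being reloaded into memory.
        return self.disk.get(block_id)

store = BlockStore(capacity=2)
store.put("a", [1])
store.put("b", [2])
store.get("a")        # "a" is now more recently used than "b"
store.put("c", [3])   # evicts "b"
```

A history timeout would add a second rule that deletes blocks older than a configured age from both memory and disk.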

26. Fault recovery. The deterministic nature of D-Streams allows parallel recovery: RDDs are immutable, state is fully explicit, and dependencies are fine-grained. Regular checkpoints help speed up recomputation. Parallel recovery: when a node fails, different partitions in different RDDs are lost. The master identifies the lost partitions and sends out tasks to nodes to recompute them according to lineage and dependencies. The whole cluster partakes in recovery, which is parallel both across partitions in each timestep and across timesteps (if dependencies allow).
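As a simplified picture of how recovery work spreads over the cluster, the sketch below assigns lost partitions to surviving nodes round-robin. `plan_recovery` is hypothetical, identifies a partition by a `(timestep, partition)` pair, and ignores dependencies between timesteps, which a real scheduler would have to respect.

```python
def plan_recovery(lost, nodes):
    """Spread recomputation of lost partitions across surviving nodes."""
    plan = {node: [] for node in nodes}
    for i, part in enumerate(sorted(lost)):
        plan[nodes[i % len(nodes)]].append(part)
    return plan

# Partitions from two timesteps were lost with the failed node.
lost = [(2, 1), (1, 0), (2, 0), (1, 1)]
plan = plan_recovery(lost, ["n1", "n2", "n3"])
```

With upstream backup, all of this work would land on the single standby node instead.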

27. Parallel Recovery: 1 min since last checkpoint

28. Handling stragglers. Detection: e.g. a task 1.4x slower than the median task in its job stage. Run speculative backup copies of slow tasks: recompute straggled partitions on other nodes, in parallel. The copies produce the same results (determinism).
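The detection rule can be written down directly. `find_stragglers` and the sample task times are hypothetical; the 1.4x threshold is the one quoted on the slide.

```python
from statistics import median

def find_stragglers(running_times, threshold=1.4):
    """Return IDs of tasks slower than `threshold` times the stage median."""
    m = median(running_times.values())
    return sorted(t for t, secs in running_times.items() if secs > threshold * m)

# Running times in seconds for the tasks of one job stage (made-up data).
stage = {"t1": 1.0, "t2": 1.1, "t3": 0.9, "t4": 2.0}
stragglers = find_stragglers(stage)
```

Each returned task would get a speculative copy on another node; whichever copy finishes first is used, and determinism guarantees both produce the same partition.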

29. Master recovery. (1) Write the state of the computation reliably when starting each timestep: the graph of the user's D-Streams and the Scala function objects representing user code, the time of the last checkpoint, and the IDs of RDDs since the checkpoint. (2) Have workers connect to a new master and report their RDD partitions. It is no problem if an RDD is computed twice (the result is the same), and it is fine to lose some running tasks while a new master loads up, since they can be recomputed. Output operators are idempotent, so as not to overwrite what has already been output.

30. Evaluation. Lots of benchmarks and comparisons. Seriously … lots. Scaling performance: Grep (finding patterns); WordCount (sliding window over 30 s); TopKCount (the k most frequent words over the past 30 s). WordCount and TopKCount use reduceByWindow. All run on m1.xlarge EC2 nodes (4 cores, 15 GB RAM). 1 s latency: 500 ms intervals (micro-batches); 2 s latency: 1 s intervals. Input records are 100 bytes.

31. Evaluation cont’d. Linear scaling to 100 nodes: 64M records/s for Grep at sub-second latency (640K records/s/node); 25M records/s for TopKCount and WordCount at sub-second latency (250K records/s/node). Comparison with commercial systems: Oracle CEP, 1M records/s on a 16-core machine; StreamBase, 245K records/s on 8 cores; Esper, 500K records/s on 4 cores. Spark's advantage: it scales linearly.

32. Evaluation cont’d: comparison with S4 and Storm. Guarantees: S4 has no fault-tolerance guarantees; Storm guarantees at-least-once delivery. S4: 7.5K records/s/node for Grep, 1K records/s/node for WordCount. Storm: tested with both 100-byte and 1000-byte records; 115K records/s/node for 100-byte Grep, even with precautions to improve performance; faster with 1000-byte records, but still 2x slower than Spark.

33. Fault Recovery Time



36. Response to Stragglers

37. Evaluation on real-world applications: video distribution monitoring (Conviva). Identify and respond to video delivery problems across geographies, CDNs, ISPs, and devices. Receives events from video players; complex analysis, with more than 50 metrics computed. Before: two separate systems, one for online data (a custom system) and one for offline data (Hadoop/Hive), had to be kept in sync (metrics computed in the same way), with several minutes of lag before data was ready for ad-hoc queries. Porting to Spark: wrapping the Hadoop map/reduce implementations, with 2-second batches. Uses track to update state from events and a sliding reduceByKey to aggregate metrics over sessions. Result: unification of the online and offline systems, with support for ad-hoc queries.

38. Video Distribution Monitoring Evaluation

39. Crowd-sourced traffic estimation. Mobile Millennium: a machine learning system to estimate automobile traffic conditions in cities. Arterial roads lack sensors, so it uses GPS signals from vehicles. The data is noisy (inaccurate near tall buildings) and sparse (one signal from each car per minute). Uses a highly compute-intensive EM algorithm to infer conditions, with MCMC and a traffic model to estimate a travel time distribution for each road link. Previous implementation: iterative batch jobs in Spark over 30-minute windows. Ported to online EM, which merges new data every 5 seconds. Uses offline data, from up to 10 days back, to avoid overfitting (sparse data).