Scalable Progressive Analytics on Big Data in the Cloud
Badrish Chandramouli (Microsoft Research, Redmond), Jonathan Goldstein (Microsoft Research, Redmond), Abdul Quamar (University of Maryland, College Park)*
{badrishc, jongold}@microsoft.com, abdul@cs.umd.edu
*Work performed during an internship at Microsoft Research.

ABSTRACT
Analytics over the increasing quantity of data stored in the Cloud has become very expensive, particularly due to the pay-as-you-go Cloud computation model. Data scientists typically manually extract samples of increasing data size (progressive samples) using domain-specific sampling strategies for exploratory querying. This provides them with user-control, repeatable semantics, and result provenance. However, such solutions result in tedious workflows that preclude the reuse of work across samples. On the other hand, existing approximate query processing systems report early results, but do not offer the above benefits for complex ad-hoc queries. We propose a new progressive analytics system based on a progress model called Prism that (1) allows users to communicate progressive samples to the system; (2) allows efficient and deterministic query processing over samples; and (3) provides repeatable semantics and provenance to data scientists. We show that one can realize this model for atemporal relational queries using an unmodified temporal streaming engine, by re-interpreting temporal event fields to denote progress. Based on Prism, we build Now!, a progressive data-parallel computation framework for Windows Azure, where progress is understood as a first-class citizen in the framework. Now! works with "progress-aware reducers"; in particular, it works with streaming engines to support progressive SQL over big data. Extensive experiments on Windows Azure with real and synthetic workloads validate the scalability and benefits of Now! and its optimizations, over current solutions for progressive analytics.

Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee. Articles from this volume were invited to present their results at the 39th International Conference on Very Large Data Bases, August 26th-30th 2013, Riva del Garda, Trento, Italy. Proceedings of the VLDB Endowment, Vol. 6, No. 14. Copyright 2013 VLDB Endowment 2150-8097/13/14... $10.00.

1. INTRODUCTION
With increasing volumes of data stored and processed in the Cloud, analytics over such data is becoming very expensive. The pay-as-you-go paradigm of the Cloud causes computation costs to increase linearly with query execution time, making it possible for a data scientist to easily spend large amounts of money analyzing data. The problem is exacerbated by the exploratory nature of analytics, where queries are iteratively discovered and refined, including the submission of many off-target and erroneous queries (e.g., bad parameters). In traditional systems, queries must execute to completion before such problems are diagnosed, often after hours of expensive compute time are exhausted.

Data scientists therefore typically choose to perform their ad-hoc querying on extracted samples of data. This approach gives them the control to carefully choose from a huge variety [11, 28, 27] of sampling strategies in a domain-specific manner. For a given sample, it provides precise (e.g., relational) query semantics, repeatable execution using a query processor and optimizer, result provenance in terms of what data contributed to an observed result, and query composability. Further, since choosing a fixed sample size a priori for all queries is impractical, data scientists usually create and operate over multiple progressive samples of increasing size [28].

1.1 Challenges
In an attempt to help data scientists, the database community has proposed approximate query processing (AQP) systems such as CONTROL [20] and DBO [23] that perform progressive analytics. We define progressive analytics as the generation of early results to analytical queries based on partial data, and the progressive refinement of these results as more data is received. Progressive analytics allows users to get early results using significantly fewer resources, and potentially end (and possibly refine) computations early once sufficient accuracy or query incorrectness is observed.

The general focus of AQP systems has, however, been on automatically providing confidence intervals for results, and selecting processing orders to reduce bias [21, 9, 15, 17, 31]. The premise of AQP systems is that users are not involved in specifying the semantics of early results; rather, the system takes up the responsibility of defining and providing accurate early results. To be useful, the system needs to automatically select effective sampling strategies for a particular combination of query and data. This can work for narrow classes of workloads, but does not generalize to complex ad-hoc queries. A classic example is the infeasibility of sampling for join trees [10]. In these cases, a lack of user involvement with "fast and loose" progress has shortcomings; hence, data scientists tend to prefer the more laborious but controlled approach presented earlier. We illustrate this using a running example.

Example 1 (CTR). Consider an advertising platform where an analyst wishes to compute the click-through-rate (CTR) for each ad. We require two sub-queries (Qc and Qi) to compute (per ad) the number of clicks and impressions, respectively. Each query may be non-trivial; for example, Qc needs to process clicks on a per-user basis to consider only legitimate (non-automated) clicks from a webpage whitelist. Further, Qi may need to process a different set of logged data. The final query Qctr joins (for each ad) the results of Qc and Qi, and computes their ratio as the CTR. Figure 1 shows a toy input sorted by user, and the final results for Qc, Qi, and Qctr.
Figure 1: (a) Click data; (b) Impression data; (c) Final result of Qc and Qi; (d) Final result of Qctr.

Figure 3: (a) Progressive Qc output; (b) Progressive Qi output; (c) & (d) Two possible progressive Qctr results.

Next, Figure 3(a) and (b) show progressive results for the same queries Qc and Qi. Without user involvement in defining progressive samples, the exact sequence of progressive counts can be non-deterministic across runs, although the final counts are precise. Further, depending on the relative speed and sequence of results for Qc and Qi, Qctr may compose arbitrary progressive results, resulting in significant variations in progressive CTR results. Figures 3(c) and (d) show two possible results for Qctr. For example, a CTR of 2.0 would result from combining the first tuple from Qc and Qi. Some results that are not even meaningful (e.g., CTR > 1.0) are possible. Although both results eventually get to the same final CTR, there is no mechanism to ensure that the inputs being correlated to compute progressive CTRs are deterministic and comparable (e.g., computed using the same sample of users).

The above example illustrates several challenges:

1) User-Control: Data scientists usually have domain expertise that they leverage to select from a range of sampling strategies [11, 28, 27] based on their specific needs and context. In Example 1, we may progressively sample both datasets identically in user-order for meaningful progress, avoiding the join sampling problem [10]. Users may also need more flexibility; for instance, with a star-schema dataset, they may wish to fully process the small dimension table before sampling the fact table, for better progressive results.

2) Semantics: Relational algebra provides precise semantics for SQL queries. Given a set of input tables, the correct output is defined by the input and query alone, and is independent of dynamic properties such as the order of processing tuples. However, for complex queries, existing AQP systems use operational semantics, where early results are on a best-effort basis. Thus, it is unclear what a particular early result means to the user.

3) Repeatability & Optimization: Two runs of a query in AQP may provide a different sequence of early results, although they both have to converge to the same final answer. Thus, without limiting the class of queries which are progressively executed, it is hard to understand what early answers mean, or even recognize anomalous early answers. Even worse, changing the physical operators in the plan (e.g., changing operators within the ripple join family [16]) can significantly change what early results are seen!

4) Provenance: Users cannot easily establish the provenance of early results, e.g., link an early result (CTR=3.0) to particular contributing tuples, which is useful to debug and reason about results.

5) Query Composition: The problem of using operational semantics is exacerbated when one starts to compose queries. Example 1 shows that one may get widely varying results (e.g., spurious CTR values) that are hard to reason about.

6) Scale-Out: Performing progressive analytics at scale exacerbates the above challenges. The CTR query from Example 1 is expressed as two map-reduce (MR) jobs that partition data by UserId, feeding a third job that partitions data by a different key (AdId); see Figure 2. In a complex distributed multi-stage workflow, accurate deterministic progressive results can be very useful. Map-reduce-online (MRO) [12] adds a limited form of pipelining to MR, but MRO reports a heuristic progress metric (average fraction of data processed across mappers) that does not eliminate the problems discussed above (§6 covers related work).

Figure 2: CTR; MR jobs.

To summarize, data scientists prefer user-controlled progressive sampling because it helps avoid the above issues, but the lack of system support results in a tedious and error-prone workflow that precludes the reuse of work across progressive samples. We need a system that (1) allows users to communicate progressive samples to the system; (2) allows efficient and deterministic query processing over progressive samples, without the system itself trying to reason about specific sampling strategies or confidence estimation; and yet (3) continues to offer the desirable features outlined above.

1.2 Contributions
1) Prism: A New Progress Model & Implementation
We propose (§2) a new progress model called Prism (Progressive sampling model). The key idea is for users to encode their chosen progressive sampling strategy into the data by augmenting tuples with explicit progress intervals (PIs). PIs denote logical points where tuples enter and exit the computation, and explicitly assign tuples to progressive samples. PIs offer remarkable flexibility for encoding sampling strategies and ordering for early results, including arbitrarily overlapping sample sequences and special cases such as the star-schema join mentioned earlier (§2.5 has more details).

PIs propagate through Prism operators. Combined with progressive operator semantics, PIs provide closed-world determinism: the exact sequence of early results is a deterministic function of augmented inputs and the logical query alone. They are independent of physical plans, which enables side-effect-free query optimization. Provenance is explicit; result tuples have PIs that denote the exact set of contributing inputs. Prism also allows meaningful query composition, as operators respect PIs. If desired, users can encode confidence interval computations as part of their queries.

The introduction of a new progress model into an existing relational engine appears challenging. However, interestingly, we show (§2.4) that a progressive in-memory relational engine based on Prism can be realized immediately using an unmodified temporal streaming engine, by carefully reusing its temporal fields to denote progress. Tuples from successive progressive samples get incrementally processed when possible, giving a significant performance benefit. Note here that the temporal engine is unaware that it is processing atemporal relational queries; we simply re-interpret its temporal fields to denote progress points. While it may appear that in-memory queries can be memory intensive since the final answer is computed over the entire dataset, Prism allows us to exploit sort orders and foreign key dependencies in the input data and queries to reduce memory usage significantly (§2.6).

Prism generalizes AQP. Our progress semantics are compatible with queries for which prior AQP techniques with statistical guarantees apply, and thus don't require user involvement. These techniques simply correspond to different PI assignment policies for input data. For instance, variants of ripple join [16] are different PI assignments for a temporal symmetric-hash-join, with confidence intervals computed as part of the query. Thus, Prism is orthogonal to and can leverage this rich area of prior work, while adding the benefit of repeatable and deterministic semantics. In summary, Prism gives progressive results the same form of determinism and user control that relational algebra provides for final results.
Figure 4: (a,b) Input data with progress intervals; (c) Progressive results of Qc and Qi; (d) Progressive output of Qctr.

Figure 5: Input and output progress intervals, query semantics.

2) Applying Prism in a Scaled-Out Cloud Setting
The Prism model is particularly suitable for progressive analytics on big data in the Cloud, since queries in this setting are complex, and memory- and CPU-intensive. Traditional scalable distributed frameworks such as MR are not pipelined, making them unsuitable for progressive analytics. MRO adds pipelining, but does not offer the semantic underpinnings of progress necessary to achieve the desirable features outlined earlier.

We address this problem by designing and building a new framework for progressive analytics called Now! (§3). Now! runs on Windows Azure; it understands and propagates progress (based on the Prism model) as a first-class citizen inside the framework. Now! generalizes the popular data-parallel MR model and supports progress-aware reducers that understand explicit progress in the data. In particular, Now! can work with a temporal engine (we use StreamInsight [3]) as a progress-aware reducer to enable scaled-out progressive relational (SQL) query support in the Cloud. Now! is a novel contribution in its own right, with several important features:
• Fully pipelined progressive computation and data movement across multiple stages with different partitioning keys, in order to avoid the high cost of sending intermediate results to Cloud storage.
• Elimination of sorting in the framework using progress-ordered data movement, partitioned computation pushed inside progress-aware reducers, and support for the traditional reducer API.
• Progress-based merge of multiple map outputs at a reducer node.
• Concurrent scheduling of multi-stage map and reduce jobs with a new scheduling policy and flow control scheme.

We also extend Now! (§4) with a high performance mode that eliminates disk writes, and discuss high availability (by leveraging progress semantics in a new way) and straggler management.

We perform a detailed evaluation (§5) of Now! on Windows Azure (with StreamInsight) over real and benchmark datasets up to 100GB, with up to 75 large-sized Azure compute instances. Experiments show that we can scale effectively and produce meaningful early results, making Now! suitable in a pay-as-you-go environment. Now! provides a substantial reduction in processing time, memory and CPU usage as compared to current schemes; performance is significantly enhanced by exploiting sort orders and using our memory-only processing mode.

Paper Outline §2 provides the details of our proposed model for progressive computation, Prism. We present Now! in detail in §3, and discuss several extensions in §4. The detailed evaluation of Now! is covered in §5, and related work is discussed in §6.

2. PRISM SEMANTICS & CONSTRUCTION
At a high level, our progress model (called Prism) defines a logical linear progress domain that represents the progress of a query. Sampling strategies desired by data scientists are encoded into the data before query processing, using augmented tuples with progress intervals that precisely define how data progressively contributes to result computation. Users express their data analytics as relational queries that consist of a DAG of progressive operators. An extension of traditional database operators, progressive operators understand and propagate progress intervals based on precisely defined operator semantics. The result of query processing is a sequence of augmented tuples whose progress intervals denote early results and their associated regions of validity in the progress domain. Each of these steps is elaborated in the following subsections.
tation, and P- (if not ∞) denotes the progress point at which tuple • Elimination of sorting in the framework using progress-ordered T stops contributing to the computation. PIs enable users to spec- data movement, partitioned computation pushed inside progress- ify domain-specific progressive sampling strategies. PI assignment aware reducers, and support for the traditional reducer API. can be controlled by data scientists to ensure quicker and more • Progress-based merge of multiple map outputs at a reducer node. meaningful early results, either directly or using a layer between • Concurrent scheduling of multi-stage map and reduce jobs with the system and the user. Figures 4(a) and (b) show PIs for our run- a new scheduling policy and flow control scheme. ning example inputs; they are also depicted in Figure 5 (top). We We also extend Now! (§ 4) with a high performance mode that provide several concrete examples of PI assignment in Section 2.5. eliminates disk writes, and discuss high availability (by leveraging progress semantics in a new way) and straggler management. 2.2 Progressive Operators and Queries We perform a detailed evaluation (§ 5) of Now! on Windows Progressive Operators Every relational operator O has a progres- Azure (with StreamInsight) over real and benchmark datasets up to sive counterpart, which computes augmented output tuples from 100GB, with up to 75 large-sized Azure compute instances. Exper- augmented input tuples. Logically, the output at progress point p is iments show that we can scale effectively and produce meaningful the operation O applied to input tuples whose PIs are stabbed by p. early results, making Now! suitable in a pay-as-you-go environ- Figures 4(c) and 5 (bottom) show the results of Qc and Qi , which ment. Now! provides a substantial reduction in processing time, behave as Count operators. We see that Qc produces a progressive memory and CPU usage as compared to current schemes; perfor- count of 1 at progress point 0, which it revises to 2 and 3 at progress mance is significantly enhanced by exploiting sort orders and using points 1 and 2. As a result, the PIs for these tuples are [0, 1), [1, 2) our memory-only processing mode. and [2, ∞) respectively. Paper Outline § 2 provides the details of our proposed model The P- for an output tuple may not always be known at the same for progressive computation Prism. We present Now! in detail in time as when the operator determine its P+ . Thus, an operator may § 3; and discuss several extensions in § 4. The detailed evaluation output a tuple having an eventual PI of [P+ ,P- ) in two separate of Now! is covered in § 5, and related work is discussed in § 6. pieces: (1) at progress point P+ , it generates a start-edge tuple T 1 with a PI [P+ , ∞) indicating that the tuple participates in the result 2. Prism SEMANTICS & CONSTRUCTION forever; (2) at the later progress point P- , it generates an end-edge At a high level, our progress model (called Prism) defines a log- tuple T 2 with the actual PI [P+ ,P- ). We use the term progress-sync ical linear progress domain that represents the progress of a query. to denote the progress point associated with a tuple (or its subse- Sampling strategies desired by data scientists are encoded into the quent update). The start-edge tuple T 1 has a progress-sync of P+ , data before query processing, using augmented tuples with progress whereas the end-edge tuple T 2 has a progress-sync of P- .
Every operator both processes and generates augmented tuples in non-decreasing progress-sync order. The eventual P- values for early results that get refined later are less than ∞, to indicate that the result is not final. For example, consider an Average operator that reports a value a0 from progress point 0 to 10, and revises it to a1 from progress point 10 onwards. Tuple a0 has an eventual PI of [0, 10). This is reported as a start-edge [0, ∞) at progress point 0. At progress point 10, the operator reports an end-edge [0, 10) for the old average a0, followed immediately by a start-edge [10, ∞) for the revised average a1. Similarly, a progressive Join operator with one tuple on each input with PIs [10, 20) and [15, 25) – if the join condition is satisfied – produces a result tuple with PI [15, 20), the intersection of the two input PIs. Note here that the output tuple's PI ends at 20 because its left input is no longer valid at that point.

Progressive Queries Based on the above semantics, operators can be composed meaningfully to produce progressive queries. We define Prism output for a relational query Q as:

Definition 1 (Prism Output) Associated with each input tuple is a progress interval (PI). At every unique progress point p across all PI endpoints in the input data, there exists a set Op of output results with PIs stabbed by p. Op is defined to be exactly the result of the query Q evaluated over input tuples with PIs stabbed by p.

2.3 Summary of Benefits of the Prism Model
The results of Qctr for our running example are shown in Figures 4(d) and 5 (bottom); every CTR is meaningful as it is computed on some prefix of users (for our chosen progress assignment), and CTR provenance is provided by PIs. The final CTR of 0.6 is the only tuple active at progress point ∞, as expected.

It is easy to see that the output of a progressive query is a deterministic function of the (augmented) input data and the logical query alone. Further, these progressive results are fixed for a given input and logical query, and are therefore repeatable. Prism enables data scientists to use their domain knowledge to control progressive samples; Section 2.5 provides several concrete examples. Early results in Prism carry the added benefit of provenance that helps debug and reason about early results: the set of output tuples with PIs stabbed by progress point p denote the progressive result of the query at p. The provenance of these output tuples is simply all tuples along their input paths whose PIs are stabbed by p.

One can view Prism as a generalization of relational algebra with progressive sampling as a first-class concept. Relational algebra prescribes the final answer to a relational query but does not cover how we get there using partial results. The Prism algebra explicitly specifies, for any query, not only the final answer, but every intermediate (progressive) result and its position in the progress domain.
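The following small C# program (our illustration, not the authors' code) spells out Definition 1 for the running example: the output at each progress point p is simply the query, here the per-ad Count Qc, evaluated over the input tuples whose PIs are stabbed by p.

    using System;
    using System.Linq;

    public static class PrismSemanticsDemo
    {
        const long Inf = long.MaxValue;

        public static void Main()
        {
            // Click input from Figure 4(a): each tuple is (P+, P-, User, Ad).
            var clicks = new[]
            {
                (PStart: 0L, PEnd: Inf, User: "u0", Ad: "a0"),
                (PStart: 1L, PEnd: Inf, User: "u1", Ad: "a0"),
                (PStart: 2L, PEnd: Inf, User: "u2", Ad: "a0"),
            };

            // Definition 1: O_p is Qc (a per-ad Count) over tuples stabbed by p.
            foreach (long p in new long[] { 0, 1, 2 })
                foreach (var g in clicks.Where(t => t.PStart <= p && p < t.PEnd)
                                        .GroupBy(t => t.Ad))
                    Console.WriteLine($"p={p}: Ad={g.Key} Clicks={g.Count()}");
            // Prints counts 1, 2, 3 at p = 0, 1, 2, matching Figure 4(c).
        }
    }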
2.4 Implementing Prism
One can modify a database engine to add PI support to all operators in the engine. However, we can realize Prism without incurring this effort. The idea is to leverage a stream processing engine (SPE) as the progressive query processor. In particular, the semantics underlying a temporal SPE such as NILE [19], STREAM [4], or StreamInsight [3] (based on temporal databases [22]) can be leveraged to denote progress, with the added benefit of incremental processing across samples when possible. With StreamInsight's temporal model, for example, the event validity time interval [6] [Vs, Ve) directly denotes the PI [P+, P-). T1 is an insertion and T2 is a retraction (or revision [33]). Likewise, T1 and T2 correspond to Istreams and Dstreams in STREAM, and positive and negative tuples in NILE. We feed the input tuples converted into events to a continuous query corresponding to the original atemporal SQL query. The unmodified SPE operates on these tuples as though they were temporal events, and produces output events with timestamp fields that we re-interpret as tuples with PIs.

Note that with this construction, the SPE is unaware that it is being used as a progressive SQL processor. It processes and produces events whose temporal fields are re-interpreted to denote progress of an atemporal (relational) query. For instance, the temporal symmetric-hash-join in an SPE effectively computes a sequence of joins over a sequence of progressive samples very efficiently. The resulting query processor transparently handles all of SQL, including user-defined functions, with all the desirable features of our new progress model.
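Schematically, the construction is just a pair of field re-interpretations around the unmodified SPE. The sketch below is ours and uses a hypothetical TemporalEvent type as a stand-in for an SPE's event; it is not StreamInsight's actual API:

    // Input adapter: the PI [P+, P-) is written into the event validity
    // interval [Vs, Ve); output event timestamps are read back as PIs.
    public record PiRow<T>(long PStart, long PEnd, T Payload);
    public record TemporalEvent<T>(long Vs, long Ve, T Payload);

    public static class SpeBridge
    {
        public static TemporalEvent<T> ToEvent<T>(PiRow<T> t) =>
            new(t.PStart, t.PEnd, t.Payload);   // PI becomes event lifetime

        public static PiRow<T> ToPiRow<T>(TemporalEvent<T> e) =>
            new(e.Vs, e.Ve, e.Payload);         // timestamps re-read as progress
    }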
2.5 PI Assignment
Any progressive sampling strategy at the inputs corresponds to a PI assignment; several are discussed next.

Inclusive & Non-inclusive Samples With inclusive samples (as used, for example, in EARL [25]), each sample is a superset of the previous one. To specify these, input tuples are assigned a P- of ∞, and non-decreasing P+ values based on when tuples become a part of the sample, as shown in Figure 5 (top). In case of non-inclusive samples, tuples have a finite P- to denote that they no longer participate in computation beyond P-, and can even reappear with a greater P+ for a later sample (our technical report [7] includes a concrete example of expressing non-inclusive sampling using PIs).

Reporting Granularity Progress reporting granularity can be controlled by individual queries, by adjusting the way P+ moves forward. Data is often materialized in a statistically relevant order, and we may wish to include k additional tuples in each successive sample. We use a streaming AlterLifetime [8] operator that sets P+ for the nth tuple to ⌊n/k⌋ and P- to ∞. This increases P+ by 1 after every k tuples, resulting in the engine producing a new progressive result every k tuples. We refer to the set of tuples with the same P+ as a progress-batch. Data scientists often start with small progress-batches to get quick estimates, and then increase batch sizes (e.g., exponentially) as they get diminishing returns with more data.

Joins & Star Schemas In case of queries involving an equi-join, we may apply an identical sampling strategy (e.g., pseudo-random) over the join key in both inputs as this increases the likelihood of getting useful early results. With a star-schema, we may set all tuples in the small dimension table to have a PI of [0, ∞), while progressively sampling from the fact table as [0, ∞), [1, ∞), .... This causes a Join operator to "preload" the dimension table before progressively sampling the fact table for meaningful early results.

Stratified Sampling Stratified sampling groups data on a certain key and applies a sampling strategy (e.g., uniform) within each group to ensure that rare subgroups are sufficiently represented. BlinkDB [2] pre-computes stratified samples of different sizes and responds to queries within a given error and response time by choosing the correct sample to compute the query on. Stratified sampling is easy to implement with Prism: we perform a GroupApply operation [8] by the key, with an AlterLifetime inside the GroupApply to create progress-batches as before. The temporal Union that merges groups respects timestamp ordering, resulting in a final dataset with PIs that exactly represent stratified sampling. Stratified samples of increasing size can be constructed similarly.

Other Examples For online aggregation, we may assign non-decreasing P+ values over a pre-defined random order of tuples for quick result convergence. Active learning [11] changes the sampling strategy based on outcomes from prior samples. Prior proposals for ordering data for quick convergence [17, 16, 30, 9] simply correspond to different PI assignment schemes in Prism.
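As a concrete illustration, here is a C# sketch (ours; all names are hypothetical) of two of the PI-assignment policies above: progress-batches of k tuples (P+ = ⌊n/k⌋, P- = ∞, i.e., inclusive samples), and the star-schema "preload" that pins every dimension tuple to [0, ∞):

    using System.Collections.Generic;
    using System.Linq;

    public static class PiAssignment
    {
        const long Inf = long.MaxValue;

        // Policy 1: the nth tuple (0-based) joins the sample at point n/k,
        // so the engine produces a new progressive result every k tuples.
        public static IEnumerable<(long PStart, long PEnd, T Row)> BatchEvery<T>(
            IEnumerable<T> rows, long k) =>
            rows.Select((row, n) => ((long)(n / k), Inf, row));

        // Policy 2: preload a small dimension table at progress point 0.
        public static IEnumerable<(long PStart, long PEnd, T Row)> Preload<T>(
            IEnumerable<T> dimensionRows) =>
            dimensionRows.Select(row => (0L, Inf, row));
    }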
Figure 6: System architecture: (a) vanilla MR; (b) Now!.

2.6 Performance Optimizations
Query processing using an in-memory streaming engine can be expensive since the final answer is over the entire dataset. Prism enables crucial performance optimizations that can improve performance significantly in practical situations. Consider computation Qc, which is partitionable by UserId. We can exploit the compile-time property that progress-sync ordering is the same as (or correlated to) the partitioning key, to reduce memory usage and, consequently, improve throughput. The key intuition is that although every tuple with PI [P+, ∞) logically has a P- of ∞, it does not contribute to any progress point beyond P+. Thus, we can temporarily set P- to P+ + 1 before feeding the tuples to the SPE. This effectively causes the SPE to not have to retain information related to progress point P+ in memory once computation for P+ is done. The result tuples have their P- set back to ∞ to retain the original query semantics (these query modifications are introduced using compile-time query rewrites). A similar optimization applies to equi-joins; see [7] for details. We will see in Section 5 that this optimization can result in orders-of-magnitude performance benefits.
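The rewrite pair can be sketched in a few lines of C# (our sketch of the idea, not the actual compile-time rewrite machinery; it assumes progress-sync order is correlated with the partitioning key, as stated above):

    using System.Collections.Generic;
    using System.Linq;

    public static class SortOrderOptimization
    {
        const long Inf = long.MaxValue;

        // Before the SPE: a tuple contributes only to progress point P+,
        // so clip its lifetime to [P+, P+ + 1) and let the engine forget
        // its state once computation for P+ is done.
        public static IEnumerable<(long PStart, long PEnd, T Row)> ClipLifetimes<T>(
            IEnumerable<(long PStart, long PEnd, T Row)> input) =>
            input.Select(t => (t.PStart, t.PStart + 1, t.Row));

        // After the SPE: restore P- = ∞ so the output retains the original
        // (inclusive-sample) query semantics.
        public static IEnumerable<(long PStart, long PEnd, T Row)> RestoreLifetimes<T>(
            IEnumerable<(long PStart, long PEnd, T Row)> output) =>
            output.Select(t => (t.PStart, Inf, t.Row));
    }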
We next discuss our new big data framework called Now!, which implements Prism for MR-style computation at scale in a distributed setting (applying Prism to other models such as graphs is an interesting area of future work).

3. NOW! ARCHITECTURE AND DESIGN

3.1 Overview
At a high level, Now!'s architecture is based on the Map-Reduce (MR) [14] computation paradigm. Figure 6 shows the overall design of Now! (right) as compared to vanilla MR (left), for a query with two stages and different partitioning keys. Blobs in the figure indicate the format of input and output data on Windows Azure's distributed Cloud storage, and can be replaced by any distributed persistent storage such as HDFS. The key points are as follows:

1) Progress-aware data flow: Now! implements the Prism progress model and provides support for data flow (§3.2) in strict progress-sync order. The main components of progress-aware data flow are:
• Batching Now! reads input data annotated with PIs (progressive samples) and creates batches (§3.2.1) of tuples with the same progress-sync. Data movement in Now! is fully pipelined in terms of these progress-batches, in progress-sync order.
• Sort-free data shuffle MR sorts the map output by key, followed by a merge to enable grouping by key at reducers. This sort-merge operation in MR is a performance bottleneck [26]. In contrast, the batched map output in Now! is partitioned and shuffled across the network to reducers without sorting (§3.2.2), thus retaining progress-sync order with improved performance.
• Progress-aware merge A progress-aware merge at reducers is key to enabling the Prism model for progressive query results. Each reducer groups together batches received from different mappers, that belong to the same PI, into a single progress-batch, and ensures that all progress-batches are processed in strict progress-sync order (§3.2.3) along all data flow paths.
Data flow between map and reduce in Now! uses TCP connections which guarantee FIFO delivery. Since the input data is read in progress-sync order and all components retain this invariant, we are guaranteed global progress-sync order for progress-batches.

2) Progress-aware reducers: Now! introduces the notion of a progress-aware reducer (Section 3.2.4), that accepts and produces augmented tuples in progress-sync order, and logically adheres to the Prism query model. The progress-aware merge generates progress-batches in progress-sync order; these are fed directly to reducers that produce early results in progress-sync order. While one could write custom reducers, we use an unmodified SPE (§2.4) as a progress-aware reducer for progressive relational queries.

3) Multi-stage support: Now! supports concurrent scheduling of all jobs in a multi-stage query and co-location of mappers of dependent jobs with the reducers of feeding jobs on the same slave machine (Section 3.3). Data transfer between jobs is in-memory, providing significant savings in a Cloud deployment where blob access is expensive.

4) Flow control: Now! provides end-to-end flow control to avoid buffer overflows at intermediate stages such as mapper output, reducer input and reducer output for multi-stage MR. The flow control mechanism ensures data flows at a speed that can be sustained by downstream consumers. We use a blocking concurrent queue (BCQ), a lock-free data structure which supports concurrent enqueue and dequeue operations, for implementing an end-to-end flow control mechanism for Now! (our technical report [7] has more details on flow control in Now!).

5) In-memory data processing: By default, Now! materializes map output on disk to provide better data availability during failure recovery. For better interactivity, we also support a high-performance in-memory mode (see Section 4).

3.2 Progress-aware Data Flow & Computation
Data flow in Now! is at the granularity of progress-batches and governed by PIs. This section describes the generation and flow of these progress-batches in the framework.

3.2.1 Progress-aware batching
The input data is partitioned into a number of input splits (one for each mapper), data tuples in each of which are assigned progress intervals in progress-sync order. The mapper reads its input split as progress-annotated tuples (progressive samples), and invokes the user's map function. The resulting augmented key-value pairs are partitioned by key to produce a sequence of progress-batches for each partition (downstream reducer). A progress-batch consists of all tuples with the same progress-sync value (within the specific partition) and has a unique ID. Each progress-batch sequence is in strictly increasing progress-sync order. The input text reader appends an end-of-file (eof) marker to the mapper's input when it reaches the end of its input split. The mapper, on receipt of the eof marker, appends it to all progress-batch sequences.
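The batching step can be sketched as follows in C# (our sketch, with hypothetical names). Note that because map output already arrives in progress-sync order, the real mapper emits batches incrementally; for brevity this sketch groups a whole split at once:

    using System;
    using System.Collections.Generic;
    using System.Linq;

    public record ProgressBatch<K, V>(int Partition, long ProgressSync,
                                      IReadOnlyList<(K Key, V Value)> Tuples);

    public static class ProgressBatcher
    {
        public static IEnumerable<ProgressBatch<K, V>> MakeBatches<K, V>(
            IEnumerable<(long ProgressSync, K Key, V Value)> mapOutput,
            int numReducers, Func<K, int> partitioner)
        {
            // One batch per (downstream reducer, progress-sync) pair, emitted
            // in strictly increasing progress-sync order.
            return mapOutput
                .GroupBy(t => (Partition: partitioner(t.Key) % numReducers, t.ProgressSync))
                .OrderBy(g => g.Key.ProgressSync).ThenBy(g => g.Key.Partition)
                .Select(g => new ProgressBatch<K, V>(g.Key.Partition, g.Key.ProgressSync,
                    g.Select(t => (t.Key, t.Value)).ToList()));
        }
    }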
Figure 7: (a) Input data annotated with PIs; (b) Progress-batches according to input data PI assignment; (c) Progress-batches with modified granularity using a batching function.

Batching granularity. The batching granularity in the framework is determined by the PI assignment scheme (§2.5) of the input data. Now! also provides a control knob to the user, in terms of a parameterized batching function, to vary the batching granularity of the map output as a factor of the PI annotation granularity of the actual input. This avoids re-annotating the input data with PIs if the user decides to alter the granularity of the progressive output.

Example 2 (Batching) Figure 7(a) shows a PI annotated input split with three progressive samples. Figure 7(b) shows the corresponding batched map output, where each tuple in a batch has the same progress-sync value. Figure 7(c) shows how progress granularity is varied using a batching function that modifies P+. Here, the batching function is P+ ← ⌊P+/b⌋, with the batching parameter b set to 2.
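The batching function of Example 2 is one line of C# (our sketch):

    using System.Collections.Generic;
    using System.Linq;

    public static class BatchingFunction
    {
        // Coarsen progress granularity by a factor b without re-annotating
        // the input: P+ <- floor(P+ / b), leaving P- = ∞ untouched.
        public static IEnumerable<(long PStart, long PEnd, T Row)> Coarsen<T>(
            IEnumerable<(long PStart, long PEnd, T Row)> input, long b) =>
            input.Select(t => (t.PStart / b, t.PEnd, t.Row));
    }

With b = 2, tuples annotated with P+ = 0, 1, 2 collapse into batches 0, 0, 1, matching Figure 7(c).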
3.2.2 Progressive data shuffle
Now! shuffles data between the mappers and reducers in terms of progress-batches without sorting. As an additional performance enhancement, Now! supports a mode for in-memory transfer of data between the mappers and reducers with flow control to avoid memory overflow. We pipeline progress-batches from the mapper to the reducers using a fine-grained signaling mechanism, which allows the mappers to inform the job tracker (master) of the availability of a progress-batch. The job tracker then passes the progress-batch ID and location information to the appropriate reducers, triggering the respective map output downloads.

The download mechanism on the reducer side has been designed to support progress-sync ordered batch movement. Each reducer maintains a separate blocking concurrent queue (BCQ) for each mapper associated with the job. As mentioned earlier, the BCQ is a lock-free in-memory data structure which supports concurrent enqueue and dequeue operations and enables appropriate flow control to avoid swamping of the reducer. The maximum size of the BCQ is a tunable parameter which can be set according to the available memory at the reducer. The reducer enqueues progress-batches, downloaded from each mapper, into the corresponding BCQ associated with the mapper, in strict progress-sync order. Note that our batched sequential mode of data transfer means that continuous connections do not need to be maintained between mappers and reducers, which aids scalability.

3.2.3 Progress-aware merge
Now! implements the Prism model using a progress-aware merge mechanism which ensures flow of data in progress-sync order along all paths in the framework. Figure 8 shows the high level design of the progress-aware merge module within each reducer. Once a map output is available in each of the map output queues, the reducer invokes the progress-aware merge mechanism, the details of which are given in Algorithm 1. The algorithm takes as input the number of mappers M, a set of BCQs B where qi ∈ B denotes the blocking concurrent queue for mapper i, the current progress-sync value cmin of the merged batch that needs to be produced (cmin is initialized to the minimum progress-sync across the heads of the BCQs), and H, where hi ∈ H is the progress-sync value currently at the head of qi (hi is initialized to the progress-sync value at the head of qi).

Figure 8: Progress-aware merge.

The algorithm initializes an empty set O as output. It iterates over all mapper queues to find and dequeue the batches whose progress-sync values match cmin, adds them to O, and updates hi to the new value at the head of qi. It finally updates cmin and returns O, a merged batch with all tuples having the same progress-sync value. O is then fed to the progressive reducer. If O = ∅, indicating end of input on all BCQs, the framework passes an eof marker to the progressive reducer signaling termination of input.

Algorithm 1: Progress-aware merge
  input : # of mappers M, B = {q1, ..., qM}, cmin, H = {h1, ..., hM}
  output: Merged batch O
  begin
    O = ∅;
    for each qi ∈ B do
      if (hi == ∞) then continue;
      progress-sync = peek(qi);                  // peek blocks if qi = ∅
      if (progress-sync == eof) then { hi = ∞; continue; }
      hi = progress-sync;
      if (hi == cmin) then
        O = O ∪ dequeue(qi);
        progress-sync = peek(qi);
        if (progress-sync == eof) then hi = ∞;
        else hi = progress-sync;
    cmin = min(H);
    return O;
  end
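For concreteness, here is a runnable C# rendering of Algorithm 1 (our sketch, not the production code). Each mapper feeds one bounded BlockingCollection standing in for its BCQ; eof corresponds to completing the collection, and the merge ends (rather than emitting an explicit eof marker) when all queues are done:

    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;

    public static class ProgressAwareMergeSketch
    {
        // Blocking take; returns null once the queue is completed (h_i = ∞).
        static (long ProgressSync, List<T> Tuples)? Next<T>(
            BlockingCollection<(long ProgressSync, List<T> Tuples)> q) =>
            q.TryTake(out var batch, Timeout.Infinite)
                ? batch
                : ((long ProgressSync, List<T> Tuples)?)null;

        public static IEnumerable<List<T>> Merge<T>(
            IReadOnlyList<BlockingCollection<(long ProgressSync, List<T> Tuples)>> queues)
        {
            var head = queues.Select(Next).ToArray();  // one-slot "peek" buffer per BCQ
            while (head.Any(h => h.HasValue))
            {
                // cmin = minimum progress-sync across the queue heads.
                long cmin = head.Where(h => h.HasValue).Min(h => h.Value.ProgressSync);
                var merged = new List<T>();
                for (int i = 0; i < head.Length; i++)
                    if (head[i].HasValue && head[i].Value.ProgressSync == cmin)
                    {
                        merged.AddRange(head[i].Value.Tuples); // this mapper's batch for cmin
                        head[i] = Next(queues[i]);             // advance to its next batch
                    }
                yield return merged; // merged batches emerge in progress-sync order
            }
        }
    }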
3.2.4 Progress-aware reducer
Let partition denote the set of keys that a particular reducer is responsible for. In traditional MR, the reducer gathers all values for each key in the partition and invokes a reduce function for each key, passing the group of values associated with that key. Now! instead uses progress-aware reducers whose input is a sequence of progress-batches associated with that partition in progress-sync order. The reducer is responsible for per-key grouping and computation, and produces a sequence of progress-batches in progress-sync order as output. We use the following API to achieve this:

  Unchanged Map API:
    void map(K1 key, V1 value, Context context)
  Generalized Reduce API:
    void reduce(Iterable<K2, V2> input, Context context)

Here, V1 and V2 include PIs. Now! also supports the traditional reducer API to support older workflows, using a layer that groups active tuples by key for each progress point, invokes the traditional reduce function for each key, and uses the reduce output to generate tuples with PIs corresponding to that progress point.

Progressive SQL While users can write custom progress-aware reducers, we advocate using an unmodified temporal streaming engine (such as StreamInsight) as a reducer to handle progressive relational queries (§2.4). Streaming engines process data in timestamp order, which matches our progress-sync ordered data movement. Temporal notions in events can be reinterpreted as progress points in the query. Further, streaming engines naturally handle efficient grouped subplans using hash-based key partitioning, which is necessary to process tuples in progress-sync order.
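To illustrate what a hand-written progress-aware reducer looks like under this API, here is a C# sketch (ours; the real system would use the generalized reduce API or an SPE) of a per-key running count, essentially Qc. It consumes merged progress-batches in progress-sync order and emits start-/end-edges:

    using System.Collections.Generic;
    using System.Linq;

    public static class ProgressiveCountReducer
    {
        const long Inf = long.MaxValue;

        public static IEnumerable<(long PStart, long PEnd, string Key, long Count)> Reduce(
            IEnumerable<(long ProgressSync, List<string> Keys)> batches)
        {
            var count = new Dictionary<string, long>();
            var since = new Dictionary<string, long>(); // P+ of each key's current result

            foreach (var batch in batches)
                foreach (var g in batch.Keys.GroupBy(k => k))
                {
                    if (count.TryGetValue(g.Key, out var old))                       // revise:
                        yield return (since[g.Key], batch.ProgressSync, g.Key, old); // end-edge
                    count[g.Key] = old + g.Count();
                    since[g.Key] = batch.ProgressSync;
                    yield return (batch.ProgressSync, Inf, g.Key, count[g.Key]);     // start-edge
                }
        }
    }

On the click data of Figure 4(a), this yields the count sequence 1, 2, 3 for ad a0 with PIs [0, 1), [1, 2), [2, ∞), matching Figure 4(c).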
3.3 Support for Multi-stage
We find that most analytics queries need to be expressed as multi-stage MR jobs. Now! supports a fully pipelined progressive job execution across different stages using concurrent job scheduling and co-location of processes that need to exchange data across jobs.

Concurrent Job Scheduling The scheduler in Now! has been designed to receive all the jobs in a multi-stage query as a job graph, from the application controller. Each job is converted into a set of map and reduce tasks. The scheduler extracts the type information from the job to construct a dependency table that tracks, for each task within each job, where it reads from and writes to (a blob or some other job). The scheduler uses this dependency table to partition map tasks into a set of independent map tasks Mi which read their input from a blob/HDFS, and a set of dependent map tasks Md whose input is the output of some previous stage reducer. Similarly, reduce tasks are partitioned into a set of feeder tasks Rf that provide output to mappers of subsequent jobs, and a set of output reduce tasks Ro that write their output to a blob/HDFS.

Algorithm 2 shows the details of how the map and reduce tasks corresponding to different jobs are scheduled. (If the scheduler is given additional information such as the streaming query plan executing inside reducers, we may be able to leverage database cost estimation techniques to improve the scheduling algorithm. This is a well studied topic in prior database research, and the ideas translate well to our setting.) First, all the reduce tasks in Rf are scheduled on slave machines that have at least one map slot available to schedule a corresponding dependent map task in Md which would consume the feeder reduce task's output. The scheduler maintains a record of the task tracker IDs of the slave machines on which these feeder reduce tasks have been scheduled. Second, all the reducers in Ro are scheduled depending on the availability of reduce slots on various slave machines in a round robin manner. Third, all the map tasks in Md are dispatched, co-locating them with the reducers of the previous stage in accordance with the dependency table and using the task tracker information retained in step 1 of the algorithm. Finally, all the map tasks in Mi are scheduled closest to the input data location. Placing tasks in this order ensures that if there exists a feasible placement of all MR tasks that would satisfy all job dependencies, we will find such a placement.

Algorithm 2: Scheduling
  input: Rf, Ro, Mi, Md, dependency table
  begin
    for each r ∈ Rf do
      Dispatch r;
      if Dispatch successful then make a note of the tracker ID;
    for each r ∈ Ro do
      Dispatch r;
    for each m ∈ Md do
      Dispatch m, co-locating it with its feeder reducer;
    for each m ∈ Mi do
      Dispatch m closest to the input data location;
  end

Data flow between jobs Figure 9 shows a sample placement of map and reduce tasks for processing a query that constitutes three jobs, J1, J2 and J3. Figure 9(a) shows the data flow between jobs and Figure 9(b) shows the placement of map and reduce tasks as per Now!'s scheduling algorithm (Algorithm 2). The shaded portions in the figure indicate that the corresponding map and reduce tasks have been co-scheduled on the same slave machine. The scheduler also verifies that the number of dependent map tasks is equal to the number of feeder reduce tasks of a preceding job, thus ensuring that there is one dependent map task for each feeder reduce task that is co-scheduled on the same slave machine.

Figure 9: Multi-stage map-reduce data flow: (a) data flow; (b) task placement.

Data flow between jobs is modeled on the producer-consumer paradigm using a BCQ and takes place completely in memory, avoiding data materialization and shuffling overheads. Further, co-location of the reducers and mappers of dependent jobs does away with the overhead of data serialization, de-serialization and expensive network I/O between stages in a Cloud setting.
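The four-phase dispatch order of Algorithm 2 can be condensed into a few lines of C# (our sketch; the placement calls are stubs and all names are hypothetical, but the ordering is the point):

    using System.Collections.Generic;

    public record MrTask(string Id, MrTask? Feeder = null);

    public static class ConcurrentScheduler
    {
        public static void Schedule(IEnumerable<MrTask> rf, IEnumerable<MrTask> ro,
                                    IEnumerable<MrTask> md, IEnumerable<MrTask> mi)
        {
            var trackerOf = new Dictionary<MrTask, string>();
            foreach (var r in rf) trackerOf[r] = DispatchWithFreeMapSlot(r); // phase 1
            foreach (var r in ro) DispatchRoundRobin(r);                     // phase 2
            foreach (var m in md) DispatchAt(m, trackerOf[m.Feeder!]);       // phase 3
            foreach (var m in mi) DispatchNearInput(m);                      // phase 4
        }

        // Placement stubs: a real scheduler negotiates with TaskTrackers here.
        static string DispatchWithFreeMapSlot(MrTask r) => $"tracker-for-{r.Id}";
        static void DispatchRoundRobin(MrTask r) { }
        static void DispatchAt(MrTask m, string tracker) { }
        static void DispatchNearInput(MrTask m) { }
    }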
4. DISCUSSION AND EXTENSIONS

4.1 High Availability (HA)
Upadhyaya et al. [35] have recently shown how a multi-stage pipelined map-reduce system can support hybrid strategies of replay and checkpointing; these solutions are applicable in our setting. Specifically, the failure semantics for Now! are:

Map task failure: Any map task in progress or completed on a failed worker node needs to be rescheduled as in vanilla MR.

Reduce task failure: After a reduce task fails, one can replay its input starting from the last checkpoint (map output is materialized on local storage to allow replay). Interestingly, Prism can further reduce the cost of replay after a failure. The key insight is that processing at progress point p depends only on input tuples whose PIs are stabbed by p. We can leverage this in two ways:
• We can filter out tuples with P- ≤ p during replay to significantly reduce the amount of data replayed and prune the intermediate map output saved on local storage. (This optimization does not apply to external input, which has P- set to ∞, but can apply to intermediate results in multi-stage jobs.)
• During replay, we can set P+ = max(p, P+) for replayed tuples so that the reducer does not need to re-generate early results for progress points earlier than p.

Prior research [32] has reported that input sizes on production clusters are usually less than 100GB. Further, progressive queries are usually expected to end early. Therefore, Now! supports an efficient no-HA mode, where intermediate map output is not materialized on local storage and no checkpointing is done. This requires a failure to cascade back to the source data (we simply restart the job). Restarting the job on failure is a cheap and practical solution for such systems as compared to traditional long-running jobs. That said, we acknowledge that high availability with low recovery time (e.g., by restarting only the failed parts of the DAG) is important in some cases. Prior work [35, 38] has studied this problem; these ideas apply in our setting. We leave the implementation and evaluation of such fine-grained HA in Now! as future work.

4.2 Straggler and Skew Management
Stragglers A consequence of progress-sync merge is that if a previous task makes slow progress, we need to slow down overall progress to ensure global progress-sync order. While progress-sync order is necessary to derive the benefits of Prism, there are fixes that avoid sacrificing semantics and determinism:
• Consider n nodes with 1 straggler. If the processing skew is a result of imbalanced load, we can dynamically move partitions from the straggler to a new node (we need to also move reducer state). We may instead fail the straggler altogether and re-start its computation by partitioning its load equally across the remaining n − 1 nodes. The catch-up work gets done n − 1 times faster, resulting in a quicker restoration of balance. (If failures occur halfway through a job on average, jobs run for 2.5/(n − 1) times as long due to a straggler with this scheme.)
• We could add support for compensating reducers, which can continue to process new progress points, but maintain enough information to revise or compensate their state once late data is received. Several engines have discussed support for compensations [6, 33], and fit well in this setting.
As we have not found stragglers to be a problem in our experiments on Windows Azure VMs, the current version of Now! does not address this issue. A deeper investigation is left as future work.

Data Skew Data skew can result from several reasons:
• Some sampling strategies encoded using PIs may miss out on outliers or rare sub-populations within a population. This can be resolved using stratified sampling, which can be easily implemented in Prism as discussed in Section 2.5.
• Skew in the data may result in some progress-batches being larger than others at the reducers. However, this is no different from skew in traditional map-reduce systems, and solutions such as [24] are applicable here.
Since skew is closely related to the straggler problem, techniques mentioned earlier for stragglers may also help mitigate skew.
5. EVALUATION

5.1 Implementation Details
Now! is written in C# and deployed over Windows Azure. Now! uses the same master-slave architecture as Hadoop [36], with JobTracker and TaskTracker nodes. TaskTracker nodes are allocated a fixed number of map and reduce slots. Heartbeats are used to ensure that slave machines are available. We modified and extended this baseline to incorporate our new design features (see Section 3) such as pipelining, progress-based batching, progress-sync merge, multi-stage job support, concurrent job scheduling, etc. Now!, deployed on the Windows Azure Cloud platform, uses Azure blobs as persistent storage and Azure VM roles as JobTracker and TaskTracker nodes. Multi-stage job graphs are generated by users and provided to Now!'s JobTracker as input; each job consists of input files, a partitioning key (or mapper), and a progressive reducer. Although Now! has been developed in C# and evaluated on Windows Azure, its design features are not tied to any specific platform. For example, Now! could be implemented over Hadoop using HDFS and deployed on the Amazon EC2 cloud.

Now! makes it easy to employ StreamInsight as a reducer for progressive SQL, by providing an additional API that allows users to directly submit a graph of (key, query) pairs, where query is a SQL query specified using LINQ [34]. Each node in this graph is automatically converted into a job. The job uses a special progressive reducer that uses StreamInsight to process tuples. The Now! API can be used to build front-ends that automatically convert larger Hive, SQL, or LINQ queries into job graphs. Although the system has been designed for the Cloud and uses Cloud storage, it also supports deployment on a cluster of machines (or private Cloud). Now! includes diagnostics for monitoring CPU, memory, and I/O usage statistics. These statistics are collected by an instance of a log manager running on each machine, which outputs them as logs stored as blobs in a separate container.

5.2 Experimental Setup
System Configuration The input and final output of a job graph are stored in Azure blobs. Each Azure VM role (instance) is a large-sized machine with 4 1.6GHz cores, 7GB RAM, 850GB of local storage, and 400Mbps allocated I/O bandwidth. Each instance was configured to support 5 map slots and 2 reduce slots. We experiment with up to 75 instances in our tests (our Windows Azure subscription allowed no more than 300 cores; this limited us to 75 4-core VM instances).

Datasets We use the following datasets in our evaluation, with dataset sizes based upon the aggregate amount of memory needed to run our queries over them:
• Search data. This is a real 100GB search dataset from Bing that consists of userids and their search terms. The input splits were created by sharding the data into a number of files/partitions, and annotating with fine-grained PI values.
• TPC-H data. We used the dbgen tool to generate a 100GB TPC-H benchmark dataset, for experiments using TPC-H queries.
• Click data. This is a real 12GB dataset from the Microsoft AdCenter advertising platform that comprises clicks and impressions on various ads over a 3 month period.

Queries We use the following progressive queries:
• Top-k correlated search. The query reports the top-k words that are most correlated with an input search term, according to a goodness score, in the search dataset. The query consists of two Now! jobs, one feeding the other. The first stage job uses the data set as input and partitions by userid. Each reducer computes a histogram that reports, for each word, the number of searches with and without the input term, and the total number of searches. The second stage job groups by word, aggregates the histograms from the first stage, computes a per-word goodness, and performs top-k to report the k most correlated words to the input term. We use "music" as the default term.
9 . Effect of progressive computation Effect of batch size Performance Analysis 100 100 Write to Blob 200 Query processing time (mins) SMR: 8GB MR Time taken (mins) 80 80 2nd Stage Reduce 150 Now!: 8GB SMR 2nd Stage Map D/n loads % Time taken 60 Now! 60 SMR: 6GB 100 Time to first batch (Now!) 2nd Stage Map Now!: 6GB 40 40 50 1st Stage Reduce 20 20 1st Stage Map D/n loads 0 0 1st Stage Map 0 20 40 60 80 100 80 600 1200 6000 0 Progress % Batch Size (MB) 30GB 15GB Input enumeration (a) (b) (c) Scalability with increase in data size Throughput Scalability Effect of map output materialization Map output shuffle time 10000 50 Query processing time (mins) Throughput scale-up SMR 6 Map o/p in-memory Scale up: Now! Scale-Up factor Now! 40 100 1000 5 Map o/p on disk Scale-up (in secs) 30 4 factor 75 Log scale 100 20 3 10 2 50 10 1 1 0 25 20 30 45 60 74 3 5 6 8 9 13 15 30 (1X) (1.5X) (2.25X) (3X) (3.7X) 0 Data size (GB) # Machines (d) (e) (f) Figure 10: Performance analysis.(a) Time taken to process a query in progress-sync order; (b) Effect of batching granularity; (c) Analysis of time taken by different elements for a two-stage Map-Reduce query. Scalability: (d) Effect of data size on query processing time; (e) Throughput scalability with increase in #machines; (f) Overheads of disk I/O (Map output materialization). • TPC-H Q3. We use a generalization of TPC-H query 3: and reducers and their subscripts (1, 2) represent the stage to which SELECT L_ORDERKEY, SUM(L_EXTENDEDPRICE*(1-L_DISCOUNT)) AS they belong (note that R1 = M2 ). A single stage job is depicted as REVENUE, O_ORDERDATE, O_SHIPPRIORITY FROM ORDERS, LINEITEM M1 − R1 . In our experiments, the number of mappers is equal to the WHERE L_ORDERKEY = O_ORDERKEY number of input splits (stored as blobs). The number of reducers is GROUP BY L_ORDERKEY, O_ORDERDATE, O_SHIPPRIORITY chosen based on the memory capacity of each worker node (7GB • CTR. The CTR (click-through-rate) query computes the MR job RAM) and the number of mappers feeding the reducers. graph shown in Figure 2 (our running example). It consists of three queries (Qc , Qi , and Qctr ) where Qc is a click query which 5.3 Experiments and Results computes the number of clicks from the click dataset, Qi is an impressions query which computes the number of ad impressions 5.3.1 Effect of Progressive Computation from the impression data set and Qctr computes the CTR. We evaluate Now!’s performance vs. SMR in terms of time taken to produce progressive results. The first experiment (see Fig- Baselines We evaluate Now! against several baseline systems: ure 10(a)) plots the time taken to run the top-k correlated search • Map-Reduce (MR). For standard map-reduce, we use Daytona [13], query which provides the top 100 words that were searched with a C# implementation of vanilla hadoop for Windows Azure. This “weather”, in terms of progress-batches plotted in progress-sync or- baseline provides an estimate of time taken to process a parti- der. The input data set was batched by the mapper into 75 progress- tioned query without progressive results. batches by Now!. For the SMR baseline, the data was ordered and • Stateful MR (SMR). Stateful MR is an extension of MR for it- split into 75 chunks (one per PI). Each chunk representing one PI, erative queries [5], that maintains reducer state across MR jobs. was processed as a separate MR job and the time taken taken for We use it for progressive results by chunking the input into batches, the same was recorded. 
[Figure 10: Performance analysis. (a) Time taken to process a query in progress-sync order; (b) effect of batching granularity; (c) analysis of time taken by different elements for a two-stage Map-Reduce query. Scalability: (d) effect of data size on query processing time; (e) throughput scalability with increasing numbers of machines; (f) overheads of disk I/O (map output materialization).]

5.3 Experiments and Results

5.3.1 Effect of Progressive Computation
We evaluate Now!'s performance vs. SMR in terms of the time taken to produce progressive results. The first experiment (see Figure 10(a)) plots the time taken to run the top-k correlated search query, reporting the top 100 words that were searched with "weather", in terms of progress-batches plotted in progress-sync order. The input dataset was batched by the mapper into 75 progress-batches by Now!. For the SMR baseline, the data was ordered and split into 75 chunks (one per PI); each chunk, representing one PI, was processed as a separate MR job, and the time taken was recorded. Each point on the plot represents an average of five runs. We used datasets of two sizes (6 and 8GB). The experimental results show that Now! performs much better (a 6X improvement) than SMR, which processes each progress-batch as a separate job and resorts to expensive intermediate output materialization, hurting performance, particularly in a Cloud setting. Also, the time taken for the first 50% of the progress-batches is under 20 minutes, as opposed to 105 minutes for SMR, for the 8GB dataset, highlighting the benefit of platform support for progressive early results.

5.3.2 Effect of Batching
We evaluate the performance of Now! for different progress-batch sizes and compare it against SMR and MR. The MR baseline processes the entire input as a single batch; the granularity of the batch size controls the number of progress-batches. The dataset size used in this experiment is 6GB and the configuration is 94-26-26-4. The experiment shows the results for three different batch sizes, 80MB (75 batches), 600MB (10 batches), and 1200MB (5 batches), and compares them against vanilla MR, which processes the entire 6GB input at once.

Figure 10(b) shows the change in total query processing time with batch size. As the batch size decreases from 1200MB to 80MB, the number of batches processed by the system increases from 5 to 75. The query processing time of SMR increases drastically with the number of batches, which can be attributed to the fact that it processes each batch as a separate MR job and resorts to intermediate data materialization. The MR baseline, which processes the entire input as a single batch, does better than SMR but does not provide early results.

On the other hand, the query processing time of Now! does not vary much with the number of batches, as it is pipelined, does not start a new MR job for each batch, and does not materialize intermediate results between jobs. We do see a slight increase in query processing time when the number of batches increases from 10 to 75, which can be attributed to a moderate increase in batching overheads. However, the smallest batch size provides the earliest progressive results and at the finest granularity. The figure also shows the time to generate the first progress-batch, i.e., the time at which the user starts getting progressive results. The time to first batch increases with batch size (i.e., progressive sample size), but is significantly lower than the total query processing time.
5.3.3 Performance Breakdown
We analyzed the performance of Now! using our diagnostic monitoring module, which logs CPU, memory, and I/O usage. Figure 10(c) analyzes the performance of the two-stage top-k correlated search query with k = 100, and plots the percentage of time taken by the different components of Now!. Each data point in the figure is an average over 10 runs, for two different datasets (15GB and 30GB) on 30 machines. The results indicate that the most time is spent in the first-stage reducer, followed by the second-stage reducer and writing the final output to the blobs. The framework does not have any major bottlenecks in terms of pipelining of progress-batches. The time taken by the two reduce stages would vary with the choice of progressive reducer and the type of query; our current results use StreamInsight as the progressive reducer.

5.3.4 Scalability
Figure 10(d) evaluates the effect of increasing data size on query processing time in Now! as compared to SMR. We used the top-k correlated search query for this experiment and varied the data size from 2.8GB to 30GB. The results show that Now! runs up to 38X faster than SMR, measured as the ratio of their query processing times. This can be attributed to pipelining, the absence of sorting in the framework, and the absence of intermediate data materialization between jobs. Figure 10(e) shows the scale-up provided by Now! in terms of throughput (rows processed per second) as the number of machines increases.
For the top-k correlated search query (top 100 words correlated to "music"), we achieved a 6X scale-up with 74 machines as compared to the throughput on 20 machines, for 15GB of data.

5.3.5 Data Materialization Overheads
Writing map outputs to the local disk carries a significant performance penalty; on the other hand, intermediate data materialization provides higher availability in the presence of failures. Figure 10(f) shows the overhead of disk I/O in materializing map output on disk, and of the subsequent disk accesses to shuffle the data to the reducers within a job. Our results show an overhead of approximately 90 seconds for an 8GB dataset with the 94-26-26-4 configuration. Now! can run in either mode (with or without disk I/O); the user chooses depending on the application's needs and the execution environment. It is also pertinent to note that there is no data materialization on persistent storage (HDFS or Cloud) between different Map-Reduce stages in Now!, which provides a similar performance advantage for multi-stage jobs over MR/SMR, as seen in Section 5.3.1.

[Figure 11: Resource utilization. (a) CPU and memory utilization of Now!; (b) CPU and memory utilization of SMR; (c) CPU and memory utilization without memory optimization; (d) CPU and memory utilization with memory optimization; (e) effect of sort order on memory and %CPU utilization for different data sizes; (f) memory optimization effects on query processing time.]

5.3.6 Resource Utilization
We evaluated the resource utilization of Now! in terms of memory and CPU. Figures 11(a,b) compare the memory and CPU utilization of Now! and SMR for the 94-26-26-4 configuration for a
dataset size of 8GB. The figures show the average real-time memory and CPU utilization over 30 slave machines, each running 4 mappers and 1 reducer, plotted against time. The results indicate that there is no significant difference in the average memory utilization of the two platforms, and that the average CPU utilization of Now! is actually higher than that of SMR. However, we also show the normalized %CPU utilization of SMR, which is the product of its average CPU utilization and a normalization factor (the ratio of the time taken by SMR to the time taken by Now!). The normalized %CPU utilization is much higher, as SMR takes approximately 4.5X longer to complete than Now!. Thus, Now! is well suited to progressive computation on the Cloud, where resources are charged by time.

5.3.7 Memory Optimization using Sort Orders
The next experiment investigates the benefit of our memory optimization (cf. Section 2.6) when the progress-sync order is correlated with the partitioning key. Our TPC-H dataset expresses progress in terms of the L_ORDERKEY attribute, and TPC-H Q3 also partitions by the same key. An optimized run can detect this at compile time and set P⁻ = P⁺ + 1, allowing the query to "forget" previous tuples when it moves to the next progress-batch. An unoptimized run would retain all tuples in memory in order to compute future join and aggregation results. We experiment with 10GB, 60GB, and 100GB TPC-H datasets. Figures 11(c) and 11(d) show the variation of memory and CPU utilization with progress, without and with the memory optimization, for the 10GB dataset. Figure 11(e) shows that the memory footprint of the optimized approach is much lower than that of the unoptimized approach, as expected. Further, it indicates that the lower memory utilization directly reduces CPU cost, since the query maintains and looks up much smaller join synopses. Figure 11(f) shows that the memory optimization yields an orders-of-magnitude reduction in the time taken to process TPC-H Q3 on all three datasets, with a throughput improvement of approximately 12X in two cases (10GB and 60GB). As indicated in the figure, the 100GB run without the memory optimization ran out of memory (OOM), as the data per machine was much higher.
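A small sketch of the effect of this setting, using hypothetical types (Prism's actual progress-interval representation is defined earlier in the paper): each tuple carries a progress interval [P⁺, P⁻), and a reducer keeps a tuple in its state only while the current progress point lies inside that interval.

    using System.Collections.Generic;
    using System.Linq;

    // Hypothetical tuple carrying a progress interval [PPlus, PMinus).
    record ProgressTuple(long OrderKey, int PPlus, int PMinus, object Payload);

    static class ProgressState
    {
        // State retained by a reducer at progress point p: only tuples whose
        // progress interval still contains p. With the optimization
        // (PMinus = PPlus + 1), all tuples from earlier batches are dropped;
        // without it (PMinus = int.MaxValue), state grows with every batch.
        public static List<ProgressTuple> Retain(
            IEnumerable<ProgressTuple> state, int p)
            => state.Where(t => t.PPlus <= p && p < t.PMinus).ToList();
    }

When the partitioning key advances in lockstep with progress (as L_ORDERKEY does here), no future batch can join with past tuples, so dropping them is safe and the join synopsis stays small.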
[Figure 12: Qualitative analysis. (a) Top-k convergence; (b) error estimation of progressive results.]

5.3.8 Qualitative Evaluation
Result Convergence. To determine the speed of convergence, we compute the precision of the progressive output values (for the top-k correlated search query) that we get as intermediate results. Figure 12(a) varies k and plots precision against the number of progress-batches processed, for a data size of 15GB with a configuration of 60-43-43-1 and 200 progress-batches. The precision metric measures how close the progressive results are to the final top-k. We see that precision quickly reaches 90%, after less than 20% progress, as the top-k values do not change much after sampling 20% of the data (lower values of k converge more quickly, as expected). This shows the utility of early results for real-world queries, where the results converge very quickly to the final answer after processing small amounts of data.
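The text above leaves the precision formula implicit; a standard definition consistent with it (an assumption on our part) is the fraction of the progressive top-k answer that appears in the final top-k:

    using System.Collections.Generic;
    using System.Linq;

    static class TopKQuality
    {
        // Precision of a progressive top-k answer against the final top-k:
        // |progressive ∩ final| / k. Identical sets give 1.0; disjoint sets, 0.0.
        public static double Precision(IEnumerable<string> progressiveTopK,
                                       ISet<string> finalTopK, int k)
            => progressiveTopK.Count(finalTopK.Contains) / (double)k;
    }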
Progress Semantics. We compare result quality against an MRO-style processing approach, using the click dataset to compute CTR (Figure 2) progressively. We model variation in processing time using a skew factor that measures how much faster Qi runs than Qc; a skew of 1 represents the hypothetical case where perfect CTR information is known a priori and the queries follow this relative processing speed. Figure 12(b) shows the percentage error in CTR estimation plotted against percentage progress. The experiment shows that if different queries proceed at different speeds, early results without user-defined progress semantics can become inaccurate (although all techniques converge to the same final result); even moderate skew values can result in significant inaccuracy. On the other hand, progress semantics ensure that the data being correlated always belongs to the same subset of users, which allows CTR to converge quickly and reliably, as expected.
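To see why skew produces such errors, consider a hypothetical moment at which Qi has processed impressions covering 20% of the users while Qc has processed clicks covering only 10% of them (a skew of 2). With a uniform true CTR, a merge that simply unions whatever has arrived estimates

    CTR_est = (0.10 × total clicks) / (0.20 × total impressions) = 0.5 × CTR_true,

an error of -50%, even though both inputs are individually correct. A progress-sync merge instead pairs the 10% click subset with the matching 10% impression subset, so the ratio is unbiased at every progress point.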
6. RELATED WORK
Approximate Query Processing. Online aggregation was originally proposed by Hellerstein et al. [21], with a focus on grouped aggregation with statistically robust confidence intervals based on random sampling. This was extended to handle join queries using the ripple join [17] family of operators. Specialized sampling techniques have been widely studied in subsequent years (e.g., see [9, 20, 30]). Laptev et al. [25] propose iteratively computing MR jobs on increasing data samples until a desired approximation goal is achieved. BlinkDB [2] constructs a large number of multi-dimensional samples offline using a particular sampling technique (stratified sampling) and chooses samples automatically based on a user-specified budget.

We follow a different approach: instead of the system taking responsibility for query accuracy (e.g., via sampling techniques), which may not be possible in general, we involve the query writer in the specification of progress semantics. A query processor using Prism can support a variety of user-defined progressive sampling schemes; we view the prior work described above as part of a layer between our generic progress engine and the user, one that helps with the assignment of PIs in a semantically appropriate manner.

MR Framework Variants. Map-Reduce Online (MRO) [12] supports progressive output by adding pipelining to MR. Early result snapshots are produced by the reducers, each annotated with a rough progress estimate based on averaging the progress scores of different map tasks. Unlike our techniques, progress in MRO is an operational and non-deterministic metric that cannot be controlled by users or used to formally correlate progress with query accuracy or with specific input samples. From a data processing standpoint, unlike Now!, MRO sorts subsets of data by key and can incur redundant computation, as reducers repeat aggregations over increasing subsets (see [7] for more details).

Li et al. [26] propose scalable one-pass analytics (SOPA), which replaces the sort-merge in MR with a hash-based grouping mechanism inside the framework. Our focus is on progressive queries, with the goal of establishing and propagating explicit progress in the platform. Like SOPA, we eliminate sorting in the framework, but leave it to the reducer to process progress-sync ordered data. Streaming engines use efficient hash-based grouping, allowing us to realize performance gains similar to SOPA's inside our reducers.

Distributed Stream Processing. SPEs answer real-time temporal queries over windowed streams of data. We tackle a different problem: progressive results for atemporal queries over atemporal offline data, and we show that our new progress model can in fact be realized by leveraging and re-interpreting the notion of time used by temporal SPEs. Now! is an MR-style distributed framework for progressive queries; it is markedly different from distributed SPEs [1], as it leverages the explicit notion of progress to build a batched-sequential data-parallel framework that does not target real-time data or low-latency queries. The use of progress-batched files for data movement allows Now! to amortize transfer costs across the reducers' per-tuple computation cost. Now!'s architecture follows the lines of MR, with extended map and reduce APIs, and is designed for a Cloud setting.

Interactive Full-Data Analytics. Dremel [29] and PowerDrill [18] are distributed systems for the interactive analysis of read-only, large columnar datasets. Spark [37] provides in-memory data structures for persisting intermediate results, and can be used to interactively query big datasets or to get medium-latency batchwise results on real-time data [38]. These engines have a different goal from ours: by fully committing memory and compute resources a priori, they provide full results in milliseconds to queries on hot in-memory data, using careful techniques such as columnar in-memory data organization for the (smaller) subset of data that needs such interactivity. We, on the other hand, provide generic interactivity over large datasets, in terms of early meaningful results on progressive samples, refining the results as more data is processed. Based on the early results, users can choose to potentially end (or possibly refine) computations once sufficient accuracy or query incorrectness is observed.

7. CONCLUSIONS
Data scientists typically perform progressive sampling to extract data for exploratory querying, which provides them with user control, determinism, repeatable semantics, and provenance. However, the lack of system support for such progressive analytics results in a tedious and error-prone workflow that precludes the reuse of work across samples. We proposed a new progress model called Prism that (1) allows users to communicate progressive samples to the system; (2) allows efficient and deterministic query processing over samples; and yet (3) provides repeatable semantics and provenance to data scientists. We showed that one can realize this model for atemporal relational queries using an unmodified temporal streaming engine, by re-interpreting temporal event fields to denote progress. Based on this model, we built Now!, a new progressive data-parallel computation framework for Windows Azure, where progress is understood and propagated as a first-class citizen in the framework. Now! works with StreamInsight to provide progressive SQL support over big data in Azure. Large-scale experiments showed orders-of-magnitude performance gains achieved by our solutions, without sacrificing the benefits offered by our underlying progress model. While we have studied the application of Prism to MR-style computation, applying it to other computation models (e.g., graphs) is an interesting area of future work.
8. REFERENCES
[1] D. Abadi et al. The design of the Borealis stream processing engine. 2005.
[2] S. Agarwal et al. BlinkDB: Queries with bounded errors and bounded response times on very large data. In EuroSys, 2013.
[3] M. Ali et al. Microsoft CEP server and online behavioral targeting. 2009.
[4] B. Babcock et al. Models and issues in data stream systems. 2002.
[5] R. Barga, J. Ekanayake, and W. Lu. Iterative MapReduce research on Azure. In SC, 2011.
[6] R. Barga et al. Consistent streaming through time: A vision for event stream processing. 2007.
[7] B. Chandramouli et al. Scalable progressive analytics on big data in the cloud. Technical report, MSR. http://aka.ms/Jpe5f5.
[8] B. Chandramouli et al. Temporal analytics on big data for web advertising. In ICDE, 2012.
[9] S. Chaudhuri, G. Das, and U. Srivastava. Effective use of block-level sampling in statistics estimation. In SIGMOD, 2004.
[10] S. Chaudhuri et al. On random sampling over joins. In SIGMOD, 1999.
[11] D. Cohn et al. Improving generalization with active learning. Machine Learning, 15, 1994.
[12] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears. MapReduce online. In NSDI, 2010.
[13] Daytona for Azure. http://aka.ms/unkcbq.
[14] J. Dean and S. Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI, 2004.
[15] A. Doucet, M. Briers, and S. Senecal. Efficient block sampling strategies for sequential Monte Carlo methods. Journal of Computational and Graphical Statistics, 2006.
[16] P. J. Haas and J. M. Hellerstein. Ripple joins for online aggregation. In SIGMOD, 1999.
[17] P. J. Haas and J. M. Hellerstein. Join algorithms for online aggregation. IBM Research Report RJ 10126, 1998.
[18] A. Hall, O. Bachmann, R. Büssow, S. Gănceanu, and M. Nunkesser. Processing a trillion cells per mouse click. PVLDB, July 2012.
[19] M. Hammad et al. Nile: A query processing engine for data streams. 2004.
[20] J. M. Hellerstein and R. Avnur. Informix under CONTROL: Online query processing. Data Mining and Knowledge Discovery Journal, 2000.
[21] J. M. Hellerstein, P. J. Haas, and H. J. Wang. Online aggregation. In SIGMOD, 1997.
[22] C. Jensen and R. Snodgrass. Temporal specialization. 1992.
[23] C. Jermaine, S. Arumugam, A. Pol, and A. Dobra. Scalable approximate query processing with the DBO engine. In SIGMOD, 2007.
[24] Y. Kwon et al. SkewTune: Mitigating skew in MapReduce applications. In SIGMOD, 2012.
[25] N. Laptev, K. Zeng, and C. Zaniolo. Early accurate results for advanced analytics on MapReduce. PVLDB, 2012.
[26] B. Li, E. Mazur, Y. Diao, A. McGregor, and P. J. Shenoy. A platform for scalable one-pass analytics using MapReduce. In SIGMOD, 2011.
[27] O. Maron et al. Hoeffding races: Accelerating model selection search for classification and function approximation. In NIPS, 1993.
[28] M. D. McKay et al. Comparison of three methods for selecting values of input variables in the analysis of output from a computer code. Technometrics, 21, 1979.
[29] S. Melnik et al. Dremel: Interactive analysis of web-scale datasets. PVLDB, 2010.
[30] N. Pansare, V. R. Borkar, C. Jermaine, and T. Condie. Online aggregation for large MapReduce jobs. PVLDB, 2011.
[31] V. Raman, B. Raman, and J. M. Hellerstein. Online dynamic reordering for interactive data processing. In VLDB, 1999.
[32] A. Rowstron et al. Nobody ever got fired for using Hadoop on a cluster. In HotCDP, 2012.
[33] E. Ryvkina et al. Revision processing in a stream processing engine: A high-level design. In ICDE, 2006.
[34] The LINQ Project. http://aka.ms/rjhi00.
[35] P. Upadhyaya, Y. Kwon, and M. Balazinska. A latency and fault-tolerance optimizer for online parallel query plans. In SIGMOD, 2011.
[36] T. White. Hadoop: The Definitive Guide. 2009.
[37] M. Zaharia et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In NSDI, 2012.
[38] M. Zaharia et al. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters. In HotCloud, 2012.