Shark: SQL and Rich Analytics at Scale

Shark is a new data analysis system that marries query processing with complex analytics on large clusters. It leverages a novel distributed memory abstraction to provide a unified engine that can run SQL queries and sophisticated analytics functions (e.g., iterative machine learning) at scale, and efficiently recovers from failures mid-query. This allows Shark to run SQL queries up to 100× faster than Apache Hive, and machine learning programs up to 100× faster than Hadoop. Unlike previous systems, Shark shows that it is possible to achieve these speedups while retaining a MapReduce-like execution engine, and the fine-grained fault tolerance properties that such engines provide.

Shark: SQL and Rich Analytics at Scale

Reynold Shi Xin, Joshua Rosen, Matei Zaharia, Michael Franklin, Scott Shenker, Ion Stoica

Electrical Engineering and Computer Sciences
University of California at Berkeley

Technical Report No. UCB/EECS-2012-214
http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-214.html

November 26, 2012

Copyright © 2012, by the author(s). All rights reserved. Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission.

Shark: SQL and Rich Analytics at Scale

Reynold Xin, Josh Rosen, Matei Zaharia, Michael J. Franklin, Scott Shenker, Ion Stoica
AMPLab, EECS, UC Berkeley
{rxin, joshrosen, matei, franklin, shenker, istoica}@cs.berkeley.edu

ABSTRACT

Shark is a new data analysis system that marries query processing with complex analytics on large clusters. It leverages a novel distributed memory abstraction to provide a unified engine that can run SQL queries and sophisticated analytics functions (e.g., iterative machine learning) at scale, and efficiently recovers from failures mid-query. This allows Shark to run SQL queries up to 100× faster than Apache Hive, and machine learning programs up to 100× faster than Hadoop. Unlike previous systems, Shark shows that it is possible to achieve these speedups while retaining a MapReduce-like execution engine, and the fine-grained fault tolerance properties that such engines provide. It extends such an engine in several ways, including column-oriented in-memory storage and dynamic mid-query replanning, to effectively execute SQL. The result is a system that matches the speedups reported for MPP analytic databases over MapReduce, while offering fault tolerance properties and complex analytics capabilities that they lack.

1 Introduction

Modern data analysis faces a confluence of growing challenges. First, data volumes are expanding dramatically, creating the need to scale out across clusters of hundreds of commodity machines. Second, this new scale increases the incidence of faults and stragglers (slow tasks), complicating parallel database design. Third, the complexity of data analysis has also grown: modern data analysis employs sophisticated statistical methods, such as machine learning algorithms, that go well beyond the roll-up and drill-down capabilities of traditional enterprise data warehouse systems. Finally, despite these increases in scale and complexity, users still expect to be able to query data at interactive speeds.

To tackle the "big data" problem, two major lines of systems have recently been explored. The first, composed of MapReduce [13] and various generalizations [17, 9], offers a fine-grained fault tolerance model suitable for large clusters, where tasks on failed or slow nodes can be deterministically re-executed on other nodes. MapReduce is also fairly general: it has been shown to be able to express many statistical and learning algorithms [11]. It also easily supports unstructured data and "schema-on-read." However, MapReduce engines lack many of the features that make databases efficient, and have high latencies of tens of seconds to hours. Even systems that have significantly optimized MapReduce for SQL queries, such as Google's Tenzing [9], or that combine it with a traditional database on each node, such as HadoopDB [3], report a minimum latency of 10 seconds. As such, MapReduce approaches have largely been dismissed for interactive-speed queries [25], and even Google is developing new engines for such workloads [24].

Instead, most MPP analytic databases (e.g., Vertica, Greenplum, Teradata) and several of the new low-latency engines proposed for MapReduce environments (e.g., Google Dremel [24], Cloudera Impala [1]) employ a coarser-grained recovery model, where an entire query has to be resubmitted if a machine fails.¹ This works well for short queries where a retry is inexpensive, but faces significant challenges in long queries as clusters scale up [3]. In addition, these systems often lack the rich analytics functions that are easy to implement in MapReduce, such as machine learning and graph algorithms. Furthermore, while it may be possible to implement some of these functions using UDFs, these algorithms are often expensive, furthering the need for fault and straggler recovery for long queries. Thus, most organizations tend to use other systems alongside MPP databases to perform complex analytics.

To provide an effective environment for big data analysis, we believe that processing systems will need to support both SQL and complex analytics efficiently, and to provide fine-grained fault recovery across both types of operations. This paper describes a new system that meets these goals, called Shark. Shark is open source and compatible with Apache Hive, and has already been used at web companies to speed up queries by 40–100×.

Shark builds on a recently-proposed distributed shared memory abstraction called Resilient Distributed Datasets (RDDs) [33] to perform most computations in memory while offering fine-grained fault tolerance. In-memory computing is increasingly important in large-scale analytics for two reasons. First, many complex analytics functions, such as machine learning and graph algorithms, are iterative, going over the data multiple times; thus, the fastest systems deployed for these applications are in-memory [23, 22, 33]. Second, even traditional SQL warehouse workloads exhibit strong temporal and spatial locality, because more-recent fact table data and small dimension tables are read disproportionately often. A study of Facebook's Hive warehouse and Microsoft's Bing analytics cluster showed that over 95% of queries in both systems could be served out of memory using just 64 GB/node as a cache, even though each system manages more than 100 PB of total data [5].

The main benefit of RDDs is an efficient mechanism for fault recovery. Traditional main-memory databases support fine-grained updates to tables and replicate writes across the network for fault tolerance, which is expensive on large commodity clusters. In contrast, RDDs restrict the programming interface to coarse-grained deterministic operators that affect multiple data items at once, such as map, group-by and join, and recover from failures by tracking the lineage of each dataset and recomputing lost data. This approach works well for data-parallel relational queries, and has also been shown to support machine learning and graph computation [33]. Thus, when a node fails, Shark can recover mid-query by rerunning the deterministic operations used to build lost data partitions on other nodes, similar to MapReduce. Indeed, it typically recovers within seconds, by parallelizing this work across the cluster.

¹ Dremel provides fault tolerance within a query, but Dremel is limited to aggregation trees instead of the more complex communication patterns in joins.

[Figure 1: bar chart of runtime in seconds (axis 0–120) for Query 1, Query 2 and one logistic regression iteration, comparing Shark with Hive/Hadoop; the surviving data labels are 0.7, 1.0 and 0.96.]
Figure 1: Performance of Shark vs. Hive/Hadoop on two SQL queries from an early user and one iteration of logistic regression (a classification algorithm that runs ∼10 such steps). Results measure the runtime (seconds) on a 100-node cluster.

To run SQL efficiently, however, we also had to extend the RDD execution model, bringing in several concepts from traditional analytical databases and some new ones. We started with an existing implementation of RDDs called Spark [33], and added several features. First, to store and process relational data efficiently, we implemented in-memory columnar storage and columnar compression. This reduced both the data size and the processing time by as much as 5× over naïvely storing the data in a Spark program in its original format. Second, to optimize SQL queries based on the data characteristics even in the presence of analytics functions and UDFs, we extended Spark with Partial DAG Execution (PDE): Shark can reoptimize a running query after running the first few stages of its task DAG, choosing better join strategies or the right degree of parallelism based on observed statistics. Third, we leverage other properties of the Spark engine not present in traditional MapReduce systems, such as control over data partitioning.

Our implementation of Shark is compatible with Apache Hive [28], supporting all of Hive's SQL dialect and UDFs and allowing execution over unmodified Hive data warehouses. It augments SQL with complex analytics functions written in Spark, using Spark's Java, Scala or Python APIs. These functions can be combined with SQL in a single execution plan, providing in-memory data sharing and fast recovery across both types of processing.

Experiments show that using RDDs and the optimizations above, Shark can answer SQL queries up to 100× faster than Hive, runs iterative machine learning algorithms up to 100× faster than Hadoop, and can recover from failures mid-query within seconds. Figure 1 shows three sample results. Shark's speed is comparable to that of MPP databases in benchmarks like Pavlo et al.'s comparison with MapReduce [25], but it offers fine-grained recovery and complex analytics features that these systems lack.

More fundamentally, our work shows that MapReduce-like execution models can be applied effectively to SQL, and offer a promising way to combine relational and complex analytics. In the course of presenting Shark, we also explore why SQL engines over previous MapReduce runtimes, such as Hive, are slow, and show how a combination of enhancements in Shark (e.g., PDE), and engine properties that have not been optimized in MapReduce, such as the overhead of launching tasks, eliminate many of the bottlenecks in traditional MapReduce systems.

2 System Overview

Shark is a data analysis system that supports both SQL query processing and machine learning functions. We have chosen to implement Shark to be compatible with Apache Hive. It can be used to query an existing Hive warehouse and return results much faster, without modification to either the data or the queries.

Thanks to its Hive compatibility, Shark can query data in any system that supports the Hadoop storage API, including HDFS and Amazon S3. It also supports a wide range of data formats such as text, binary sequence files, JSON, and XML. It inherits Hive's schema-on-read capability and nested data types [28].

In addition, users can choose to load high-value data into Shark's memory store for fast analytics, as shown below:

    CREATE TABLE latest_logs
      TBLPROPERTIES ("shark.cache"=true)
    AS SELECT * FROM logs WHERE date > now()-3600;

[Figure 2: a master node with the master process, the Hive metastore (system catalog), the resource manager scheduler and the HDFS NameNode; slave nodes, each with a Spark runtime (execution engine and memstore), a resource manager daemon and an HDFS DataNode.]
Figure 2: Shark Architecture

Figure 2 shows the architecture of a Shark cluster, consisting of a single master node and a number of slave nodes, with the warehouse metadata stored in an external transactional database. It is built on top of Spark, a modern MapReduce-like cluster computing engine. When a query is submitted to the master, Shark compiles the query into an operator tree represented as RDDs, as we shall discuss in Section 2.4. These RDDs are then translated by Spark into a graph of tasks to execute on the slave nodes.

Cluster resources can optionally be allocated by a cluster resource manager (e.g., Hadoop YARN or Apache Mesos) that provides resource sharing and isolation between different computing frameworks, allowing Shark to coexist with engines like Hadoop.

In the remainder of this section, we cover the basics of Spark and the RDD programming model, followed by an explanation of how Shark query plans are generated and run.

2.1 Spark

Spark is the MapReduce-like cluster computing engine used by Shark. Spark has several features that differentiate it from traditional MapReduce engines [33]:

1. Like Dryad and Tenzing [17, 9], it supports general computation DAGs, not just the two-stage MapReduce topology.
2. It provides an in-memory storage abstraction called Resilient Distributed Datasets (RDDs) that lets applications keep data in memory across queries, and automatically reconstructs it after failures [33].
3. The engine is optimized for low latency. It can efficiently manage tasks as short as 100 milliseconds on clusters of thousands of cores, while engines like Hadoop incur a latency of 5–10 seconds to launch each task.
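To make the latency point in item 3 concrete, here is a back-of-the-envelope model that is not taken from the paper: a stage runs its tasks in waves over a fixed number of task slots, and every task pays a fixed launch cost on top of its useful work. The 5 s launch cost is the low end of the Hadoop range quoted above; the 2,000-task, 1,000-slot, 100 ms workload and the 5 ms launch cost assumed for a low-latency engine are hypothetical numbers chosen only for illustration.

```scala
// Back-of-the-envelope model of how per-task launch overhead inflates a stage's
// runtime. All inputs are illustrative assumptions, not measurements.
object LaunchOverhead {
  // Tasks run in "waves" over `slots` parallel slots; every task pays
  // `launchSec` of scheduling/launch cost before `workSec` of useful work.
  def stageSeconds(tasks: Int, slots: Int, workSec: Double, launchSec: Double): Double = {
    require(tasks >= 0 && slots > 0, "need a non-negative task count and at least one slot")
    val waves = (tasks + slots - 1) / slots // ceiling division
    waves * (workSec + launchSec)
  }

  def main(args: Array[String]): Unit = {
    val tasks = 2000 // hypothetical stage: 2,000 short tasks
    val slots = 1000 // hypothetical cluster: 1,000 cores
    val work = 0.1   // 100 ms of useful work per task
    val fast = stageSeconds(tasks, slots, work, launchSec = 0.005) // assumed low-latency engine
    val slow = stageSeconds(tasks, slots, work, launchSec = 5.0)   // 5 s launch cost per task
    println(f"low-overhead engine: $fast%.2f s, 5 s launch overhead: $slow%.2f s")
  }
}
```

With these inputs the stage takes two waves: about 0.21 s when launching is cheap versus about 10.2 s with a 5 s launch cost, so the overhead rather than the work dominates. This is why short tasks only pay off on an engine with low per-task latency.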

RDDs are unique to Spark, and were essential to enabling mid-query fault tolerance. However, the other differences are important engineering elements that contribute to Shark's performance.

On top of these features, we have also modified the Spark engine for Shark to support partial DAG execution, that is, modification of the query plan DAG after only some of the stages have finished, based on statistics collected from these stages. Similar to [20], we use this technique to optimize join algorithms and other aspects of the execution mid-query, as we shall discuss in Section 3.1.

2.2 Resilient Distributed Datasets (RDDs)

Spark's main abstraction is resilient distributed datasets (RDDs), which are immutable, partitioned collections that can be created through various data-parallel operators (e.g., map, group-by, hash-join). Each RDD is either a collection stored in an external storage system, such as a file in HDFS, or a derived dataset created by applying operators to other RDDs. For example, given an RDD of (visitID, URL) pairs for visits to a website, we might compute an RDD of (URL, count) pairs by applying a map operator to turn each event into a (URL, 1) pair, and then a reduce to add the counts by URL.

In Spark's native API, RDD operations are invoked through a functional interface similar to DryadLINQ [19] in Scala, Java or Python. For example, the Scala code for the query above is:

    val visits = spark.hadoopFile("hdfs://...")
    val counts = visits.map(v => (v.url, 1))
                       .reduceByKey((a, b) => a + b)

RDDs can contain arbitrary data types as elements (since Spark runs on the JVM, these elements are Java objects), and are automatically partitioned across the cluster, but they are immutable once created, and they can only be created through Spark's deterministic parallel operators. These two restrictions, however, enable highly efficient fault recovery. In particular, instead of replicating each RDD across nodes for fault tolerance, Spark remembers the lineage of the RDD (the graph of operators used to build it), and recovers lost partitions by recomputing them from base data [33].² For example, Figure 3 shows the lineage graph for the RDDs computed above. If Spark loses one of the partitions in the (URL, 1) RDD, for example, it can recompute it by rerunning the map on just the corresponding partition of the input file.

[Figure 3: visits (HDFS file) → map → (URL, 1) pairs → reduceByKey → counts.]
Figure 3: Lineage graph for the RDDs in our Spark example. Oblongs represent RDDs, while circles show partitions within a dataset. Lineage is tracked at the granularity of partitions.

The RDD model offers several key benefits in our large-scale in-memory computing setting. First, RDDs can be written at the speed of DRAM instead of the speed of the network, because there is no need to replicate each byte written to another machine for fault tolerance. DRAM in a modern server is over 10× faster than even a 10-Gigabit network. Second, Spark can keep just one copy of each RDD partition in memory, saving precious memory over a replicated system, since it can always recover lost data using lineage. Third, when a node fails, its lost RDD partitions can be rebuilt in parallel across the other nodes, allowing speedy recovery.³ Fourth, even if a node is just slow (a "straggler"), we can recompute necessary partitions on other nodes because RDDs are immutable, so there are no consistency concerns with having two copies of a partition. These benefits make RDDs attractive as the foundation for our relational processing in Shark.

2.3 Fault Tolerance Guarantees

To summarize the benefits of RDDs explained above, Shark provides the following fault tolerance properties, which have been difficult to support in traditional MPP database designs:

1. Shark can tolerate the loss of any set of worker nodes. The execution engine will re-execute any lost tasks and recompute any lost RDD partitions using lineage.⁴ This is true even within a query: Spark will rerun any failed tasks, or lost dependencies of new tasks, without aborting the query.
2. Recovery is parallelized across the cluster. If a failed node contained 100 RDD partitions, these can be rebuilt in parallel on 100 different nodes, quickly recovering the lost data.
3. The deterministic nature of RDDs also enables straggler mitigation: if a task is slow, the system can launch a speculative "backup copy" of it on another node, as in MapReduce [13].
4. Recovery works even in queries that combine SQL and machine learning UDFs (Section 4), as these operations all compile into a single RDD lineage graph.

2.4 Executing SQL over RDDs

Shark runs SQL queries over Spark using a three-step process similar to traditional RDBMSs: query parsing, logical plan generation, and physical plan generation.

Given a query, Shark uses the Hive query compiler to parse the query and generate an abstract syntax tree. The tree is then turned into a logical plan and basic logical optimization, such as predicate pushdown, is applied. Up to this point, Shark and Hive share an identical approach. Hive would then convert the operator tree into a physical plan consisting of multiple MapReduce stages. In the case of Shark, its optimizer applies additional rule-based optimizations, such as pushing LIMIT down to individual partitions, and creates a physical plan consisting of transformations on RDDs rather than MapReduce jobs. We use a variety of operators already present in Spark, such as map and reduce, as well as new operators we implemented for Shark, such as broadcast joins. Spark's master then executes this graph using standard MapReduce scheduling techniques, such as placing tasks close to their input data, rerunning lost tasks, and performing straggler mitigation [33].

While this basic approach makes it possible to run SQL over Spark, doing so efficiently is challenging. The prevalence of UDFs and complex analytic functions in Shark's workload makes it difficult to determine an optimal query plan at compile time, especially for new data that has not undergone ETL. In addition, even with such a plan, naïvely executing it over Spark (or other MapReduce runtimes) can be inefficient. In the next section, we discuss several extensions we made to Spark to efficiently store relational data and run SQL, starting with a mechanism that allows for dynamic, statistics-driven re-optimization at run-time.

² We assume that external files for RDDs representing external data do not change, or that we can take a snapshot of a file when we create an RDD from it.
³ To provide fault tolerance across "shuffle" operations like a parallel reduce, the execution engine also saves the "map" side of the shuffle in memory on the source nodes, spilling to disk if necessary.
⁴ Support for master recovery could also be added by reliably logging the RDD lineage graph and the submitted jobs, because this state is small, but we have not yet implemented this.

3 Engine Extensions

In this section, we describe our modifications to the Spark engine to enable efficient execution of SQL queries.

3.1 Partial DAG Execution (PDE)

Systems like Shark and Hive are frequently used to query fresh data that has not undergone a data loading process. This precludes the use of static query optimization techniques that rely on accurate a priori data statistics, such as statistics maintained by indices. The lack of statistics for fresh data, combined with the prevalent use of UDFs, necessitates dynamic approaches to query optimization.

To support dynamic query optimization in a distributed setting, we extended Spark to support partial DAG execution (PDE), a technique that allows dynamic alteration of query plans based on data statistics collected at run-time.

We currently apply partial DAG execution at blocking "shuffle" operator boundaries where data is exchanged and repartitioned, since these are typically the most expensive operations in Shark. By default, Spark materializes the output of each map task in memory before a shuffle, spilling it to disk as necessary. Later, reduce tasks fetch this output.

PDE modifies this mechanism in two ways. First, it gathers customizable statistics at global and per-partition granularities while materializing map output. Second, it allows the DAG to be altered based on these statistics, either by choosing different operators or altering their parameters (such as their degrees of parallelism).

These statistics are customizable using a simple, pluggable accumulator API. Some example statistics include:

1. Partition sizes and record counts, which can be used to detect skew.
2. Lists of "heavy hitters," i.e., items that occur frequently in the dataset.
3. Approximate histograms, which can be used to estimate the distribution of each partition's data.

These statistics are sent by each worker to the master, where they are aggregated and presented to the optimizer. For efficiency, we use lossy compression to record the statistics, limiting their size to 1–2 KB per task. For instance, we encode partition sizes (in bytes) with logarithmic encoding, which can represent sizes of up to 32 GB using only one byte with at most 10% error. The master can then use these statistics to perform various run-time optimizations, as we shall discuss next.

Partial DAG execution complements existing adaptive query optimization techniques that typically run in a single-node system [6, 20, 30], as we can use existing techniques to dynamically optimize the local plan within each node, and use PDE to optimize the global structure of the plan at stage boundaries. This fine-grained statistics collection, and the optimizations that it enables, differentiate PDE from graph rewriting features in previous systems, such as DryadLINQ [19].

3.1.1 Join Optimization

Partial DAG execution can be used to perform several run-time optimizations for join queries.

Figure 4 illustrates two communication patterns for MapReduce-style joins. In shuffle join, both join tables are hash-partitioned by the join key. Each reducer joins corresponding partitions using a local join algorithm, which is chosen by each reducer based on run-time statistics. If one of a reducer's input partitions is small, then it constructs a hash table over the small partition and probes it using the large partition. If both partitions are large, then a symmetric hash join is performed by constructing hash tables over both inputs.

[Figure 4: two panels, "Map join" and "Shuffle join", each showing Table 1 and Table 2 flowing through Stage 1 and Stage 2 to a Join and its Result.]
Figure 4: Data flows for map join and shuffle join. Map join broadcasts the small table to all large table partitions, while shuffle join repartitions and shuffles both tables.

In map join, also known as broadcast join, a small input table is broadcast to all nodes, where it is joined with each partition of a large table. This approach can result in significant cost savings by avoiding an expensive repartitioning and shuffling phase.

Map join is only worthwhile if some join inputs are small, so Shark uses partial DAG execution to select the join strategy at run-time based on its inputs' exact sizes. By using sizes of the join inputs gathered at run-time, this approach works well even with input tables that have no prior statistics, such as intermediate results.

Run-time statistics also inform the join tasks' scheduling policies. If the optimizer has a prior belief that a particular join input will be small, it will schedule that task before other join inputs and decide to perform a map-join if it observes that the task's output is small. This allows the query engine to avoid performing the pre-shuffle partitioning of a large table once the optimizer has decided to perform a map-join.

3.1.2 Skew-handling and Degree of Parallelism

Partial DAG execution can also be used to determine operators' degrees of parallelism and to mitigate skew.

The degree of parallelism for reduce tasks can have a large performance impact: launching too few reducers may overload reducers' network connections and exhaust their memories, while launching too many may prolong the job due to task scheduling overhead. Hive's performance is especially sensitive to the number of reduce tasks, due to Hadoop's large scheduling overhead.

Using partial DAG execution, Shark can use individual partitions' sizes to determine the number of reducers at run-time by coalescing many small, fine-grained partitions into fewer coarse partitions that are used by reduce tasks. To mitigate skew, fine-grained partitions are assigned to coalesced partitions using a greedy bin-packing heuristic that attempts to equalize coalesced partitions' sizes [15]. This offers performance benefits, especially when good bin-packings exist.

Somewhat surprisingly, we discovered that Shark can obtain similar performance improvement by running a larger number of reduce tasks. We attribute this to Spark's low scheduling overhead.
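The lineage-based recovery described in Section 2.2 can be illustrated without a cluster. The sketch below is a toy, single-process Scala model, not Spark's API: the `Dataset` class, `simulate` and the sample URLs are hypothetical. Each dataset stores only a deterministic recipe for computing partition i, and `map` is a narrow dependency, so losing one partition of the (URL, 1) dataset is repaired by rereading just the matching input partition.

```scala
import scala.collection.mutable

// Toy, single-process model of lineage-based recovery. This is NOT Spark's API:
// Dataset, simulate and the sample data are hypothetical names for illustration.
object LineageDemo {
  // A dataset stores its lineage: a deterministic recipe for building partition i,
  // rather than a replica of the data.
  final case class Dataset[A](numPartitions: Int, compute: Int => Seq[A]) {
    // Narrow dependency: output partition i reads only parent partition i.
    def map[B](f: A => B): Dataset[B] = Dataset(numPartitions, i => compute(i).map(f))
  }

  // Builds the (URL, 1) dataset, caches one copy of each partition, loses one,
  // and rebuilds it from lineage. Returns the rebuilt partition and the number
  // of times each input partition was read.
  def simulate(input: Vector[Seq[String]], lost: Int): (Seq[(String, Int)], Map[Int, Int]) = {
    val reads = mutable.Map.empty[Int, Int].withDefaultValue(0)
    // Base dataset over an input file that is assumed not to change (footnote 2).
    val visits = Dataset[String](input.size, i => { reads(i) += 1; input(i) })
    val pairs = visits.map(url => (url, 1))

    // Keep exactly one in-memory copy of every partition: no replication.
    val cache = mutable.Map.empty[Int, Seq[(String, Int)]]
    for (i <- 0 until pairs.numPartitions) cache(i) = pairs.compute(i)

    cache.remove(lost)                // the node holding this partition fails
    cache(lost) = pairs.compute(lost) // rerun map on the matching input partition only
    (cache(lost), reads.toMap)
  }

  def main(args: Array[String]): Unit = {
    val input = Vector(Seq("a.com", "b.com"), Seq("a.com", "c.com"), Seq("b.com"))
    val (rebuilt, reads) = simulate(input, lost = 1)
    println(s"rebuilt partition: $rebuilt; reads per input partition: $reads")
  }
}
```

In the sample run, input partition 1 is read twice (once to build the cache and once to recover), while partitions 0 and 2 are read once each. A shuffle such as reduceByKey would make a lost partition depend on every parent partition, which is why footnote 3 describes keeping the map side of the shuffle.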
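The one-byte partition-size encoding mentioned in Section 3.1 can be reproduced with a logarithmic code. The paper does not give its exact scheme, so the sketch below makes three assumptions of its own: base 1.1, rounding the exponent up so the decoded value never underestimates, and reserving code 0 for empty partitions. With the remaining 255 codes, the largest value that is not saturated is 1.1^254, about 3.3 × 10^10 bytes, which is roughly the 32 GB range quoted above. `LogSizeCode`, `encode` and `decode` are hypothetical names.

```scala
// One-byte logarithmic size code (a sketch; base 1.1 and the code layout are
// assumptions, not the paper's published scheme).
object LogSizeCode {
  private val Base = 1.1 // each code step is a 10% increase

  // Code 0 is reserved for empty partitions; code c >= 1 stands for Base^(c - 1) bytes.
  def encode(sizeBytes: Long): Int = {
    require(sizeBytes >= 0, "size must be non-negative")
    if (sizeBytes == 0) 0
    else {
      // Round the exponent up so the decoded value is >= the true size.
      val exponent = math.ceil(math.log(sizeBytes.toDouble) / math.log(Base)).toInt
      math.min(255, exponent + 1) // saturate at the largest one-byte code
    }
  }

  def decode(code: Int): Double = {
    require(code >= 0 && code <= 255, "code must fit in one unsigned byte")
    if (code == 0) 0.0 else math.pow(Base, code - 1)
  }

  // Largest size representable without saturating: Base^254, about 3.26e10 bytes.
  val MaxBytes: Double = decode(255)

  def main(args: Array[String]): Unit = {
    for (size <- Seq(0L, 1L, 4096L, 64L * 1024 * 1024, 30L * 1000 * 1000 * 1000)) {
      val code = encode(size)
      val approx = decode(code)
      val err = if (size == 0) 0.0 else (approx - size) / size
      println(f"size=$size%d code=$code%d decoded=$approx%.0f relErr=${err * 100}%.2f%%")
    }
  }
}
```

For any size between 1 byte and `MaxBytes`, the decoded value lies between the true size and 1.1 times the true size, so the relative error is at most 10%. Sizes beyond that range saturate at code 255.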
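The coalescing step in Section 3.1.2 can be sketched as follows. The paper only specifies "a greedy bin-packing heuristic" [15]. This sketch assumes one common greedy choice, longest-processing-time first: each remaining partition, largest first, goes to the reducer that is currently lightest. It also assumes a hypothetical `targetBytes` knob that derives the reducer count from the observed total size. `Coalesce` and `plan` are illustrative names.

```scala
import scala.collection.mutable

// Sketch of run-time reducer coalescing. The reducer-count rule (targetBytes)
// and the LPT greedy policy are assumptions; the paper cites only "a greedy
// bin-packing heuristic".
object Coalesce {
  // Returns, for each reducer, the indices of the fine-grained map-output
  // partitions it should fetch.
  def plan(partitionSizes: Seq[Long], targetBytes: Long): Vector[Vector[Int]] = {
    require(targetBytes > 0, "targetBytes must be positive")
    // Enough reducers for ~targetBytes each, but never more than there are partitions.
    val wanted = (partitionSizes.sum + targetBytes - 1) / targetBytes
    val numReducers = math.max(1L, math.min(wanted, partitionSizes.size.toLong)).toInt

    // Min-heap of (bytes assigned so far, reducer index).
    val lightest = mutable.PriorityQueue.empty[(Long, Int)](Ordering[(Long, Int)].reverse)
    (0 until numReducers).foreach(r => lightest.enqueue((0L, r)))
    val assignment = Array.fill(numReducers)(Vector.empty[Int])

    // Greedy: largest partition first, into whichever reducer is currently lightest.
    for (p <- partitionSizes.indices.sortBy(i => -partitionSizes(i))) {
      val (load, r) = lightest.dequeue()
      assignment(r) = assignment(r) :+ p
      lightest.enqueue((load + partitionSizes(p), r))
    }
    assignment.toVector
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical skewed map output (sizes in MB): one big partition, several small ones.
    val sizes = Seq(50L, 10L, 10L, 10L, 10L, 10L)
    for ((members, r) <- plan(sizes, targetBytes = 50L).zipWithIndex)
      println(s"reducer $r fetches partitions $members (${members.map(sizes).sum} MB)")
  }
}
```

With the sample sizes, reducer 0 receives the large partition alone and reducer 1 receives the five small ones, so each fetches 50 MB. A plain round-robin assignment would instead leave one reducer with noticeably more data than the other.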

7.3.2 Columnar Memory Store Pavlo et al.[25] showed that Hadoop was able to perform data In-memory computation is essential to low-latency query answer- loading at 5 to 10 times the throughput of MPP databases. Tested ing, given that memory’s throughput is orders of magnitude higher using the same dataset used in [25], Shark provides the same through- than that of disks. Naïvely using Spark’s memory store, however, put as Hadoop in loading data into HDFS. Shark is 5 times faster can lead to undesirable performance. Shark implements a columnar than Hadoop when loading data into its memory store. memory store on top of Spark’s memory store. 3.4 Data Co-partitioning In-memory data representation affects both space footprint and read throughput. A naïve approach is to simply cache the on-disk In some warehouse workloads, two tables are frequently joined to- data in its native format, performing on-demand deserialization in gether. For example, the TPC-H benchmark frequently joins the the query processor. This deserialization becomes a major bottle- lineitem and order tables. A technique commonly used by MPP neck: in our studies, we saw that modern commodity CPUs can databases is to co-partition the two tables based on their join key in deserialize at a rate of only 200MB per second per core. the data loading process. In distributed file systems like HDFS, The approach taken by Spark’s default memory store is to store the storage system is schema-agnostic, which prevents data co- data partitions as collections of JVM objects. This avoids deserial- partitioning. Shark allows co-partitioning two tables on a com- ization, since the query processor can directly use these objects, but mon key for faster joins in subsequent queries. This can be ac- leads to significant storage space overheads. Common JVM imple- complished with the DISTRIBUTE BY clause: mentations add 12 to 16 bytes of overhead per object. 
For example, storing 270 MB of the TPC-H lineitem table as JVM objects uses approximately 971 MB of memory, while a serialized representation requires only 289 MB, nearly three times less space. A more serious implication, however, is the effect on garbage collection (GC). With a 200 B record size, a 32 GB heap can contain 160 million objects. The JVM garbage collection time correlates linearly with the number of objects in the heap, so it could take minutes to perform a full GC on a large heap. These unpredictable, expensive garbage collections cause large variability in workers' response times.

Shark stores all columns of primitive types as JVM primitive arrays. Complex data types supported by Hive, such as map and array, are serialized and concatenated into a single byte array. Each column creates only one JVM object, leading to fast GCs and a compact data representation. The space footprint of columnar data can be further reduced by cheap compression techniques at virtually no CPU cost. Similar to more traditional database systems [27], Shark implements CPU-efficient compression schemes such as dictionary encoding, run-length encoding, and bit packing.

Columnar data representation also leads to better cache behavior, especially for analytical queries that frequently compute aggregations on certain columns.

3.3 Distributed Data Loading

In addition to query execution, Shark also uses Spark's execution engine for distributed data loading. During loading, a table is split into small partitions, each of which is loaded by a Spark task. The loading tasks use the data schema to extract individual fields from rows, marshal a partition of data into its columnar representation, and store those columns in memory.

Each data loading task tracks metadata to decide whether each column in a partition should be compressed. For example, the loading task will compress a column using dictionary encoding if its number of distinct values is below a threshold. This allows each task to choose the best compression scheme for each partition, rather than conforming to a global compression scheme that might not be optimal for local partitions. These local decisions do not require coordination among data loading tasks. This lets the load phase achieve a maximum degree of parallelism, at the small cost of requiring each partition to maintain its own compression metadata. It is important to clarify that an RDD's lineage does not need to contain the compression scheme and metadata for each partition. The compression scheme and metadata are simply byproducts of the RDD computation, and can be deterministically recomputed along with the in-memory data in the case of failures.

As a result, Shark can load data into memory at the aggregated throughput of the CPUs processing incoming data.

    CREATE TABLE l_mem TBLPROPERTIES ("shark.cache"="true")
    AS SELECT * FROM lineitem DISTRIBUTE BY L_ORDERKEY;

    CREATE TABLE o_mem TBLPROPERTIES (
      "shark.cache"="true", "copartition"="l_mem")
    AS SELECT * FROM orders DISTRIBUTE BY O_ORDERKEY;

When joining two co-partitioned tables, Shark's optimizer constructs a DAG that avoids the expensive shuffle and instead uses map tasks to perform the join.

3.5 Partition Statistics and Map Pruning

Data tend to be stored in some logical clustering on one or more columns. For example, entries in a website's traffic log data might be grouped by users' physical locations, because logs are first stored in data centers that have the best geographical proximity to users. Within each data center, logs are append-only and are stored in roughly chronological order. As a less obvious case, a news site's logs might contain news_id and timestamp columns that have strongly correlated values. For analytical queries, it is typical to apply filter predicates or aggregations over such columns. For example, a daily warehouse report might describe how different visitor segments interact with the website. This type of query naturally applies a predicate on timestamps and performs aggregations that are grouped by geographical location. This pattern is even more frequent for interactive data analysis, during which drill-down operations are frequently performed.

Map pruning is the process of pruning data partitions based on their natural clustering columns. Since Shark's memory store splits data into small partitions, each block contains only one or a few logical groups on such columns. Shark can therefore avoid scanning certain blocks of data if their values fall outside the query's filter range.

To take advantage of these natural clusterings of columns, Shark's memory store on each worker piggybacks on the data loading process to collect statistics. The information collected for each partition includes the range of each column and the distinct values if the number of distinct values is small (i.e., enum columns). The collected statistics are sent back to the master program and kept in memory for pruning partitions during query execution.

When a query is issued, Shark evaluates the query's predicates against all partition statistics; partitions that do not satisfy the predicate are pruned and Shark does not launch tasks to scan them. We collected a sample of queries from the Hive warehouse of a video analytics company, and out of the 3833 queries we obtained, at least 3277 of them contain predicates that Shark can use for map pruning. Section 6 provides more details on this workload.
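The per-partition decisions described in Sections 3.3 and 3.5 can be illustrated with a small single-machine Scala sketch. It works in three steps:

- A loading step stores an integer column either as one primitive array or as a dictionary plus one byte code per row.
- The same step records the column's minimum and maximum.
- A pruning step skips every partition whose range cannot satisfy a range predicate.

This is not Shark's code. The class names, the 256-value dictionary threshold, and the restriction to a single integer column are assumptions made for illustration.

```scala
// Hypothetical sketch (not Shark's implementation): one integer column per partition,
// dictionary-encoded when it has few distinct values, plus min/max statistics.
sealed trait IntColumn { def values: Array[Int] }

// Uncompressed column: a single JVM primitive array.
final case class RawColumn(data: Array[Int]) extends IntColumn {
  def values: Array[Int] = data
}

// Dictionary-encoded column: the distinct values plus one byte code per row.
final case class DictColumn(dict: Array[Int], codes: Array[Byte]) extends IntColumn {
  def values: Array[Int] = codes.map(c => dict(c & 0xff)) // & 0xff undoes signed bytes
}

// A cached partition together with the statistics kept for map pruning.
final case class ColumnPartition(column: IntColumn, min: Int, max: Int)

object PartitionLoader {
  // Assumed threshold: at most 256 distinct values, so every code fits in one byte.
  val DictThreshold = 256

  // Each loading task decides on its own, without coordinating with other tasks.
  def load(rows: Seq[Int]): ColumnPartition = {
    require(rows.nonEmpty, "sketch assumes non-empty partitions")
    val data = rows.toArray
    val distinct = data.distinct
    val column =
      if (distinct.length <= DictThreshold) {
        val code = distinct.zipWithIndex.toMap
        DictColumn(distinct, data.map(v => code(v).toByte))
      } else RawColumn(data)
    ColumnPartition(column, data.min, data.max)
  }

  // Map pruning: keep only partitions whose [min, max] range overlaps [lo, hi].
  def prune(parts: Seq[ColumnPartition], lo: Int, hi: Int): Seq[ColumnPartition] =
    parts.filter(p => p.max >= lo && p.min <= hi)
}
```

Because each call to load looks only at its own rows, two partitions of the same column can end up with different encodings. That is the trade-off Section 3.3 describes. A fuller sketch would also consider run-length encoding and bit packing.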

4 Machine Learning Support

A key design goal of Shark is to provide a single system capable of efficient SQL query processing and sophisticated machine learning. Following the principle of pushing computation to data, Shark supports machine learning as a first-class citizen. This is enabled by the design decision to choose Spark as the execution engine and RDD as the main data structure for operators. In this section, we explain Shark's language and execution engine integration for SQL and machine learning.

Other research projects [12, 14] have demonstrated that it is possible to express certain machine learning algorithms in SQL and avoid moving data out of the database. The implementation of those projects, however, involves a combination of SQL, UDFs, and driver programs written in other languages. The systems become obscure and difficult to maintain. In addition, they may sacrifice performance by performing expensive parallel numerical computations on traditional database engines that were not designed for such workloads. Contrast this with the approach taken by Shark, which offers in-database analytics that push computation to data. It does so using a runtime that is optimized for such workloads and a programming model that is designed to express machine learning algorithms.

4.1 Language Integration

In addition to executing a SQL query and returning its results, Shark also allows queries to return the RDD representing the query plan. Callers to Shark can then invoke distributed computation over the query result using the returned RDD.

As an example of this integration, Listing 1 illustrates a data analysis pipeline that performs logistic regression over a user database. Logistic regression, a common classification algorithm, searches for a hyperplane w that best separates two sets of points (e.g., spammers and non-spammers). The algorithm applies gradient descent optimization by starting with a randomized w vector and iteratively updating it by moving along gradients towards an optimum value.

The program begins by using sql2rdd to issue a SQL query to retrieve user information as a TableRDD. It then performs feature extraction on the query rows and runs logistic regression over the extracted feature matrix. Each iteration of logRegress applies a function of w to all data points to produce a set of gradients. These are summed to produce a net gradient that is used to update w.

    import scala.math.exp

    // D (number of features), ITERATIONS, and rand (a random number generator)
    // are assumed to be defined elsewhere. Each Point has a feature Vector x
    // and a label y of +1 or -1.
    def logRegress(points: RDD[Point]): Vector = {
      var w = Vector(D, _ => 2 * rand.nextDouble - 1)  // random initial hyperplane
      for (i <- 1 to ITERATIONS) {
        val gradient = points.map { p =>
          val denom = 1 + exp(-p.y * (w dot p.x))
          (1 / denom - 1) * p.y * p.x                   // logistic-loss gradient for p
        }.reduce(_ + _)                                 // sum the per-point gradients
        w -= gradient
      }
      w
    }

    // Issue a SQL query and get its result back as a TableRDD.
    val users = sql2rdd("""SELECT * FROM user u
      JOIN comment c ON c.uid=u.uid""")

    // Extract one feature vector per result row.
    val features = users.mapRows { row =>
      new Vector(extractFeature1(row.getInt("age")),
                 extractFeature2(row.getStr("country")),
                 ...)}
    val trainedVector = logRegress(features.cache())

Listing 1: Logistic Regression Example

The map, mapRows, and reduce calls in Listing 1 are automatically parallelized by Shark to execute across a cluster, and the master program simply collects the output of the reduce function to update w.

Note that this distributed logistic regression implementation in Shark looks remarkably similar to a program implemented for a single node in the Scala language. The user can conveniently mix the best parts of both SQL and MapReduce-style programming.

Currently, Shark provides native support for Scala and Java, with support for Python in development. We have modified the Scala shell to enable interactive execution of both SQL and distributed machine learning algorithms. Because Shark is built on top of the JVM, it is trivial to support other JVM languages, such as Clojure or JRuby.

We have implemented a number of basic machine learning algorithms, including linear regression, logistic regression, and k-means clustering. In most cases, the user only needs to supply a mapRows function to perform feature extraction and can invoke the provided algorithms.

The above example demonstrates how machine learning computations can be performed on query results. Using RDD as the main data structure for query operators also enables the possibility of using SQL to query the results of machine learning computations in a single execution plan.

4.2 Execution Engine Integration

In addition to language integration, another key benefit of using RDDs as the data structure for operators is the execution engine integration. This common abstraction allows machine learning computations and SQL queries to share workers and cached data without the overhead of data movement.

Because SQL query processing is implemented using RDDs, lineage is kept for the whole pipeline, which enables end-to-end fault tolerance for the entire workflow. If failures occur during the machine learning stage, partitions on faulty nodes will automatically be recomputed based on their lineage.

5 Implementation

While implementing Shark, we discovered that a number of engineering details had significant performance impacts. Overall, to improve the query processing speed, one should minimize the tail latency of tasks and the CPU cost of processing each row.

Memory-based Shuffle: Both Spark and Hadoop write map output files to disk, hoping that they will remain in the OS buffer cache when reduce tasks fetch them. In practice, we have found that the extra system calls and file system journaling add significant overhead. In addition, the inability to control when buffer caches are flushed leads to variability in the execution time of shuffle tasks. A query's response time is determined by the last task to finish, so the increasing variability leads to long-tail latency, which significantly hurts shuffle performance. We modified the shuffle phase to materialize map outputs in memory, with the option to spill them to disk.

Temporary Object Creation: It is easy to write a program that creates many temporary objects, which can burden the JVM's garbage collector. For a parallel job, a slow GC at one task may slow the entire job. Shark operators and RDD transformations are written in a way that minimizes temporary object creations.
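One common way to follow the advice on temporary object creation is to scan columnar data through a single reusable cursor instead of materializing a tuple per row. The Scala sketch below contrasts the two styles on a toy two-column table. ColumnarTable, RowCursor, and FilterSum are hypothetical names, and the sketch is not taken from Shark's operators. Both functions compute the equivalent of SELECT SUM(revenue) WHERE age >= minAge.

```scala
// Hypothetical columnar table: one primitive array per column.
final class ColumnarTable(val age: Array[Int], val revenue: Array[Double]) {
  require(age.length == revenue.length, "columns must have equal length")
  def numRows: Int = age.length
}

// A single mutable cursor that is repositioned instead of re-allocated per row.
final class RowCursor(table: ColumnarTable) {
  private var pos = 0
  def hasRow: Boolean = pos < table.numRows
  def advance(): Unit = pos += 1
  def age: Int = table.age(pos)
  def revenue: Double = table.revenue(pos)
}

object FilterSum {
  // Allocation-heavy version: a tuple object per row plus intermediate collections.
  def withTuples(t: ColumnarTable, minAge: Int): Double =
    (0 until t.numRows)
      .map(i => (t.age(i), t.revenue(i)))
      .filter(_._1 >= minAge)
      .map(_._2)
      .sum

  // Allocation-light version: one cursor object for the whole scan.
  def withCursor(t: ColumnarTable, minAge: Int): Double = {
    val row = new RowCursor(t)
    var total = 0.0
    while (row.hasRow) {
      if (row.age >= minAge) total += row.revenue
      row.advance()
    }
    total
  }
}
```

The tuple version leaves garbage proportional to the number of rows scanned. The cursor version allocates a constant number of objects per scan, which reduces the load on the garbage collector.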

Bytecode Compilation of Expression Evaluators: In its current implementation, Shark sends the expression evaluators generated by the Hive parser as part of the tasks to be executed on each row. By profiling Shark, we discovered that for certain queries, when data is served out of the memory store, the majority of the CPU cycles are wasted in interpreting these evaluators. We are working on a compiler to transform these expression evaluators into JVM bytecode, which can further increase the execution engine's throughput.

Specialized Data Structures: Using specialized data structures is another low-hanging optimization that we have yet to exploit. For example, Java's hash table is built for generic objects. When the hash key is a primitive type, the use of specialized data structures can lead to more compact data representations, and thus better cache behavior.

6 Experiments

We evaluated Shark using four datasets:

1. Pavlo et al. Benchmark: 2.1 TB of data reproducing Pavlo et al.'s comparison of MapReduce vs. analytical DBMSs [25].
2. TPC-H Dataset: 100 GB and 1 TB datasets generated by the DBGEN program [29].
3. Real Hive Warehouse: 1.7 TB of sampled Hive warehouse data from an early industrial user of Shark.
4. Machine Learning Dataset: 100 GB synthetic dataset to measure the performance of machine learning algorithms.

Overall, our results show that Shark can perform up to 100× faster than Hive, even though we have yet to implement some of the performance optimizations mentioned in the previous section. In particular, Shark provides comparable performance gains to those reported for MPP databases in Pavlo et al.'s comparison [25]. In some cases where data fits in memory, Shark exceeds the performance reported for MPP databases.

We emphasize that we are not claiming that Shark is fundamentally faster than MPP databases; there is no reason why MPP engines could not implement the same processing optimizations as Shark. Indeed, our implementation has several disadvantages relative to commercial engines, such as running on the JVM. Instead, we aim to show that it is possible to achieve comparable performance while retaining a MapReduce-like engine, and the fine-grained fault recovery features that such engines provide. In addition, Shark can leverage this engine to perform high-speed machine learning functions on the same data, which we believe will be essential in future analytics workloads.

6.1 Methodology and Cluster Setup

Unless otherwise specified, experiments were conducted on Amazon EC2 using 100 m2.4xlarge nodes. Each node had 8 virtual cores, 68 GB of memory, and 1.6 TB of local storage.

The cluster was running 64-bit Linux 3.2.28, Apache Hadoop 0.20.205, and Apache Hive 0.9. For Hadoop MapReduce, the number of map tasks and the number of reduce tasks per node were set to 8, matching the number of cores. For Hive, we enabled JVM reuse between tasks and avoided merging small output files, which would take an extra step after each query to perform the merge.

We executed each query six times, discarded the first run, and report the average of the remaining five runs. We discard the first run in order to allow the JVM's just-in-time compiler to optimize common code paths. We believe that this more closely mirrors real-world deployments where the JVM will be reused by many queries.

6.2 Pavlo et al. Benchmarks

Pavlo et al. compared Hadoop versus MPP databases and showed that Hadoop excelled at data ingress, but performed unfavorably in query execution [25]. We reused the dataset and queries from their benchmarks to compare Shark against Hive.

[Figure 5: Selection and aggregation query runtimes (seconds) from Pavlo et al. benchmark. Panels: Selection; Aggregation, 2.5M groups; Aggregation, 1K groups. Bars: Shark, Shark (disk), Hive.]

The benchmark used two tables: a 1 GB/node rankings table, and a 20 GB/node uservisits table. For our 100-node cluster, we recreated a 100 GB rankings table containing 1.8 billion rows and a 2 TB uservisits table containing 15.5 billion rows. We ran the four queries in their experiments comparing Shark with Hive and report the results in Figures 5 and 6. In this subsection, we hand-tuned Hive's number of reduce tasks to produce optimal results for Hive. Despite this tuning, Shark outperformed Hive in all cases by a wide margin.

6.2.1 Selection Query

The first query was a simple selection on the rankings table:

    SELECT pageURL, pageRank
    FROM rankings WHERE pageRank > X;

In [25], Vertica outperformed Hadoop by a factor of 10 because a clustered index was created for Vertica. Even without a clustered index, Shark was able to execute this query 80× faster than Hive for in-memory data, and 5× faster on data read from HDFS.

6.2.2 Aggregation Queries

The Pavlo et al. benchmark ran two aggregation queries:

    SELECT sourceIP, SUM(adRevenue)
    FROM uservisits GROUP BY sourceIP;

    SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue)
    FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 7);

In our dataset, the first query had two million groups and the second had approximately one thousand groups. Shark and Hive both applied task-local aggregations and shuffled the data to parallelize the final merge aggregation. Again, Shark outperformed Hive by a wide margin. The benchmarked MPP databases perform local aggregations on each node, and then send all aggregates to a single query coordinator for the final merging. This performed very well when the number of groups was small, but performed worse with a large number of groups. The MPP databases' chosen plan is similar to choosing a single reduce task for Shark and Hive.
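The plan described for the aggregation queries can be sketched on local collections in Scala. It has two phases: a task-local partial aggregation, then a hash-partitioned shuffle followed by a parallel final merge. The names are hypothetical, and each input partition is simply a Seq of (sourceIP, adRevenue) pairs. Setting numReducers to 1 mimics the single-coordinator merge attributed above to the MPP databases. Larger values spread the final merge over several reduce tasks.

```scala
import scala.collection.mutable

// Hypothetical sketch of SUM(adRevenue) GROUP BY sourceIP with a map-side combine.
object TwoPhaseAggregation {
  type Row = (String, Double) // (sourceIP, adRevenue)

  // Phase 1: each task aggregates its own input partition locally.
  def partialSums(partition: Seq[Row]): Map[String, Double] = {
    val acc = mutable.HashMap.empty[String, Double]
    for ((key, value) <- partition) acc(key) = acc.getOrElse(key, 0.0) + value
    acc.toMap
  }

  // Which reduce task receives a given group key.
  def reducerFor(key: String, numReducers: Int): Int =
    Math.floorMod(key.hashCode, numReducers)

  // Phase 2: route partial sums by key and merge them, one result map per reduce task.
  def finalMerge(partials: Seq[Map[String, Double]], numReducers: Int): IndexedSeq[Map[String, Double]] =
    (0 until numReducers).map { r =>
      val acc = mutable.HashMap.empty[String, Double]
      for {
        partial <- partials
        (key, value) <- partial
        if reducerFor(key, numReducers) == r
      } acc(key) = acc.getOrElse(key, 0.0) + value
      acc.toMap
    }

  // Reducers own disjoint key sets, so their outputs can simply be concatenated.
  def run(partitions: Seq[Seq[Row]], numReducers: Int): Map[String, Double] =
    finalMerge(partitions.map(partialSums), numReducers)
      .foldLeft(Map.empty[String, Double])(_ ++ _)
}
```

Both settings return the same totals. What changes is how many groups a single merger must handle. This is why one coordinator is adequate for about a thousand groups but becomes a bottleneck with millions.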

6.2.3 Join Query

The final query from Pavlo et al. involved joining the 2 TB uservisits table with the 100 GB rankings table.

    SELECT INTO Temp sourceIP, AVG(pageRank),
      SUM(adRevenue) as totalRevenue
    FROM rankings AS R, uservisits AS UV
    WHERE R.pageURL = UV.destURL
      AND UV.visitDate BETWEEN Date('2000-01-15')
      AND Date('2000-01-22')
    GROUP BY UV.sourceIP;

Again, Shark outperformed Hive in all cases. Figure 6 shows that for this query, serving data out of memory did not provide much benefit over disk. This is because the cost of the join step dominated the query processing. Co-partitioning the two tables, however, provided significant benefits as it avoided shuffling 2.1 TB of data during the join step.

[Figure 6: Join query runtime (seconds) from Pavlo benchmark. Bars: Hive, Shark (disk), Shark, Copartitioned.]

6.2.4 Data Loading

Hadoop was shown by [25] to excel at data loading, as its data loading throughput was five to ten times higher than that of MPP databases. As explained in Section 2, Shark can be used to query data in HDFS directly, which means its data ingress rate is at least as fast as Hadoop's.

After generating the 2 TB uservisits table, we measured the time to load it into HDFS and compared that with the time to load it into Shark's memory store. We found the rate of data ingress was 5× higher in Shark's memory store than that of HDFS.

6.3 Micro-Benchmarks

To understand the factors affecting Shark's performance, we conducted a sequence of micro-benchmarks. We generated 100 GB and 1 TB of data using the DBGEN program provided by TPC-H [29]. We chose this dataset because it contains tables and columns of varying cardinality and can be used to create a myriad of micro-benchmarks for testing individual operators.

While performing experiments, we found that Hive and Hadoop MapReduce were very sensitive to the number of reducers set for a job. Hive's optimizer automatically sets the number of reducers based on the estimated data size. However, we found that Hive's optimizer frequently made the wrong decision, leading to incredibly long query execution times. We hand-tuned the number of reducers for Hive based on characteristics of the queries and through trial and error. We report Hive performance numbers for both optimizer-determined and hand-tuned numbers of reducers. Shark, on the other hand, was much less sensitive to the number of reducers and required minimal tuning.

6.3.1 Aggregation Performance

We tested the performance of aggregations by running group-by queries on the TPC-H lineitem table. For the 100 GB dataset, the lineitem table contained 600 million rows. For the 1 TB dataset, it contained 6 billion rows. The queries were of the form:

    SELECT [GROUP_BY_COLUMN], COUNT(*) FROM lineitem
    GROUP BY [GROUP_BY_COLUMN]

We chose to run one query with no group-by column (i.e., a simple count), and three queries with group-by aggregations: SHIPMODE (7 groups), RECEIPTDATE (2500 groups), and SHIPMODE (150 million groups in 100 GB, and 537 million groups in 1 TB).

For both Shark and Hive, aggregations were first performed on each partition. The intermediate aggregated results were then partitioned and sent to reduce tasks to produce the final aggregation. As the number of groups becomes larger, more data needs to be shuffled across the network.

Figure 7 compares the performance of Shark and Hive, measuring Shark's performance on both in-memory data and data loaded from HDFS. As can be seen in the figure, Shark was 80× faster than hand-tuned Hive for queries with small numbers of groups. It was 20× faster for queries with large numbers of groups, where the shuffle phase dominated the total execution cost.

We were somewhat surprised by the performance gain observed for on-disk data in Shark. After all, both Shark and Hive had to read data from HDFS and deserialize it for query processing. This difference, however, can be explained by Shark's very low task launching overhead, optimized shuffle operator, and other factors; see Section 7 for more details.

6.3.2 Join Selection at Run-time

In this experiment, we tested how partial DAG execution can improve query performance through run-time re-optimization of query plans. The query joined the lineitem and supplier tables from the 1 TB TPC-H dataset, using a UDF to select suppliers of interest based on their addresses. In this specific instance, the UDF selected 1000 out of 10 million suppliers. Figure 8 summarizes these results.

    SELECT * from lineitem l join supplier s
    ON l.L_SUPPKEY = s.S_SUPPKEY
    WHERE SOME_UDF(s.S_ADDRESS)

Lacking good selectivity estimation on the UDF, a static optimizer would choose to perform a shuffle join on these two tables because the initial sizes of both tables are large. Leveraging partial DAG execution, Shark's dynamic optimizer ran the pre-shuffle map stages for both tables and then realized that the filtered supplier table was small. It decided to perform a map-join, replicating the filtered supplier table to all nodes and performing the join using only map tasks on lineitem.

To further improve the execution, the optimizer can analyze the logical plan and infer that the probability of the supplier table being small is much higher than that of lineitem (since supplier is smaller initially, and there is a filter predicate on supplier). The optimizer chose to pre-shuffle only the supplier table, and avoided launching two waves of tasks on lineitem. This combination of static query analysis and partial DAG execution led to a 3× performance improvement over a naïve, statically chosen plan.

[Figure 8: Join strategies chosen by optimizers (seconds). Bars: Static, Adaptive, Static + Adaptive.]
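The run-time decision in Section 6.3.2 can be sketched as follows. The sketch assumes the pre-shuffle map stages report how many rows each side produced after its filters (for supplier, after the UDF). The row-count statistic, the threshold, and all names are hypothetical; this does not reproduce Shark's optimizer. Either join path returns the same rows, and the choice only changes how much data is moved.

```scala
// Hypothetical sketch of a run-time join choice in the spirit of partial DAG execution.
sealed trait JoinStrategy
case object ShuffleJoin extends JoinStrategy
case object MapJoin extends JoinStrategy

object AdaptiveJoin {
  // Called once the pre-shuffle map stages have reported their observed output sizes.
  def choose(leftRows: Long, rightRows: Long, broadcastThreshold: Long): JoinStrategy =
    if (math.min(leftRows, rightRows) <= broadcastThreshold) MapJoin else ShuffleJoin

  // Map join: build a hash table on the small side and stream the large side through it.
  def mapJoin[K, A, B](large: Seq[(K, A)], small: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val table: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    for {
      (k, a) <- large
      b <- table.getOrElse(k, Seq.empty[B])
    } yield (k, (a, b))
  }

  // Shuffle join: hash-partition both sides on the key, then join each bucket pair.
  def shuffleJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], buckets: Int): Seq[(K, (A, B))] = {
    def bucket(k: K): Int = Math.floorMod(k.hashCode, buckets)
    (0 until buckets).flatMap { i =>
      mapJoin(left.filter(kv => bucket(kv._1) == i), right.filter(kv => bucket(kv._1) == i))
    }
  }
}
```

In the experiment above, the UDF kept 1000 of 10 million suppliers. A check like choose would therefore pick the map join and replicate the small side instead of shuffling lineitem.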

[Figure 7: Aggregation queries on lineitem table. X-axis indicates the number of groups for each aggregation query (1, 7, 2.5K, and 150M groups for TPC-H 100GB; 1, 7, 2.5K, and 537M groups for TPC-H 1TB). Bars: Shark, Shark (disk), Hive (tuned), Hive; y-axis: time (seconds).]

6.3.3 Fault Tolerance

To measure Shark's performance in the presence of node failures, we simulated failures and measured query performance before, during, and after failure recovery. Figure 9 summarizes five runs of our failure recovery experiment, which was performed on a 50-node m2.4xlarge EC2 cluster.

We used a group-by query on the 100 GB lineitem table to measure query performance in the presence of faults. After loading the lineitem data into Shark's memory store, we killed a worker machine and re-ran the query. Shark gracefully recovered from this failure and parallelized the reconstruction of lost partitions on the other 49 nodes. This recovery had a small performance impact (∼3 seconds), but it was significantly cheaper than the cost of re-loading the entire dataset and re-executing the query.

[Figure 9: Query time with failures (seconds). Bars: No failures, Single failure, Post-recovery, Full reload.]

After this recovery, subsequent queries operated against the recovered dataset, albeit with fewer machines. In Figure 9, the post-recovery performance was marginally better than the pre-failure performance. We believe that this was a side-effect of the JVM's JIT compiler, as more of the scheduler's code might have become compiled by the time the post-recovery queries were run.

6.4 Real Hive Warehouse Queries

An early industrial user provided us with a sample of their Hive warehouse data and two years of query traces from their Hive system. The user, a leading video analytics company for content providers and publishers, built most of their analytics stack based on Hadoop. The sample we obtained contained 30 days of video session data, occupying 1.7 TB of disk space when decompressed. It consists of a single fact table containing 103 columns, with heavy use of complex data types such as array and struct. The sampled query log contains 3833 analytical queries, sorted in order of frequency. We filtered out queries that invoked proprietary UDFs and picked four frequent queries that are prototypical of other queries in the complete trace. These queries compute aggregate video quality metrics over different audience segments:

1. Query 1 computes summary statistics in 12 dimensions for users of a specific customer on a specific day.
2. Query 2 counts the number of sessions and distinct customer/client combinations grouped by countries with filter predicates on eight columns.
3. Query 3 counts the number of sessions and distinct users for all but 2 countries.
4. Query 4 computes summary statistics in 7 dimensions grouping by a column, and showing the top groups sorted in descending order.

[Figure 10: Real Hive warehouse workloads. Queries Q1 to Q4; bars: Shark, Shark (disk), Hive.]

Figure 10 compares the performance of Shark and Hive on these queries. The result is very promising: Shark was able to process these real-life queries with sub-second latency in all but one case, whereas it took Hive 50 to 100 times longer to execute them.

A closer look into these queries suggests that this data exhibits the natural clustering properties mentioned in Section 3.5. The map pruning technique, on average, reduced the amount of data scanned by a factor of 30.
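The effect of natural clustering on map pruning (Section 3.5) can be seen in a small Scala simulation. It works as follows:

- It tags rows with a day number.
- It splits the rows into fixed-size partitions, either in arrival order or after a random shuffle.
- It keeps only each partition's minimum and maximum day.
- It counts the partitions a one-day predicate would still have to scan.

The 30-day layout, the partition size, and all names are assumptions for illustration. The simulation does not model the workload above.

```scala
import scala.util.Random

// Hypothetical simulation: min/max statistics prune well only when rows are clustered.
object ClusteringAndPruning {
  final case class DayRange(min: Int, max: Int)

  // Split a column of day numbers into fixed-size partitions; keep min/max per partition.
  def partitionStats(days: IndexedSeq[Int], rowsPerPartition: Int): Vector[DayRange] =
    days.grouped(rowsPerPartition).map(p => DayRange(p.min, p.max)).toVector

  // Partitions whose [min, max] range could contain rows with day == target.
  def partitionsToScan(stats: Seq[DayRange], target: Int): Int =
    stats.count(r => r.min <= target && target <= r.max)

  // 30 days of rows, stored in arrival order versus randomly shuffled.
  def compare(rowsPerDay: Int, rowsPerPartition: Int, targetDay: Int, seed: Long): (Int, Int) = {
    val arrivalOrder = (0 until 30).flatMap(day => Seq.fill(rowsPerDay)(day))
    val shuffled = new Random(seed).shuffle(arrivalOrder)
    (partitionsToScan(partitionStats(arrivalOrder, rowsPerPartition), targetDay),
     partitionsToScan(partitionStats(shuffled, rowsPerPartition), targetDay))
  }
}
```

Take 1000 rows per day and 300 rows per partition. Then compare(1000, 300, 12, 7L) leaves 4 of the 100 partitions to scan in arrival order. After shuffling it typically leaves all 100, since each shuffled partition then spans almost the entire date range.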

6.5 Machine Learning

A key motivator of using SQL in a MapReduce environment is the ability to perform sophisticated machine learning on big data. We implemented two iterative machine learning algorithms, logistic regression and k-means, to compare the performance of Shark versus running the same workflow in Hive and Hadoop.

The dataset was synthetically generated and contained 1 billion rows and 10 columns, occupying 100 GB of space. Thus, the feature matrix contained 1 billion points, each with 10 dimensions. These machine learning experiments were performed on a 100-node m1.xlarge EC2 cluster.

Data was initially stored in relational form in Shark's memory store and HDFS. The workflow consisted of three steps: (1) selecting the data of interest from the warehouse using SQL, (2) extracting features, and (3) applying iterative algorithms. In step 3, both algorithms were run for 10 iterations.

[Figure 11: Logistic regression, per-iteration runtime (seconds). Bars: Shark, Hadoop (binary), Hadoop (text).]

[Figure 12: K-means clustering, per-iteration runtime (seconds). Bars: Shark, Hadoop (binary), Hadoop (text).]

Figures 11 and 12 show the time to execute a single iteration of logistic regression and k-means, respectively. We implemented two versions of the algorithms for Hadoop, one storing input data as text in HDFS and the other using a serialized binary format. The binary representation was more compact and had lower CPU cost in record deserialization, leading to improved performance. Our results show that Shark is 100× faster than Hive and Hadoop for logistic regression and 30× faster for k-means. K-means saw a smaller speedup because it was computationally more expensive than logistic regression, thus making the workflow more CPU-bound.

In the case of Shark, if data initially resided in its memory store, steps 1 and 2 were executed in roughly the same time it took to run one iteration of the machine learning algorithm. If data was not loaded into the memory store, the first iteration took 40 seconds for both algorithms. Subsequent iterations, however, reported numbers consistent with Figures 11 and 12. In the case of Hive and Hadoop, every iteration took the reported time because data was loaded from HDFS for every iteration.

7 Discussion

Shark shows that it is possible to run fast relational queries in a fault-tolerant manner using the fine-grained deterministic task model introduced by MapReduce. This design offers an effective way to scale query processing to ever-larger workloads, and to combine it with rich analytics. In this section, we consider two questions. First, why were previous MapReduce-based systems, such as Hive, slow, and what gave Shark its advantages? Second, are there other benefits to the fine-grained task model? We argue that fine-grained tasks also help with multitenancy and elasticity, as has been demonstrated in MapReduce systems.

7.1 Why are Previous MapReduce-Based Systems Slow?

Conventional wisdom is that MapReduce is slower than MPP databases for several reasons: expensive data materialization for fault tolerance, inferior data layout (e.g., lack of indices), and costlier execution strategies [25, 26]. Our exploration of Hive confirms these reasons. It also shows that they can be alleviated by a combination of conceptually simple "engineering" changes to the engine (e.g., in-memory storage) and more involved architectural changes (e.g., partial DAG execution). We also find that a somewhat surprising variable has a dramatic effect on performance: the task scheduling overhead, which MapReduce systems have not considered in detail. Minimizing it also greatly improves load balancing.

Intermediate Outputs: MapReduce-based query engines, such as Hive, materialize intermediate data to disk in two situations. First, within a MapReduce job, the map tasks save their output in case a reduce task fails [13]. Second, many queries need to be compiled into multiple MapReduce steps, and engines rely on replicated file systems, such as HDFS, to store the output of each step.

For the first case, we note that map outputs were stored on disk primarily as a convenience to ensure there is sufficient space to hold them in large batch jobs. Map outputs are not replicated across nodes, so they will still be lost if the mapper node fails [13]. Thus, if the outputs fit in memory, it makes sense to store them in memory initially, and only spill them to disk if they are large. Shark's shuffle implementation does this by default, and sees far faster shuffle performance (and no seeks) when the outputs fit in RAM. This is often the case in aggregations and filtering queries that return a much smaller output than their input (see footnote 5). Another hardware trend that may improve performance, even for large shuffles, is SSDs, which would allow fast random access to a larger space than memory.

Footnote 5: Systems like Hadoop also benefit from the OS buffer cache in serving map outputs, but we found that the extra system calls and file system journalling from writing map outputs to files still add overhead (Section 5).

For the second case, engines that extend the MapReduce execution model to general task DAGs can run multi-stage jobs without materializing any outputs to HDFS. Many such engines have been proposed, including Dryad, Tenzing and Spark [17, 9, 33].

Data Format and Layout: The naïve pure schema-on-read approach to MapReduce incurs considerable processing costs. Many systems therefore use more efficient storage formats within the MapReduce model to speed up queries. Hive itself supports "table partitions" (a basic index-like system where it knows that certain key ranges are contained in certain files, so it can avoid scanning a whole table), as well as column-oriented representation of on-disk data [28]. We go further in Shark by using fast in-memory columnar representations within Spark. Shark does this without modifying the Spark runtime. It simply represents a block of tuples as a single Spark record (one Java object from Spark's perspective) and chooses its own representation for the tuples within this object.

Another feature of Spark that helps Shark, but was not present in previous MapReduce runtimes, is control over the data partitioning across nodes (Section 3.4). This lets us co-partition tables.

Finally, one capability of RDDs that we do not yet exploit is random reads. While RDDs only support coarse-grained operations for their writes, read operations on them can be fine-grained, accessing just one record [33]. This would allow RDDs to be used as indices. Tenzing can use such remote-lookup reads for joins [9].

Execution Strategies: Hive spends considerable time on sorting the data before each shuffle and writing the outputs of each MapReduce stage to HDFS. Both are limitations of the rigid, one-pass MapReduce model in Hadoop. More general runtime engines, such as Spark, alleviate some of these problems. For instance, Spark supports hash-based distributed aggregation and general task DAGs.
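The remedy discussed under Intermediate Outputs, and in the Memory-based Shuffle paragraph of Section 5, can be sketched for a single map-output bucket: keep map outputs in memory and spill to disk only when they grow large. The following are assumptions:

- the class name;
- the record-count threshold (a real implementation would more plausibly track bytes);
- newline-delimited temporary spill files, with records assumed not to contain newline characters.

```scala
import java.io.{BufferedWriter, File, FileWriter}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

// Hypothetical map-output bucket: records stay in memory until the buffer holds
// more than maxInMemory records, at which point the buffer is spilled to a temp file.
final class SpillableBucket(maxInMemory: Int) {
  private val buffer = ArrayBuffer.empty[String]
  private val spillFiles = ArrayBuffer.empty[File]

  def add(record: String): Unit = {
    buffer += record
    if (buffer.size > maxInMemory) spill()
  }

  // Write the current buffer to disk, one record per line, then clear it.
  private def spill(): Unit = {
    val file = File.createTempFile("shuffle-spill-", ".txt")
    file.deleteOnExit()
    val out = new BufferedWriter(new FileWriter(file))
    try buffer.foreach { r => out.write(r); out.newLine() } finally out.close()
    spillFiles += file
    buffer.clear()
  }

  def spillCount: Int = spillFiles.size

  // What a reduce task would fetch: spilled records in spill order, then the in-memory tail.
  def readAll(): Vector[String] = {
    val fromDisk = spillFiles.toVector.flatMap { file =>
      val src = Source.fromFile(file)
      try src.getLines().toVector finally src.close()
    }
    fromDisk ++ buffer
  }
}
```

When a bucket stays under the threshold, it is served entirely from memory without touching the file system. Only oversized buckets pay for disk writes.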

To truly optimize the execution of relational queries, however, we found it necessary to select execution plans based on data statistics. This becomes difficult in the presence of UDFs and complex analytics functions, which we seek to support as first-class citizens in Shark. To address this problem, we proposed partial DAG execution (PDE), which allows our modified version of Spark to change the downstream portion of an execution graph once each stage completes, based on data statistics. PDE goes beyond the runtime graph rewriting features in previous systems, such as DryadLINQ [19], by collecting fine-grained statistics about ranges of keys and by allowing switches to a completely different join strategy, such as broadcast join, instead of just selecting the number of reduce tasks.

Task Scheduling Cost: Perhaps the most surprising engine property that affected Shark, however, was a purely "engineering" concern: the overhead of launching tasks. Traditional MapReduce systems, such as Hadoop, were designed for multi-hour batch jobs consisting of tasks that were several minutes long. They launched each task in a separate OS process, and in some cases had a high latency to even submit a task. For instance, Hadoop uses periodic "heartbeats" from each worker every 3 seconds to assign tasks, and sees overall task startup delays of 5–10 seconds. This was sufficient for batch workloads, but clearly falls short for ad-hoc queries.

Spark avoids this problem by using a fast event-driven RPC library to launch tasks and by reusing its worker processes. It can launch thousands of tasks per second with only about 5 ms of overhead per task, making task lengths of 50–100 ms and MapReduce jobs of 500 ms viable. What surprised us is how much this affected query performance, even in large (multi-minute) queries.

Sub-second tasks allow the engine to balance work across nodes extremely well, even when some nodes incur unpredictable delays (e.g., network delays or JVM garbage collection). They also help dramatically with skew. Consider, for example, a system that needs to run a hash aggregation on 100 cores. If the system launches 100 reduce tasks, the key range for each task needs to be carefully chosen, as any imbalance will slow down the entire job. If it could split the work among 1000 tasks, then the slowest task can be as much as 10× slower than the average without affecting the job response time much! After implementing skew-aware partition selection in PDE, we were somewhat disappointed that it did not help compared to just having a higher number of reduce tasks in most workloads, because Spark could comfortably support thousands of such tasks. However, this property makes the engine highly robust to unexpected skew.

In this way, Spark stands in contrast to Hadoop/Hive, where using the wrong number of tasks was sometimes 10× slower than an optimal plan, and there has been considerable work to automatically choose the number of reduce tasks [21, 15]. Figure 13 shows how job execution time varies with the number of reduce tasks launched by Hadoop and Spark. Since a Spark job can launch thousands of reduce tasks without incurring much overhead, partition data skew can be mitigated by always launching many tasks.

Figure 13: Task launching overhead. [Two panels: Time (seconds) vs. Number of Hadoop Tasks; Time (seconds) vs. Number of Spark Tasks; both x-axes span 0–5000 tasks.]

More fundamentally, there are few reasons why sub-second tasks should not be feasible even at higher scales than we have explored, such as tens of thousands of nodes. Systems like Dremel [24] routinely run sub-second, multi-thousand-node jobs. Indeed, even if a single master cannot keep up with the scheduling decisions, the scheduling could be delegated across "lieutenant" masters for subsets of the cluster. Fine-grained tasks also offer many advantages over coarser-grained execution graphs beyond load balancing, such as faster recovery (by spreading out lost tasks across more nodes) and query elasticity; we discuss some of these next.

7.2 Other Benefits of the Fine-Grained Task Model

While this paper has focused primarily on the fault tolerance benefits of fine-grained deterministic tasks, the model also provides other attractive properties. We wish to point out two benefits that have been explored in MapReduce-based systems.

Elasticity: In traditional MPP databases, a distributed query plan is selected once, and the system needs to run at that level of parallelism for the whole duration of the query. In a fine-grained task system, however, nodes can appear or go away during a query, and pending work will automatically be spread onto them. This enables the database engine to naturally be elastic. If an administrator wishes to remove nodes from the engine (e.g., in a virtualized corporate data center), the engine can simply treat those as failed, or (better yet) proactively replicate their data to other nodes if given a few minutes' warning. Similarly, a database engine running on a cloud could scale up by requesting new VMs if a query is expensive. Amazon's Elastic MapReduce [2] already supports resizing clusters at runtime.

Multitenancy: The same elasticity enables dynamic resource sharing between users. In a traditional MPP database, if an important query arrives while another large query is using most of the cluster, there are few options beyond canceling the earlier query. In systems based on fine-grained tasks, one can simply wait a few seconds for the current tasks from the first query to finish, and start giving the nodes tasks from the second query. For instance, Facebook and Microsoft have developed fair schedulers for Hadoop and Dryad that allow large historical queries, compute-intensive machine learning jobs, and short ad-hoc queries to safely coexist [32, 18].

8 Related Work

To the best of our knowledge, Shark is the only low-latency system that can efficiently combine SQL and machine learning workloads, while supporting fine-grained fault recovery.

We categorize large-scale data analytics systems into three classes. First, systems like Hive [28], Tenzing [9], SCOPE [8], and Cheetah [10] compile declarative queries into MapReduce-style jobs. Even though some of them introduce modifications to the execution engine they are built on, it is hard for these systems to achieve interactive query response times for reasons discussed in Section 7.

Second, several projects aim to provide low-latency engines using architectures resembling shared-nothing parallel databases. Such projects include PowerDrill [16] and Impala [1]. These systems do not support fine-grained fault tolerance. In case of mid-query faults, the entire query needs to be re-executed. Google's Dremel [24] does rerun lost tasks, but it only supports an aggregation tree topology for query execution, and not the more complex shuffle DAGs required for large joins or distributed machine learning.

A third class of systems takes a hybrid approach by combining a MapReduce-like engine with relational databases. HadoopDB [3] connects multiple single-node database systems using Hadoop as
the communication layer. Queries can be parallelized using Hadoop MapReduce, but within each MapReduce task, data processing is pushed into the relational database system. Osprey [31] is a middleware layer that adds fault-tolerance properties to parallel databases. It does so by breaking a SQL query into multiple small queries and sending them to parallel databases for execution. Shark presents a much simpler single-system architecture that supports all of the properties of this third class of systems, as well as statistical learning capabilities that HadoopDB and Osprey lack.

The partial DAG execution (PDE) technique introduced by Shark resembles adaptive query optimization techniques proposed in [6, 30, 20]. It is, however, unclear how these single-node techniques would work in a distributed setting and scale out to hundreds of nodes. In fact, PDE complements some of these techniques, as Shark can use PDE to optimize how data gets shuffled across nodes, and use the traditional single-node techniques within a local task. DryadLINQ [19] optimizes its number of reduce tasks at run-time based on map output sizes, but does not collect richer statistics, such as histograms, or make broader execution plan changes, such as changing join algorithms, like PDE can. RoPE [4] proposes using historical query information to optimize query plans, but relies on repeatedly executed queries. PDE works on queries that are executing for the first time.

Finally, Shark builds on the distributed approaches for machine learning developed in systems like GraphLab [22], HaLoop [7], and Spark [33]. However, Shark is unique in offering these capabilities in a SQL engine, allowing users to select data of interest using SQL and immediately run learning algorithms on it without time-consuming export to another system. Compared to Spark, Shark also provides far more efficient in-memory representation of relational data, and mid-query optimization using PDE.

9 Conclusion

We have presented Shark, a new data warehouse system that combines fast relational queries and complex analytics in a single, fault-tolerant runtime. Shark generalizes a MapReduce-like runtime to run SQL effectively, using both traditional database techniques, such as column-oriented storage, and a novel partial DAG execution (PDE) technique that lets it reoptimize queries at run-time based on fine-grained data statistics. This design enables Shark to generally match the speedups reported for MPP databases over MapReduce, while simultaneously providing machine learning functions in the same engine and fine-grained, mid-query fault tolerance across both SQL and machine learning. Overall, the system is up to 100× faster than Hive for SQL, and 100× faster than Hadoop for machine learning.

We have open sourced Shark at shark.cs.berkeley.edu, and have also worked with two Internet companies as early users. They report speedups of 40–100× on real queries, consistent with our results.

10 Acknowledgments

We thank Cliff Engle, Harvey Feng, Shivaram Venkataraman, Ram Sriharsha, Denny Britz, Antonio Lupher, Patrick Wendell, and Paul Ruan for their work on Shark. This research is supported in part by NSF CISE Expeditions award CCF-1139158, gifts from Amazon Web Services, Google, SAP, Blue Goji, Cisco, Cloudera, Ericsson, General Electric, Hewlett Packard, Huawei, Intel, Microsoft, NetApp, Oracle, Quanta, Splunk, VMware and by DARPA (contract #FA8650-11-C-7136).

11 References

[1] https://github.com/cloudera/impala.
[2] http://aws.amazon.com/about-aws/whats-new/2010/10/20/amazon-elastic-mapreduce-introduces-resizing-running-job-flows/.
[3] A. Abouzeid et al. HadoopDB: an architectural hybrid of MapReduce and DBMS technologies for analytical workloads. VLDB, 2009.
[4] S. Agarwal et al. Re-optimizing data-parallel computing. In NSDI, 2012.
[5] G. Ananthanarayanan et al. PACMan: coordinated memory caching for parallel jobs. In NSDI, 2012.
[6] R. Avnur and J. M. Hellerstein. Eddies: continuously adaptive query processing. In SIGMOD, 2000.
[7] Y. Bu et al. HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 2010.
[8] R. Chaiken et al. SCOPE: easy and efficient parallel processing of massive data sets. VLDB, 2008.
[9] B. Chattopadhyay et al. Tenzing: a SQL implementation on the MapReduce framework. PVLDB, 4(12):1318–1327, 2011.
[10] S. Chen. Cheetah: a high performance, custom data warehouse on top of MapReduce. VLDB, 2010.
[11] C. Chu et al. Map-reduce for machine learning on multicore. Advances in Neural Information Processing Systems, 19:281, 2007.
[12] J. Cohen, B. Dolan, M. Dunlap, J. Hellerstein, and C. Welton. MAD skills: new analysis practices for big data. VLDB, 2009.
[13] J. Dean and S. Ghemawat. MapReduce: simplified data processing on large clusters. In OSDI, 2004.
[14] X. Feng et al. Towards a unified architecture for in-RDBMS analytics. In SIGMOD, 2012.
[15] B. Guffler et al. Handling data skew in MapReduce. In CLOSER, 2011.
[16] A. Hall et al. Processing a trillion cells per mouse click. VLDB.
[17] M. Isard et al. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS, 2007.
[18] M. Isard et al. Quincy: fair scheduling for distributed computing clusters. In SOSP, 2009.
[19] M. Isard and Y. Yu. Distributed data-parallel computing using a high-level programming language. In SIGMOD, 2009.
[20] N. Kabra and D. J. DeWitt. Efficient mid-query re-optimization of sub-optimal query execution plans. In SIGMOD, 1998.
[21] Y. Kwon et al. SkewTune: mitigating skew in MapReduce applications. In SIGMOD, 2012.
[22] Y. Low et al. Distributed GraphLab: a framework for machine learning and data mining in the cloud. VLDB, 2012.
[23] G. Malewicz et al. Pregel: a system for large-scale graph processing. In SIGMOD, 2010.
[24] S. Melnik et al. Dremel: interactive analysis of web-scale datasets. Proc. VLDB Endow., 3:330–339, Sept 2010.
[25] A. Pavlo et al. A comparison of approaches to large-scale data analysis. In SIGMOD, 2009.
[26] M. Stonebraker et al. MapReduce and parallel DBMSs: friends or foes? Commun. ACM.
[27] M. Stonebraker et al. C-Store: a column-oriented DBMS. In VLDB, 2005.
[28] A. Thusoo et al. Hive: a petabyte scale data warehouse using Hadoop. In ICDE, 2010.
[29] Transaction Processing Performance Council. TPC Benchmark H.
[30] T. Urhan, M. J. Franklin, and L. Amsaleg. Cost-based query scrambling for initial delays. In SIGMOD, 1998.
[31] C. Yang et al. Osprey: implementing MapReduce-style fault tolerance in a shared-nothing distributed database. In ICDE, 2010.
[32] M. Zaharia et al. Delay scheduling: a simple technique for achieving locality and fairness in cluster scheduling. In EuroSys, 2010.
[33] M. Zaharia et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing. In NSDI, 2012.