Shark: SQL and Rich Analytics at Scale

Reynold S. Xin, Josh Rosen, Matei Zaharia, Michael J. Franklin, Scott Shenker, Ion Stoica
AMPLab, EECS, UC Berkeley
{rxin, joshrosen, matei, franklin, shenker, istoica}@cs.berkeley.edu

ABSTRACT

Shark is a new data analysis system that marries query processing with complex analytics on large clusters. It leverages a novel distributed memory abstraction to provide a unified engine that can run SQL queries and sophisticated analytics functions (e.g., iterative machine learning) at scale, and efficiently recovers from failures mid-query. This allows Shark to run SQL queries up to 100× faster than Apache Hive, and machine learning programs more than 100× faster than Hadoop. Unlike previous systems, Shark shows that it is possible to achieve these speedups while retaining a MapReduce-like execution engine, and the fine-grained fault tolerance properties that such an engine provides. It extends such an engine in several ways, including column-oriented in-memory storage and dynamic mid-query replanning, to effectively execute SQL. The result is a system that matches the speedups reported for MPP analytic databases over MapReduce, while offering fault tolerance properties and complex analytics capabilities that they lack.

Categories and Subject Descriptors

H.2 [Database Management]: Systems

Keywords

Databases; Data Warehouse; Machine Learning; Spark; Shark; Hadoop

Permission to make digital or hard copies of all or part of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. To copy otherwise, to republish, to post on servers or to redistribute to lists, requires prior specific permission and/or a fee.
SIGMOD'13, June 22–27, 2013, New York, New York, USA.
Copyright 2013 ACM 978-1-4503-2037-5/13/06 ...$15.00.

1 Introduction

Modern data analysis faces a confluence of growing challenges. First, data volumes are expanding dramatically, creating the need to scale out across clusters of hundreds of commodity machines. Second, such high scale increases the incidence of faults and stragglers (slow tasks), complicating parallel database design. Third, the complexity of data analysis has also grown: modern data analysis employs sophisticated statistical methods, such as machine learning algorithms, that go well beyond the roll-up and drill-down capabilities of traditional enterprise data warehouse systems. Finally, despite these increases in scale and complexity, users still expect to be able to query data at interactive speeds.

To tackle the “big data” problem, two major lines of systems have recently been explored. The first, consisting of MapReduce [17] and various generalizations [22, 13], offers a fine-grained fault tolerance model suitable for large clusters, where tasks on failed or slow nodes can be deterministically re-executed on other nodes. MapReduce is also fairly general: it has been shown to be able to express many statistical and learning algorithms [15]. It also easily supports unstructured data and “schema-on-read.” However, MapReduce engines lack many of the features that make databases efficient, and thus exhibit high latencies of tens of seconds to hours. Even systems that have significantly optimized MapReduce for SQL queries, such as Google’s Tenzing [13], or that combine it with a traditional database on each node, such as HadoopDB [4], report a minimum latency of 10 seconds. As such, MapReduce approaches have largely been dismissed for interactive-speed queries [31], and even Google is developing new engines for such workloads [29].

Instead, most MPP analytic databases (e.g., Vertica, Greenplum, Teradata) and several of the new low-latency engines proposed for MapReduce environments (e.g., Google Dremel [29], Cloudera Impala [1]) employ a coarser-grained recovery model, where an entire query has to be resubmitted if a machine fails.¹ This works well for short queries where a retry is inexpensive, but faces significant challenges for long queries as clusters scale up [4]. In addition, these systems often lack the rich analytics functions that are easy to implement in MapReduce, such as machine learning and graph algorithms. Furthermore, while it may be possible to implement some of these functions using UDFs, these algorithms are often expensive, exacerbating the need for fault and straggler recovery for long queries. Thus, most organizations tend to use other systems alongside MPP databases to perform complex analytics.

¹ Dremel provides fault tolerance within a query, but Dremel is limited to aggregation trees instead of the more complex communication patterns in joins.

To provide an effective environment for big data analysis, we believe that processing systems will need to support both SQL and complex analytics efficiently, and to provide fine-grained fault recovery across both types of operations. This paper describes a new system that meets these goals, called Shark. Shark is open source and compatible with Apache Hive, and has already been used at web companies to speed up queries by 40–100×.

Shark builds on a recently-proposed distributed shared memory abstraction called Resilient Distributed Datasets (RDDs) [39] to perform most computations in memory while offering fine-grained fault tolerance. In-memory computing is increasingly important in large-scale analytics for two reasons. First, many complex analytics functions, such as machine learning and graph algorithms, are iterative, scanning the data multiple times; thus, the fastest systems deployed for these applications are in-memory [28, 27, 39]. Second, even traditional SQL warehouse workloads exhibit strong temporal and spatial locality, because more-recent fact table data
and small dimension tables are read disproportionately often. A study of Facebook’s Hive warehouse and Microsoft’s Bing analytics cluster showed that over 95% of queries in both systems could be served out of memory using just 64 GB/node as a cache, even though each system manages more than 100 PB of total data [6].

The main benefit of RDDs is an efficient mechanism for fault recovery. Traditional main-memory databases support fine-grained updates to tables and replicate writes across the network for fault tolerance, which is expensive on large commodity clusters. In contrast, RDDs restrict the programming interface to coarse-grained deterministic operators that affect multiple data items at once, such as map, group-by and join, and recover from failures by tracking the lineage of each dataset and recomputing lost data. This approach works well for data-parallel relational queries, and has also been shown to support machine learning and graph computation [39]. Thus, when a node fails, Shark can recover mid-query by rerunning the deterministic operations used to build lost data partitions on other nodes, similar to MapReduce. Indeed, it typically recovers within seconds by parallelizing this work across the cluster.

To run SQL efficiently, however, we also had to extend the RDD execution model, bringing in several concepts from traditional analytical databases and some new ones. We started with an existing implementation of RDDs called Spark [39], and added several features. First, to store and process relational data efficiently, we implemented in-memory columnar storage and columnar compression. This reduced both the data size and the processing time by as much as 5× over naïvely storing the data in a Spark program in its original format. Second, to optimize SQL queries based on the data characteristics even in the presence of analytics functions and UDFs, we extended Spark with Partial DAG Execution (PDE): Shark can reoptimize a running query after running the first few stages of its task DAG, choosing better join strategies or the right degree of parallelism based on observed statistics. Third, we leverage other properties of the Spark engine not present in traditional MapReduce systems, such as control over data partitioning.

Our implementation of Shark is compatible with Apache Hive [34], supporting all of Hive’s SQL dialect and UDFs and allowing execution over unmodified Hive data warehouses. It augments SQL with complex analytics functions written in Spark, using Spark’s Java, Scala or Python APIs. These functions can be combined with SQL in a single execution plan, providing in-memory data sharing and fast recovery across both types of processing.

Experiments show that using RDDs and the optimizations above, Shark can answer SQL queries up to 100× faster than Hive, runs iterative machine learning algorithms more than 100× faster than Hadoop, and can recover from failures mid-query within seconds. Shark’s speed is comparable to that of MPP databases in benchmarks like Pavlo et al.’s comparison with MapReduce [31], but it offers fine-grained recovery and complex analytics features that these systems lack.

More fundamentally, our work shows that MapReduce-like execution models can be applied effectively to SQL, and offer a promising way to combine relational and complex analytics. In addition, we explore why current SQL engines implemented on top of MapReduce runtimes, such as Hive, are slow. We show how a combination of enhancements in Shark (e.g., PDE), and engine properties that have not been optimized in MapReduce, such as the overhead of launching tasks, eliminate many of the bottlenecks in traditional MapReduce systems.

2 System Overview

As described in the previous section, Shark is a data analysis system that supports both SQL query processing and machine learning functions. Shark is compatible with Apache Hive, enabling users to run Hive queries much faster without any changes to either the queries or the data.

Thanks to its Hive compatibility, Shark can query data in any system that supports the Hadoop storage API, including HDFS and Amazon S3. It also supports a wide range of data formats such as text, binary sequence files, JSON, and XML. It inherits Hive’s schema-on-read capability and nested data types [34].

In addition, users can choose to load high-value data into Shark’s memory store for fast analytics, as illustrated below:

    CREATE TABLE latest_logs
      TBLPROPERTIES ("shark.cache"=true)
    AS SELECT * FROM logs WHERE date > now()-3600;

Figure 1 shows the architecture of a Shark cluster, consisting of a single master node and a number of worker nodes, with the warehouse metadata stored in an external transactional database. It is built on top of Spark, a modern MapReduce-like cluster computing engine. When a query is submitted to the master, Shark compiles the query into an operator tree represented as RDDs, as we shall discuss in Section 2.4. These RDDs are then translated by Spark into a graph of tasks to execute on the worker nodes.

[Figure 1: Shark Architecture]

Cluster resources can optionally be allocated by a resource manager (e.g., Hadoop YARN [2] or Apache Mesos [21]) that provides resource sharing and isolation between different computing frameworks, allowing Shark to coexist with engines like Hadoop.

In the remainder of this section, we cover the basics of Spark and the RDD programming model, and then we describe how Shark query plans are generated and executed.

2.1 Spark

Spark is the MapReduce-like cluster computing engine used by Shark. Spark has several features that differentiate it from traditional MapReduce engines [39]:

1. Like Dryad [22] and Hyracks [10], it supports general computation DAGs, not just the two-stage MapReduce topology.
2. It provides an in-memory storage abstraction called Resilient Distributed Datasets (RDDs) that lets applications keep data in memory across queries, and automatically reconstructs any data lost during failures [39].
3. The engine is optimized for low latency. It can efficiently manage tasks as short as 100 milliseconds on clusters of thousands of cores, while engines like Hadoop incur a latency of 5–10 seconds to launch each task.
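
Item 2 above, and the lineage-based recovery described in the introduction, can be illustrated with a toy, single-process Scala sketch. It is not Spark's API. ToyRDD, SourceRDD, MappedRDD and PartitionCache are hypothetical names, the "cluster" is a single in-memory map, and a node failure is simulated by deleting one cached partition. The sketch shows that a derived dataset stores only its parent and a deterministic function, so a lost partition can be rebuilt by re-running that function on the matching input partition. No replica is needed.

```scala
import scala.collection.mutable

// Hypothetical, single-process model of RDD lineage (not Spark's API).
// A dataset knows its partition count and how to compute any one partition.
sealed trait ToyRDD[T] {
  def numPartitions: Int
  def compute(p: Int): Seq[T]
  // A derived dataset records only its parent and a deterministic function.
  def map[U](f: T => U): ToyRDD[U] = MappedRDD(this, f)
}

// Immutable base data split into partitions (stands in for an input file).
final case class SourceRDD[T](splits: Vector[Seq[T]]) extends ToyRDD[T] {
  def numPartitions: Int = splits.length
  def compute(p: Int): Seq[T] = splits(p)
}

// Lineage edge: partition p of the child depends only on partition p of the parent.
final case class MappedRDD[T, U](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
  def numPartitions: Int = parent.numPartitions
  def compute(p: Int): Seq[U] = parent.compute(p).map(f)
}

// In-memory store holding one copy of each materialized partition (no replication).
final class PartitionCache[T](rdd: ToyRDD[T]) {
  private val cached = mutable.Map.empty[Int, Seq[T]]
  var recomputations = 0

  def materializeAll(): Unit =
    (0 until rdd.numPartitions).foreach(p => cached(p) = rdd.compute(p))

  // Simulate losing a partition when the node holding it fails.
  def lose(p: Int): Unit = cached -= p

  // Serve a partition from memory, or rebuild it from lineage if it was lost.
  def get(p: Int): Seq[T] = cached.getOrElseUpdate(p, {
    recomputations += 1
    rdd.compute(p)
  })
}

// Visit records reduced to URLs for brevity, then mapped to (URL, 1) pairs.
val visits = SourceRDD(Vector(Seq("a.com", "b.com"), Seq("a.com"), Seq("c.com", "a.com")))
val pairs = visits.map(url => (url, 1))

val cache = new PartitionCache(pairs)
cache.materializeAll()
cache.lose(1)              // partition 1 is gone
val rebuilt = cache.get(1) // recomputed from partition 1 of the input only

// The "reduce" step: sum counts by URL across all partitions.
val counts = (0 until pairs.numPartitions)
  .flatMap(p => cache.get(p))
  .groupBy(_._1)
  .map { case (url, ones) => url -> ones.map(_._2).sum }

println(rebuilt)
println(counts)
```

Only one partition is recomputed, and it is rebuilt from the corresponding input partition. This mirrors the recovery of the (URL, 1) dataset described in Section 2.2. The sketch deliberately leaves out shuffle dependencies such as a distributed reduce, whose map-side outputs Spark keeps on the source nodes.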

RDDs are unique to Spark, and were essential to enabling mid-query fault tolerance. However, the other differences are important engineering elements that contribute to Shark’s performance.

In addition to these features, we have also modified the Spark engine for Shark to support partial DAG execution, that is, modification of the query plan DAG after only some of the stages have finished, based on statistics collected from these stages. Similar to [25], we use this technique to optimize join algorithms and other aspects of the execution mid-query, as we shall discuss in Section 3.1.

2.2 Resilient Distributed Datasets (RDDs)

Spark’s main abstraction is resilient distributed datasets (RDDs), which are immutable, partitioned collections that can be created through various data-parallel operators (e.g., map, group-by, hash-join). Each RDD is either a collection stored in an external storage system, such as a file in HDFS, or a derived dataset created by applying operators to other RDDs. For example, given an RDD of (visitID, URL) pairs for visits to a website, we might compute an RDD of (URL, count) pairs by applying a map operator to turn each event into a (URL, 1) pair, and then a reduce to add the counts by URL.

In Spark’s native API, RDD operations are invoked through a functional interface similar to DryadLINQ [24] in Scala, Java or Python. For example, the Scala code for the query above is:

    val visits = spark.hadoopFile("hdfs://...")
    val counts = visits.map(v => (v.url, 1))
                       .reduceByKey((a, b) => a + b)

RDDs can contain arbitrary data types as elements (since Spark runs on the JVM, these elements are Java objects), and are automatically partitioned across the cluster, but they are immutable once created, and they can only be created through Spark’s deterministic parallel operators. These two restrictions, however, enable highly efficient fault recovery. In particular, instead of replicating each RDD across nodes for fault-tolerance, Spark remembers the lineage of the RDD (the graph of operators used to build it), and recovers lost partitions by recomputing them from base data [39].² For example, Figure 2 shows the lineage graph for the RDDs computed above. If Spark loses one of the partitions in the (URL, 1) RDD, for example, it can recompute it by rerunning the map on just the corresponding partition of the input file.

[Figure 2: Lineage graph for the RDDs in our Spark example. Oblongs represent RDDs, while circles show partitions within a dataset. Lineage is tracked at the granularity of partitions.]

² We assume that external files for RDDs representing data do not change, or that we can take a snapshot of a file when we create an RDD from it.

The RDD model offers several key benefits in our large-scale in-memory computing setting. First, RDDs can be written at the speed of DRAM instead of the speed of the network, because there is no need to replicate each byte written to another machine for fault-tolerance. DRAM in a modern server is over 10× faster than even a 10-Gigabit network. Second, Spark can keep just one copy of each RDD partition in memory, saving precious memory over a replicated system, since it can always recover lost data using lineage. Third, when a node fails, its lost RDD partitions can be rebuilt in parallel across the other nodes, allowing speedy recovery.³ Fourth, even if a node is just slow (a “straggler”), we can recompute necessary partitions on other nodes because RDDs are immutable, so there are no consistency concerns with having two copies of a partition. These benefits make RDDs attractive as the foundation for our relational processing in Shark.

³ To provide fault tolerance across “shuffle” operations like a parallel reduce, the execution engine also saves the “map” side of the shuffle in memory on the source nodes, spilling to disk if necessary.

2.3 Fault Tolerance Guarantees

To summarize the benefits of RDDs, Shark provides the following fault tolerance properties, which have been difficult to support in traditional MPP database designs:

1. Shark can tolerate the loss of any set of worker nodes. The execution engine will re-execute any lost tasks and recompute any lost RDD partitions using lineage.⁴ This is true even within a query: Spark will rerun any failed tasks, or lost dependencies of new tasks, without aborting the query.
2. Recovery is parallelized across the cluster. If a failed node contained 100 RDD partitions, these can be rebuilt in parallel on 100 different nodes, quickly recovering the lost data.
3. The deterministic nature of RDDs also enables straggler mitigation: if a task is slow, the system can launch a speculative “backup copy” of it on another node, as in MapReduce [17].
4. Recovery works even for queries that combine SQL and machine learning UDFs (Section 4), as these operations all compile into a single RDD lineage graph.

⁴ Support for master recovery could also be added by reliably logging the RDD lineage graph and the submitted jobs, because this state is small, but we have not implemented this yet.

2.4 Executing SQL over RDDs

Shark runs SQL queries over Spark using a three-step process similar to traditional RDBMSs: query parsing, logical plan generation, and physical plan generation.

Given a query, Shark uses the Hive query compiler to parse the query and generate an abstract syntax tree. The tree is then turned into a logical plan and basic logical optimization, such as predicate pushdown, is applied. Up to this point, Shark and Hive share an identical approach. Hive would then convert the operator tree into a physical plan consisting of multiple MapReduce stages. In the case of Shark, its optimizer applies additional rule-based optimizations, such as pushing LIMIT down to individual partitions, and creates a physical plan consisting of transformations on RDDs rather than MapReduce jobs. We use a variety of operators already present in Spark, such as map and reduce, as well as new operators we implemented for Shark, such as broadcast joins. Spark’s master then executes this graph using standard MapReduce scheduling techniques, such as placing tasks close to their input data, rerunning lost tasks, and performing straggler mitigation [39].

While this basic approach makes it possible to run SQL over Spark, doing it efficiently is challenging. The prevalence of UDFs and complex analytic functions in Shark’s workload makes it difficult to determine an optimal query plan at compile time, especially for new data that has not undergone ETL. In addition, even with
such a plan, naïvely executing it over Spark (or other MapReduce runtimes) can be inefficient. In the next section, we discuss several extensions we made to Spark to efficiently store relational data and run SQL, starting with a mechanism that allows for dynamic, statistics-driven re-optimization at run-time.

3 Engine Extensions

In this section, we describe our modifications to the Spark engine to enable efficient execution of SQL queries.

3.1 Partial DAG Execution (PDE)

Systems like Shark and Hive are frequently used to query fresh data that has not undergone a data loading process. This precludes the use of static query optimization techniques that rely on accurate a priori data statistics, such as statistics maintained by indices. The lack of statistics for fresh data, combined with the prevalent use of UDFs, requires dynamic approaches to query optimization.

To support dynamic query optimization in a distributed setting, we extended Spark to support partial DAG execution (PDE), a technique that allows dynamic alteration of query plans based on data statistics collected at run-time.

We currently apply partial DAG execution at blocking “shuffle” operator boundaries where data is exchanged and repartitioned, since these are typically the most expensive operations in Shark. By default, Spark materializes the output of each map task in memory before a shuffle, spilling it to disk as necessary. Later, reduce tasks fetch this output.

PDE modifies this mechanism in two ways. First, it gathers customizable statistics at global and per-partition granularities while materializing map outputs. Second, it allows the DAG to be altered based on these statistics, either by choosing different operators or altering their parameters (such as their degrees of parallelism).

These statistics are customizable using a simple, pluggable accumulator API. Some example statistics include:

1. Partition sizes and record counts, which can be used to detect skew.
2. Lists of “heavy hitters,” i.e., items that occur frequently in the dataset.
3. Approximate histograms, which can be used to estimate partitions’ data distributions.

These statistics are sent by each worker to the master, where they are aggregated and presented to the optimizer. For efficiency, we use lossy compression to record the statistics, limiting their size to 1–2 KB per task. For instance, we encode partition sizes (in bytes) with logarithmic encoding, which can represent sizes of up to 32 GB using only one byte with at most 10% error. The master can then use these statistics to perform various run-time optimizations, as we shall discuss next.

Partial DAG execution complements existing adaptive query optimization techniques that typically run in a single-node system [7, 25, 36], as we can use existing techniques to dynamically optimize the local plan within each node, and use PDE to optimize the global structure of the plan at stage boundaries. This fine-grained statistics collection, and the optimizations that it enables, differentiates PDE from graph rewriting features in previous systems, such as DryadLINQ [24].

3.1.1 Join Optimization

Partial DAG execution can be used to perform several run-time optimizations for join queries.

Figure 3 illustrates two communication patterns for MapReduce-style joins. In shuffle join, both join tables are hash-partitioned by the join key. Each reducer joins corresponding partitions using a local join algorithm, which is chosen by each reducer based on run-time statistics. If one of a reducer’s input partitions is small, then it constructs a hash table over the small partition and probes it using the large partition. If both partitions are large, then a symmetric hash join is performed by constructing hash tables over both inputs.

[Figure 3: Data flows for map join and shuffle join. Map join broadcasts the small table to all large table partitions, while shuffle join repartitions and shuffles both tables. Panels: Map join; Shuffle join.]

In map join, also known as broadcast join, a small input table is broadcast to all nodes, where it is joined with each partition of a large table. This approach can result in significant cost savings by avoiding an expensive repartitioning and shuffling phase.

Map join is only worthwhile if some join inputs are small, so Shark uses partial DAG execution to select the join strategy at run-time based on its inputs’ exact sizes. By using sizes of the join inputs gathered at run-time, this approach works well even with input tables that have no prior statistics, such as intermediate results.

Run-time statistics also inform the join tasks’ scheduling policies. If the optimizer has a prior belief that a particular join input will be small, it will schedule that task before other join inputs and decide to perform a map-join if it observes that the task’s output is small. This allows the query engine to avoid performing the pre-shuffle partitioning of a large table once the optimizer has decided to perform a map-join.

3.1.2 Skew-handling and Degree of Parallelism

Partial DAG execution can also be used to determine operators’ degrees of parallelism and to mitigate skew.

The degree of parallelism for reduce tasks can have a large performance impact: launching too few reducers may overload reducers’ network connections and exhaust their memories, while launching too many may prolong the job due to task scheduling overhead. Hive’s performance is especially sensitive to the number of reduce tasks [8], due to Hadoop’s large scheduling overhead.

Using partial DAG execution, Shark can use individual partitions’ sizes to determine the number of reducers at run-time by coalescing many small, fine-grained partitions into fewer coarse partitions that are used by reduce tasks. To mitigate skew, fine-grained partitions are assigned to coalesced partitions using a greedy bin-packing heuristic that attempts to equalize coalesced partitions’ sizes [19]. This offers performance benefits, especially when good bin-packings exist.

Somewhat surprisingly, we discovered that Shark can obtain similar performance improvement simply by running a larger number of reduce tasks. We attribute this to Spark’s low scheduling and task-launching overhead.

3.2 Columnar Memory Store

In-memory computation is essential to low-latency query answering, given that memory’s throughput is orders of magnitude higher
than that of disks. Naïvely using Spark’s memory store, however, can lead to undesirable performance. For this reason, Shark implements a columnar memory store on top of Spark’s native memory store.

In-memory data representation affects both space footprint and read throughput. A naïve approach is to simply cache the on-disk data in its native format, performing on-demand deserialization in the query processor. This deserialization becomes a major bottleneck: in our studies, we saw that modern commodity CPUs can deserialize at a rate of only 200 MB per second per core.

The approach taken by Spark’s default memory store is to store data partitions as collections of JVM objects. This avoids deserialization, since the query processor can directly use these objects, but leads to significant storage space overheads. Common JVM implementations add 12 to 16 bytes of overhead per object. For example, storing 270 MB of TPC-H lineitem table as JVM objects uses approximately 971 MB of memory, while a serialized representation requires only 289 MB, nearly three times less space. A more serious implication, however, is the effect on garbage collection (GC). With a 200 B record size, a 32 GB heap can contain 160 million objects. The JVM garbage collection time correlates linearly with the number of objects in the heap, so it could take minutes to perform a full GC on a large heap. These unpredictable, expensive garbage collections cause large variability in response times.

Shark stores all columns of primitive types as JVM primitive arrays. Complex data types supported by Hive, such as map and array, are serialized and concatenated into a single byte array. Each column creates only one JVM object, leading to fast GCs and a compact data representation. The space footprint of columnar data can be further reduced by cheap compression techniques at virtually no CPU cost. Similar to columnar database systems, e.g., C-Store [32], Shark implements CPU-efficient compression schemes such as dictionary encoding, run-length encoding, and bit packing.

Columnar data representation also leads to better cache behavior, especially for analytical queries that frequently compute aggregations on certain columns.

3.3 Distributed Data Loading

In addition to query execution, Shark also uses Spark’s execution engine for distributed data loading. During loading, a table is split into small partitions, each of which is loaded by a Spark task. The loading tasks use the data schema to extract individual fields from rows, marshal a partition of data into its columnar representation, and store those columns in memory.

Each data loading task tracks metadata to decide whether each column in a partition should be compressed. For example, the loading task will compress a column using dictionary encoding if its number of distinct values is below a threshold. This allows each task to choose the best compression scheme for each partition, rather than conforming to a global compression scheme that might not be optimal for local partitions. These local decisions do not require coordination among data loading tasks, allowing the load phase to achieve a maximum degree of parallelism, at the small cost of requiring each partition to maintain its own compression metadata. It is important to clarify that an RDD’s lineage does not need to contain the compression scheme and metadata for each partition. The compression scheme and metadata are simply byproducts of the RDD computation, and can be deterministically recomputed along with the in-memory data in the case of failures.

As a result, Shark can load data into memory at the aggregated throughput of the CPUs processing incoming data.

Pavlo et al. [31] showed that Hadoop was able to perform data loading at 5 to 10 times the throughput of MPP databases. Tested using the same dataset used in [31], Shark provides the same throughput as Hadoop in loading data into HDFS. Shark is 5 times faster than Hadoop when loading data into its memory store.

3.4 Data Co-partitioning

In some warehouse workloads, two tables are frequently joined together. For example, the TPC-H benchmark frequently joins the lineitem and order tables. A technique commonly used by MPP databases is to co-partition the two tables based on their join key in the data loading process. In distributed file systems like HDFS, the storage system is schema-agnostic, which prevents data co-partitioning. Shark allows co-partitioning two tables on a common key for faster joins in subsequent queries. This can be accomplished with the DISTRIBUTE BY clause:

    CREATE TABLE l_mem TBLPROPERTIES ("shark.cache"=true)
    AS SELECT * FROM lineitem DISTRIBUTE BY L_ORDERKEY;

    CREATE TABLE o_mem TBLPROPERTIES (
      "shark.cache"=true, "copartition"="l_mem")
    AS SELECT * FROM order DISTRIBUTE BY O_ORDERKEY;

When joining two co-partitioned tables, Shark’s optimizer constructs a DAG that avoids the expensive shuffle and instead uses map tasks to perform the join.

3.5 Partition Statistics and Map Pruning

Typically, data is stored using some logical clustering on one or more columns. For example, entries in a website’s traffic log data might be grouped by users’ physical locations, because logs are first stored in data centers that have the best geographical proximity to users. Within each data center, logs are append-only and are stored in roughly chronological order. As a less obvious case, a news site’s logs might contain news_id and timestamp columns that are strongly correlated. For analytical queries, it is typical to apply filter predicates or aggregations over such columns. For example, a daily warehouse report might describe how different visitor segments interact with the website; this type of query naturally applies a predicate on timestamps and performs aggregations that are grouped by geographical location. This pattern is even more frequent for interactive data analysis, during which drill-down operations are frequently performed.

Map pruning is the process of pruning data partitions based on their natural clustering columns. Since Shark’s memory store splits data into small partitions, each block contains only one or few logical groups on such columns, and Shark can avoid scanning certain blocks of data if their values fall out of the query’s filter range.

To take advantage of these natural clusterings of columns, Shark’s memory store on each worker piggybacks the data loading process to collect statistics. The information collected for each partition includes the range of each column and the distinct values if the number of distinct values is small (i.e., enum columns). The collected statistics are sent back to the master program and kept in memory for pruning partitions during query execution.

When a query is issued, Shark evaluates the query’s predicates against all partition statistics; partitions that do not satisfy the predicate are pruned and Shark does not launch tasks to scan them.

We collected a sample of queries from the Hive warehouse of a video analytics company, and out of the 3833 queries we obtained, at least 3277 of them contained predicates that Shark can use for map pruning. Section 6 provides more details on this workload.

4 Machine Learning Support

A key design goal of Shark is to provide a single system capable of efficient SQL query processing and sophisticated machine learning. Following the principle of pushing computation to data, Shark
6.def logRegress(points: RDD[Point]): Vector { Note that this distributed logistic regression implementation in var w = Vector(D, _ => 2 * rand.nextDouble - 1) Shark looks remarkably similar to a program implemented for a for (i <- 1 to ITERATIONS) { single node in the Scala language. The user can conveniently mix val gradient = points.map { p => the best parts of both SQL and MapReduce-style programming. val denom = 1 + exp(-p.y * (w dot p.x)) (1 / denom - 1) * p.y * p.x Currently, Shark provides native support for Scala, Java and Python. }.reduce(_ + _) We have modified the Scala shell to enable interactive execution of w -= gradient both SQL and distributed machine learning algorithms. Because } Shark is built on top of the JVM, it would be relatively straightfor- w ward to support other JVM languages, such as Clojure or JRuby. } We have implemented a number of basic machine learning al- val users = sql2rdd("SELECT * FROM user u gorithms, including linear regression, logistic regression, and k- JOIN comment c ON c.uid=u.uid") means clustering. In most cases, the user only needs to supply a mapRows function to perform feature extraction and can invoke val features = users.mapRows { row => the provided algorithms. new Vector(extractFeature1(row.getInt("age")), The above example demonstrates how machine learning compu- extractFeature2(row.getStr("country")), tations can be performed on query results. Using RDDs as the main ...)} val trainedVector = logRegress(features.cache()) data structure for query operators also enables one to use SQL to query the results of machine learning computations in a single exe- cution plan. Listing 1: Logistic Regression Example 4.2 Execution Engine Integration In addition to language integration, another key benefit of using supports machine learning as a first-class citizen. 
This is enabled RDDs as the data structure for operators is the execution engine in- by the design decision to choose Spark as the execution engine and tegration. This common abstraction allows machine learning com- RDD as the main data structure for operators. In this section, we putations and SQL queries to share workers and cached data with- explain Shark’s language and execution engine integration for SQL out the overhead of data movement. and machine learning. Because SQL query processing is implemented using RDDs, lin- eage is kept for the whole pipeline, which enables end-to-end fault Other research projects [16, 18] have demonstrated that it is pos- sible to express certain machine learning algorithms in SQL and tolerance for the entire workflow. If failures occur during the ma- avoid moving data out of the database. The implementation of chine learning stage, partitions on faulty nodes will automatically those projects, however, involves a combination of SQL, UDFs, be recomputed based on their lineage. and driver programs written in other languages. The systems be- 5 Implementation come obscure and difficult to maintain; in addition, they may sacri- fice performance by performing expensive parallel numerical com- While implementing Shark, we discovered that a number of engi- putations on traditional database engines that were not designed for neering details had significant performance impacts. Overall, to such workloads. Contrast this with the approach taken by Shark, improve the query processing speed, one should minimize the tail which offers in-database analytics that push computation to data, latency of tasks and the CPU cost of processing each row. but does so using a runtime that is optimized for such workloads Memory-based Shuffle: Both Spark and Hadoop write map out- and a programming model that is designed to express machine learn- put files to disk, hoping that they will remain in the OS buffer cache ing algorithms. when reduce tasks fetch them. 
In practice, we have found that the extra system calls and file system journaling adds significant over- 4.1 Language Integration head. In addition, the inability to control when buffer caches are In addition to executing a SQL query and returning its results, Shark flushed leads to variability in shuffle tasks. A query’s response time also allows queries to return the RDD representing the query plan. is determined by the last task to finish, and thus the increasing vari- Callers to Shark can then invoke distributed computation over the ability leads to long-tail latency, which significantly hurts shuffle query result using the returned RDD. performance. We modified the shuffle phase to materialize map As an example of this integration, Listing 1 illustrates a data outputs in memory, with the option to spill them to disk. analysis pipeline that performs logistic regression over a user database. Temporary Object Creation: It is easy to write a program that Logistic regression, a common classification algorithm, searches creates many temporary objects, which can burden the JVM’s garbage for a hyperplane w that best separates two sets of points (e.g. spam- collector. For a parallel job, a slow GC at one task may slow the mers and non-spammers). The algorithm applies gradient descent entire job. Shark operators and RDD transformations are written in optimization by starting with a randomized w vector and iteratively a way that minimizes temporary object creations. updating it by moving along gradients towards an optimum value. The program begins by using sql2rdd to issue a SQL query to Bytecode Compilation of Expression Evaluators: In its current retreive user information as a TableRDD. It then performs feature implementation, Shark sends the expression evaluators generated extraction on the query rows and runs logistic regression over the by the Hive parser as part of the tasks to be executed on each row. extracted feature matrix. 
Each iteration of logRegress applies a By profiling Shark, we discovered that for certain queries, when function of w to all data points to produce a set of gradients, which data is served out of the memory store the majority of the CPU cy- are summed to produce a net gradient that is used to update w. cles are wasted in interpreting these evaluators. We are working on The highlighted map, mapRows, and reduce functions are au- a compiler to transform these expression evaluators into JVM byte- tomatically parallelized by Shark to execute across a cluster, and code, which can further increase the execution engine’s throughput. the master program simply collects the output of the reduce func- Specialized Data Structures: Using specialized data structures is tion to update w. an optimization that we have yet to exploit. For example, Java’s

7.hash table is built for generic objects. When the hash key is a prim- itive type, the use of specialized data structures can lead to more Shark Shark (disk) Hive compact data representations, and thus better cache behavior. 2500 100 600 6 Experiments 500 2000 80 We evaluated Shark using four datasets: 400 Time (seconds) 1500 1. Pavlo et al. Benchmark: 2.1 TB of data reproducing Pavlo et 60 300 al.’s comparison of MapReduce vs. analytical DBMSs [31]. 1000 40 2. TPC-H Dataset: 100 GB and 1 TB datasets generated by the 200 DBGEN program [35]. 500 20 100 3. Real Hive Warehouse: 1.7 TB of sampled Hive warehouse 147 32 1.1 data from an early industrial user of Shark. 0 0 0 Aggregation Aggregation 4. Machine Learning Dataset: 100 GB synthetic dataset to mea- Selection 2.5M Groueps 1K Groueps sure the performance of machine learning algorithms. Figure 4: Selection and aggregation query runtimes (seconds) from Pavlo et al. benchmark Overall, our results show that Shark can perform more than 100× faster than Hive and Hadoop, even though we have yet to imple- ment some of the performance optimizations mentioned in the pre- vious section. In particular, Shark provides comparable perfor- Hive mance gains to those reported for MPP databases in Pavlo et al.’s Shark (disk) Shark comparison [31]. In some cases where data fits in memory, Shark Copartitioned exceeds the performance reported for MPP databases. We emphasize that we are not claiming that Shark is funda- 0 500 1000 1500 2000 mentally faster than MPP databases; there is no reason why MPP engines could not implement the same processing optimizations Figure 5: Join query runtime (seconds) from Pavlo benchmark as Shark. Indeed, our implementation has several disadvantages relative to commercial engines, such as running on the JVM. In- stead, we aim to show that it is possible to achieve comparable per- Hive. 
Despite this tuning, Shark outperformed Hive in all cases by formance while retaining a MapReduce-like engine, and the fine- a wide margin. grained fault recovery features that such engines provide. In addi- 6.2.1 Selection Query tion, Shark can leverage this engine to perform machine learning functions on the same data, which we believe will be essential for The first query was a simple selection on the rankings table: future analytics workloads. SELECT pageURL, pageRank 6.1 Methodology and Cluster Setup FROM rankings WHERE pageRank > X; Unless otherwise specified, experiments were conducted on Ama- In [31], Vertica outperformed Hadoop by a factor of 10 because zon EC2 using 100 m2.4xlarge nodes. Each node had 8 virtual a clustered index was created for Vertica. Even without a clustered cores, 68 GB of memory, and 1.6 TB of local storage. index, Shark was able to execute this query 80× faster than Hive The cluster was running 64-bit Linux 3.2.28, Apache Hadoop for in-memory data, and 5× on data read from HDFS. 0.20.205, and Apache Hive 0.9. For Hadoop MapReduce, the num- ber of map tasks and the number of reduce tasks per node were set 6.2.2 Aggregation Queries to 8, matching the number of cores. For Hive, we enabled JVM The Pavlo et al. benchmark ran two aggregation queries: reuse between tasks and avoided merging small output files, which would take an extra step after each query to perform the merge. SELECT sourceIP, SUM(adRevenue) FROM uservisits GROUP BY sourceIP; We executed each query six times, discarded the first run, and report the average of the remaining five runs. We discard the first SELECT SUBSTR(sourceIP, 1, 7), SUM(adRevenue) run in order to allow the JVM’s just-in-time compiler to optimize FROM uservisits GROUP BY SUBSTR(sourceIP, 1, 7); common code paths. We believe that this more closely mirrors real- world deployments where the JVM will be reused by many queries. 
In our dataset, the first query had two million groups and the sec- ond had approximately one thousand groups. Shark and Hive both 6.2 Pavlo et al. Benchmarks applied task-local aggregations and shuffled the data to parallelize Pavlo et al. compared Hadoop versus MPP databases and showed the final merge aggregation. Again, Shark outperformed Hive by a that Hadoop excelled at data ingress, but performed unfavorably in wide margin. The benchmarked MPP databases perform local ag- query execution [31]. We reused the dataset and queries from their gregations on each node, and then send all aggregates to a single benchmarks to compare Shark against Hive. query coordinator for the final merging; this performed very well The benchmark used two tables: a 1 GB/node rankings table, when the number of groups was small, but performed worse with and a 20 GB/node uservisits table. For our 100-node cluster, we large number of groups. The MPP databases’ chosen plan is similar recreated a 100 GB rankings table containing 1.8 billion rows and to choosing a single reduce task for Shark and Hive. a 2 TB uservisits table containing 15.5 billion rows. We ran the 6.2.3 Join Query four queries in their experiments comparing Shark with Hive and report the results in Figures 4 and 5. In this subsection, we hand- The final query from Pavlo et al. involved joining the 2 TB uservis- tuned Hive’s number of reduce tasks to produce optimal results for its table with the 100 GB rankings table.

    SELECT INTO Temp sourceIP, AVG(pageRank),
      SUM(adRevenue) as totalRevenue
    FROM rankings AS R, uservisits AS UV
    WHERE R.pageURL = UV.destURL
      AND UV.visitDate BETWEEN Date('2000-01-15')
      AND Date('2000-01-22')
    GROUP BY UV.sourceIP;

Again, Shark outperformed Hive in all cases. Figure 5 shows that for this query, serving data out of memory did not provide much benefit over disk. This is because the cost of the join step dominated the query processing. Co-partitioning the two tables, however, provided significant benefits as it avoided shuffling 2.1 TB of data during the join step.

6.2.4 Data Loading

Hadoop was shown by [31] to excel at data loading, as its data loading throughput was five to ten times higher than that of MPP databases. As explained in Section 2, Shark can be used to query data in HDFS directly, which means its data ingress rate is at least as fast as Hadoop's.

After generating the 2 TB uservisits table, we measured the time to load it into HDFS and compared that with the time to load it into Shark's memory store. We found the rate of data ingress was 5× higher in Shark's memory store than that of HDFS.

6.3 Micro-Benchmarks

To understand the factors affecting Shark's performance, we conducted a sequence of micro-benchmarks. We generated 100 GB and 1 TB of data using the DBGEN program provided by TPC-H [35]. We chose this dataset because it contains tables and columns of varying cardinality and can be used to create a myriad of micro-benchmarks for testing individual operators.

While performing experiments, we found that Hive and Hadoop MapReduce were very sensitive to the number of reducers set for a job. Hive's optimizer automatically sets the number of reducers based on the estimated data size. However, we found that Hive's optimizer frequently made the wrong decision, leading to incredibly long query execution times. We hand-tuned the number of reducers for Hive based on characteristics of the queries and through trial and error. We report Hive performance numbers for both optimizer-determined and hand-tuned numbers of reducers. Shark, on the other hand, was much less sensitive to the number of reducers and required minimal tuning.

6.3.1 Aggregation Performance

We tested the performance of aggregations by running group-by queries on the TPC-H lineitem table. For the 100 GB dataset, the lineitem table contained 600 million rows. For the 1 TB dataset, it contained 6 billion rows.

The queries were of the form:

    SELECT [GROUP_BY_COLUMN], COUNT(*) FROM lineitem
    GROUP BY [GROUP_BY_COLUMN]

We chose to run one query with no group-by column (i.e., a simple count), and three queries with group-by aggregations: SHIPMODE (7 groups), RECEIPTDATE (2500 groups), and SHIPMODE (150 million groups in 100 GB, and 537 million groups in 1 TB).

For both Shark and Hive, aggregations were first performed on each partition, and then the intermediate aggregated results were partitioned and sent to reduce tasks to produce the final aggregation. As the number of groups becomes larger, more data needs to be shuffled across the network.

Figure 6 compares the performance of Shark and Hive, measuring Shark's performance on both in-memory data and data loaded from HDFS. As can be seen in the figure, Shark was 80× faster than hand-tuned Hive for queries with small numbers of groups, and 20× faster for queries with large numbers of groups, where the shuffle phase dominated the total execution cost.

We were somewhat surprised by the performance gain observed for on-disk data in Shark. After all, both Shark and Hive had to read data from HDFS and deserialize it for query processing. This difference, however, can be explained by Shark's very low task launching overhead, optimized shuffle operator, and other factors; see Section 7 for more details.

6.3.2 Join Selection at Run-time

In this experiment, we tested how partial DAG execution can improve query performance through run-time re-optimization of query plans. The query joined the lineitem and supplier tables from the 1 TB TPC-H dataset, using a UDF to select suppliers of interest based on their addresses. In this specific instance, the UDF selected 1000 out of 10 million suppliers. Figure 7 summarizes these results.

    SELECT * from lineitem l join supplier s
    ON l.L_SUPPKEY = s.S_SUPPKEY
    WHERE SOME_UDF(s.S_ADDRESS)

Figure 7: Join strategies chosen by optimizers (seconds). Strategies: Static, Adaptive, Static + Adaptive.

Lacking good selectivity estimation on the UDF, a static optimizer would choose to perform a shuffle join on these two tables because the initial sizes of both tables are large. Leveraging partial DAG execution, after running the pre-shuffle map stages for both tables, Shark's dynamic optimizer realized that the filtered supplier table was small. It decided to perform a map-join, replicating the filtered supplier table to all nodes and performing the join using only map tasks on lineitem.

To further improve the execution, the optimizer can analyze the logical plan and infer that the probability of the supplier table being small is much higher than that of lineitem (since supplier is smaller initially, and there is a filter predicate on supplier). The optimizer chose to pre-shuffle only the supplier table, and avoided launching two waves of tasks on lineitem. This combination of static query analysis and partial DAG execution led to a 3× performance improvement over a naïve, statically chosen plan.

6.3.3 Fault Tolerance

To measure Shark's performance in the presence of node failures, we simulated failures and measured query performance before, during, and after failure recovery. Figure 8 summarizes five runs of our failure recovery experiment, which was performed on a 50-node m2.4xlarge EC2 cluster.

We used a group-by query on the 100 GB lineitem table to measure query performance in the presence of faults. After loading the lineitem data into Shark's memory store, we killed a worker machine and re-ran the query. Shark gracefully recovered from this failure and parallelized the reconstruction of lost partitions on the other 49 nodes. This recovery had a small performance impact, but it was significantly cheaper than the cost of re-loading the entire dataset and re-executing the query (14 vs 34 secs).

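The run-time join choice in Section 6.3.2 can be sketched as follows. This is a hypothetical illustration, not Shark's optimizer: `StageStats`, `RuntimeJoinSelection`, `chooseJoin`, `mapJoin`, and the 10 MB `broadcastThreshold` are invented for the example, and the only statistic consulted is the output size reported by each pre-shuffle map stage after the UDF filter has run. If the smaller input falls under the threshold, it is replicated and joined map-side; otherwise both inputs are shuffled.

```scala
// Hypothetical sketch of a PDE-style join choice made after the pre-shuffle
// map stages have run and reported the size of their (filtered) outputs.
sealed trait JoinStrategy
case object ShuffleJoin extends JoinStrategy
final case class MapJoin(broadcastSide: String) extends JoinStrategy

// Statistic observed at run time for one join input (assumed: output bytes).
final case class StageStats(table: String, outputBytes: Long)

object RuntimeJoinSelection {
  // Assumed cut-off: an input at or below this size is replicated to all nodes.
  val broadcastThreshold: Long = 10L * 1024 * 1024

  def chooseJoin(left: StageStats, right: StageStats): JoinStrategy = {
    val smaller = if (left.outputBytes <= right.outputBytes) left else right
    if (smaller.outputBytes <= broadcastThreshold) MapJoin(smaller.table) else ShuffleJoin
  }

  // Map-side join: build a hash table on the small input and probe it with
  // every row of the large input, so the large input is never shuffled.
  def mapJoin[K, A, B](large: Seq[(K, A)], small: Seq[(K, B)]): Seq[(K, A, B)] = {
    val table: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
    for {
      (k, a) <- large
      b <- table.getOrElse(k, Seq.empty[B])
    } yield (k, a, b)
  }
}
```

The point of deferring the decision is that a static optimizer must pick a strategy before `SOME_UDF` has filtered supplier, when both inputs still look large; once the map stages report the filtered size, the same simple rule selects the map join.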
Figure 6: Aggregation queries on lineitem table. X-axis indicates the number of groups for each aggregation query. Panels: TPC-H 100GB (1, 7, 2.5K, 150M groups); TPC-H 1TB (1, 7, 2.5K, 537M groups). Y-axis: time (seconds). Series: Shark, Shark (disk), Hive (tuned), Hive.

Figure 8: Query time with failures (seconds). Bars: Post-recovery, Single failure, No failures, Full reload.

After this recovery, subsequent queries operated against the recovered dataset, albeit with fewer machines. In Figure 8, the post-recovery performance was marginally better than the pre-failure performance; we believe that this was a side-effect of the JVM's JIT compiler, as more of the scheduler's code might have become compiled by the time the post-recovery queries were run.

6.4 Real Hive Warehouse Queries

An early industrial user provided us with a sample of their Hive warehouse data and two years of query traces from their Hive system. A leading video analytics company for content providers and publishers, the user built most of their analytics stack based on Hadoop. The sample we obtained contained 30 days of video session data, occupying 1.7 TB of disk space when decompressed. It consists of a single fact table containing 103 columns, with heavy use of complex data types such as array and struct. The sampled query log contains 3833 analytical queries, sorted in order of frequency. We filtered out queries that invoked proprietary UDFs and picked four frequent queries that are prototypical of other queries in the complete trace. These queries compute aggregate video quality metrics over different audience segments:

1. Query 1 computes summary statistics in 12 dimensions for users of a specific customer on a specific day.
2. Query 2 counts the number of sessions and distinct customer/client combinations grouped by country, with filter predicates on eight columns.
3. Query 3 counts the number of sessions and distinct users for all but 2 countries.
4. Query 4 computes summary statistics in 7 dimensions grouping by a column, and showing the top groups sorted in descending order.

Figure 9: Real Hive warehouse workloads. X-axis: Q1 to Q4; y-axis: time (seconds). Series: Shark, Shark (disk), Hive.

Figure 9 compares the performance of Shark and Hive on these queries. The result is very promising as Shark was able to process these real-life queries with sub-second latency in all but one case, whereas it took Hive 50 to 100 times longer to execute them.

A closer look into these queries suggests that this data exhibits the natural clustering properties mentioned in Section 3.5. The map pruning technique, on average, reduced the amount of data scanned by a factor of 30.

6.5 Machine Learning

A key motivator of using SQL in a MapReduce environment is the ability to perform sophisticated machine learning on big data. We implemented two iterative machine learning algorithms, logistic regression and k-means, to compare the performance of Shark versus running the same workflow in Hive and Hadoop.

The dataset was synthetically generated and contained 1 billion rows and 10 columns, occupying 100 GB of space. Thus, the feature matrix contained 1 billion points, each with 10 dimensions. These machine learning experiments were performed on a 100-node m1.xlarge EC2 cluster.

Data was initially stored in relational form in Shark's memory store and HDFS. The workflow consisted of three steps: (1) selecting the data of interest from the warehouse using SQL, (2) extracting features, and (3) applying iterative algorithms. In step 3, both algorithms were run for 10 iterations.

Figures 10 and 11 show the time to execute a single iteration of logistic regression and k-means, respectively. We implemented two versions of the algorithms for Hadoop, one storing input data as text in HDFS and the other using a serialized binary format. The binary representation was more compact and had lower CPU cost in record deserialization, leading to improved performance. Our results show that Shark is 100× faster than Hive and Hadoop for logistic regression and 30× faster for k-means. K-means experienced less speedup because it was computationally more expensive than logistic regression, thus making the workflow more CPU-bound.

Figure 10: Logistic regression, per-iteration runtime (seconds). Series: Shark (0.96), Hadoop (binary), Hadoop (text).

Figure 11: K-means clustering, per-iteration runtime (seconds). Series: Shark (4.1), Hadoop (binary), Hadoop (text).

In the case of Shark, if data initially resided in its memory store, steps 1 and 2 were executed in roughly the same time it took to run one iteration of the machine learning algorithm. If data was not loaded into the memory store, the first iteration took 40 seconds for both algorithms. Subsequent iterations, however, reported numbers consistent with Figures 10 and 11. In the case of Hive and Hadoop, every iteration took the reported time because data was loaded from HDFS for every iteration.

7 Discussion

Shark shows that it is possible to run fast relational queries in a fault-tolerant manner using the fine-grained deterministic task model introduced by MapReduce. This design offers an effective way to scale query processing to ever-larger workloads, and to combine it with rich analytics. In this section, we consider two questions: first, why were previous MapReduce-based systems, such as Hive, slow, and what gave Shark its advantages? Second, are there other benefits to the fine-grained task model? We argue that fine-grained tasks also help with multitenancy and elasticity, as has been demonstrated in MapReduce systems.

7.1 Why are Previous MapReduce-Based Systems Slow?

Conventional wisdom is that MapReduce is slower than MPP databases for several reasons: expensive data materialization for fault tolerance, inferior data layout (e.g., lack of indices), and costlier execution strategies [31, 33]. Our exploration of Hive confirms these reasons, but also shows that a combination of conceptually simple "engineering" changes to the engine (e.g., in-memory storage) and more involved architectural changes (e.g., partial DAG execution) can alleviate them. We also find that a somewhat surprising variable not considered in detail in MapReduce systems, the task scheduling overhead, actually has a dramatic effect on performance, and greatly improves load balancing if minimized.

Intermediate Outputs: MapReduce-based query engines, such as Hive, materialize intermediate data to disk in two situations. First, within a MapReduce job, the map tasks save their output in case a reduce task fails [17]. Second, many queries need to be compiled into multiple MapReduce steps, and engines rely on replicated file systems, such as HDFS, to store the output of each step.

For the first case, we note that map outputs were stored on disk primarily as a convenience to ensure there is sufficient space to hold them in large batch jobs. Map outputs are not replicated across nodes, so they will still be lost if the mapper node fails [17]. Thus, if the outputs fit in memory, it makes sense to store them in memory initially, and only spill them to disk if they are large. Shark's shuffle implementation does this by default, and sees far faster shuffle performance (and no seeks) when the outputs fit in RAM. This is often the case in aggregations and filtering queries that return a much smaller output than their input.⁵ Another hardware trend that may improve performance, even for large shuffles, is SSDs, which would allow fast random access to a larger space than memory.

⁵ Systems like Hadoop also benefit from the OS buffer cache in serving map outputs, but we found that the extra system calls and file system journaling from writing map outputs to files still add overhead (Section 5).

For the second case, engines that extend the MapReduce execution model to general task DAGs can run multi-stage jobs without materializing any outputs to HDFS. Many such engines have been proposed, including Dryad, Tenzing and Spark [22, 13, 39].

Data Format and Layout: While the naïve pure schema-on-read approach to MapReduce incurs considerable processing costs, many systems use more efficient storage formats within the MapReduce model to speed up queries. Hive itself supports "table partitions" (a basic index-like system where it knows that certain key ranges are contained in certain files, so it can avoid scanning a whole table), as well as column-oriented representation of on-disk data [34]. We go further in Shark by using fast in-memory columnar representations within Spark. Shark does this without modifying the Spark runtime by simply representing a block of tuples as a single Spark record (one Java object from Spark's perspective), and choosing its own representation for the tuples within this object.

Another feature of Spark that helps Shark, but was not present in previous MapReduce runtimes, is control over the data partitioning across nodes (Section 3.4). This lets us co-partition tables.

Finally, one capability of RDDs that we do not yet exploit is random reads. While RDDs only support coarse-grained operations for their writes, read operations on them can be fine-grained, accessing just one record [39]. This would allow RDDs to be used as indices. Tenzing can use such remote-lookup reads for joins [13].

Execution Strategies: Hive spends considerable time on sorting the data before each shuffle and writing the outputs of each MapReduce stage to HDFS, both limitations of the rigid, one-pass MapReduce model in Hadoop. More general runtime engines, such as Spark, alleviate some of these problems. For instance, Spark supports hash-based distributed aggregation and general task DAGs.

To truly optimize the execution of relational queries, however, we found it necessary to select execution plans based on data statistics. This becomes difficult in the presence of UDFs and complex analytics functions, which we seek to support as first-class citizens in Shark. To address this problem, we proposed partial DAG execution (PDE), which allows our modified version of Spark to change the downstream portion of an execution graph once each stage completes based on data statistics. PDE goes beyond the runtime graph rewriting features in previous systems, such as DryadLINQ [24], by collecting fine-grained statistics about ranges of keys and by allowing switches to a completely different join strategy, such as broadcast join, instead of just selecting the number of reduce tasks.

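The Data Format and Layout paragraph describes representing a block of tuples as a single Spark record and choosing the layout of the tuples inside it. Combined with the per-partition compression decision described earlier in the paper (a column is compressed only if its number of distinct values is below a threshold), such a block might look like the sketch below. All names (`StringColumn`, `PlainColumn`, `DictColumn`, `ColumnarBlock`, `ColumnarLoader`) and the two-column (age, country) schema are hypothetical, and dictionary encoding is assumed as the compression scheme.

```scala
// Hypothetical sketch of a block of rows stored column by column inside one
// object, so the engine treats the whole block as a single record.
sealed trait StringColumn {
  def apply(row: Int): String
  def size: Int
}

// Uncompressed layout: one String per row.
final class PlainColumn(values: Array[String]) extends StringColumn {
  def apply(row: Int): String = values(row)
  def size: Int = values.length
}

// Dictionary-encoded layout: each distinct value stored once, rows hold Int codes.
final class DictColumn(dictionary: Array[String], codes: Array[Int]) extends StringColumn {
  def apply(row: Int): String = dictionary(codes(row))
  def size: Int = codes.length
}

// One partition of a hypothetical (age, country) table in columnar form.
final class ColumnarBlock(val ages: Array[Int], val countries: StringColumn) {
  def numRows: Int = ages.length
  def row(i: Int): (Int, String) = (ages(i), countries(i))
}

object ColumnarLoader {
  // Chosen independently for each partition: dictionary-encode only when the
  // number of distinct values is below the threshold.
  def encodeStrings(values: Seq[String], threshold: Int): StringColumn = {
    val distinct = values.distinct
    if (distinct.size < threshold) {
      val code = distinct.zipWithIndex.toMap
      new DictColumn(distinct.toArray, values.map(code).toArray)
    } else new PlainColumn(values.toArray)
  }

  // Convert a partition of rows into a single columnar block.
  def loadBlock(rows: Seq[(Int, String)], threshold: Int): ColumnarBlock =
    new ColumnarBlock(rows.map(_._1).toArray, encodeStrings(rows.map(_._2), threshold))
}
```

Each loading task can make this choice for its own partition without coordinating with other tasks, which is what allows loading to proceed fully in parallel; the price is that every block carries its own dictionary.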
Task Scheduling Cost: Perhaps the most surprising engine property that affected Shark, however, was a purely "engineering" concern: the overhead of launching tasks. Traditional MapReduce systems, such as Hadoop, were designed for multi-hour batch jobs consisting of tasks that were several minutes long. They launched each task in a separate OS process, and in some cases had a high latency to even submit a task. For instance, Hadoop uses periodic "heartbeats" from each worker every 3 seconds to assign tasks, and sees overall task startup delays of 5–10 seconds. This was sufficient for batch workloads, but clearly falls short for ad-hoc queries.

Spark avoids this problem by using a fast event-driven RPC library to launch tasks and by reusing its worker processes. It can launch thousands of tasks per second with only about 5 ms of overhead per task, making task lengths of 50–100 ms and MapReduce jobs of 500 ms viable. What surprised us is how much this affected query performance, even in large (multi-minute) queries.

Sub-second tasks allow the engine to balance work across nodes extremely well, even when some nodes incur unpredictable delays (e.g., network delays or JVM garbage collection). They also help dramatically with skew. Consider, for example, a system that needs to run a hash aggregation on 100 cores. If the system launches 100 reduce tasks, the key range for each task needs to be carefully chosen, as any imbalance will slow down the entire job. If it could split the work among 1000 tasks, then the slowest task can be as much as 10× slower than the average without affecting the job response time much! After implementing skew-aware partition selection in PDE, we were somewhat disappointed that it did not help compared to just having a higher number of reduce tasks in most workloads, because Spark could comfortably support thousands of such tasks. However, this property makes the engine highly robust to unexpected skew.

In this way, Spark stands in contrast to Hadoop/Hive, where using the wrong number of tasks was sometimes 10× slower than an optimal plan, and there has been considerable work to automatically choose the number of reduce tasks [26, 19]. Figure 12 shows how job execution times vary with the number of reduce tasks launched by Hadoop and Spark in a simple aggregation query on a 100-node cluster. Since a Spark job can launch thousands of reduce tasks without incurring much overhead, partition data skew can be mitigated by always launching many tasks.

Figure 12: Task launching overhead. Left panel: time (seconds) vs. number of Hadoop tasks; right panel: time (seconds) vs. number of Spark tasks.

More fundamentally, there are few reasons why sub-second tasks should not be feasible even at higher scales than we have explored, such as tens of thousands of nodes. Systems like Dremel [29] routinely run sub-second, multi-thousand-node jobs. Indeed, even if a single master cannot keep up with the scheduling decisions, the scheduling could be delegated across "lieutenant" masters for subsets of the cluster. Fine-grained tasks also offer many advantages over coarser-grained execution graphs beyond load balancing, such as faster recovery (by spreading out lost tasks across more nodes) and query elasticity [30]; we discuss some of these next.

7.2 Other Benefits of the Fine-Grained Task Model

While this paper has focused primarily on the fault tolerance benefits of fine-grained deterministic tasks, the model also provides other attractive properties. We wish to point out two benefits that have been explored in MapReduce-based systems.

Elasticity: In traditional MPP databases, a distributed query plan is selected once, and the system needs to run at that level of parallelism for the whole duration of the query. In a fine-grained task system, however, nodes can appear or go away during a query, and pending work will automatically be spread onto them. This enables the database engine to naturally be elastic. If an administrator wishes to remove nodes from the engine (e.g., in a virtualized corporate data center), the engine can simply treat those as failed, or (better yet) proactively replicate their data to other nodes if given a few minutes' warning. Similarly, a database engine running on a cloud could scale up by requesting new VMs if a query is expensive. Amazon's Elastic MapReduce [3] already supports resizing clusters at runtime.

Multitenancy: The same elasticity, mentioned above, enables dynamic resource sharing between users. In some traditional MPP databases, if an important query arrives while another large query is using most of the cluster, there are few options beyond canceling the earlier query. In systems based on fine-grained tasks, one can simply wait a few seconds for the current tasks from the first query to finish, and start giving the nodes tasks from the second query. For instance, Facebook and Microsoft have developed fair schedulers for Hadoop and Dryad that allow large historical queries, compute-intensive machine learning jobs, and short ad-hoc queries to safely coexist [38, 23].

8 Related Work

To the best of our knowledge, Shark is the only low-latency system that can efficiently combine SQL and machine learning workloads, while supporting fine-grained fault recovery.

We categorize large-scale data analytics systems into three classes. First, systems like ASTERIX [9], Tenzing [13], SCOPE [12], Cheetah [14], and Hive [34] compile declarative queries into MapReduce-style jobs. Although some of them modify the execution engine they are built on, it is hard for these systems to achieve interactive query response times for reasons discussed in Section 7.

Second, several projects aim to provide low-latency engines using architectures resembling shared-nothing parallel databases. Such projects include PowerDrill [20] and Impala [1]. These systems do not support fine-grained fault tolerance. In case of mid-query faults, the entire query needs to be re-executed. Google's Dremel [29] does rerun lost tasks, but it only supports an aggregation tree topology for query execution, and not the more complex shuffle DAGs required for large joins or distributed machine learning.

A third class of systems takes a hybrid approach by combining a MapReduce-like engine with relational databases. HadoopDB [4] connects multiple single-node database systems using Hadoop as the communication layer. Queries can be parallelized using Hadoop MapReduce, but within each MapReduce task, data processing is pushed into the relational database system. Osprey [37] is a middleware layer that adds fault-tolerance properties to parallel databases. It does so by breaking a SQL query into multiple small queries and sending them to parallel databases for execution. Shark presents a much simpler single-system architecture that supports all of the properties of this third class of systems, as well as statistical learning capabilities that HadoopDB and Osprey lack.

The partial DAG execution (PDE) technique introduced by Shark resembles adaptive query optimization techniques proposed in [7,

36, 25]. It is, however, unclear how these single-node techniques would work in a distributed setting and scale out to hundreds of nodes. In fact, PDE actually complements some of these techniques, as Shark can use PDE to optimize how data gets shuffled across nodes, and use the traditional single-node techniques within a local task. DryadLINQ [24] optimizes its number of reduce tasks at run-time based on map output sizes, but does not collect richer statistics, such as histograms, or make broader execution plan changes, such as changing join algorithms, as PDE can. RoPE [5] proposes using historical query information to optimize query plans, but relies on repeatedly executed queries. PDE works on queries that are executing for the first time.

Finally, Shark builds on the distributed approaches for machine learning developed in systems like GraphLab [27], HaLoop [11], and Spark [39]. However, Shark is unique in offering these capabilities in a SQL engine, allowing users to select data of interest using SQL and immediately run learning algorithms on it without time-consuming export to another system. Compared to Spark, Shark also provides a far more efficient in-memory representation of relational data, and mid-query optimization using PDE.

9 Conclusion

We have presented Shark, a new data warehouse system that combines fast relational queries and complex analytics in a single, fault-tolerant runtime. Shark significantly enhances a MapReduce-like runtime to efficiently run SQL, by using existing database techniques (e.g., column-oriented storage) and a novel partial DAG execution (PDE) technique that leverages fine-grained data statistics to dynamically reoptimize queries at run-time. This design enables Shark to approach the speedups reported for MPP databases over MapReduce, while providing support for machine learning algorithms, as well as mid-query fault tolerance across both SQL queries and machine learning computations. Overall, the system can be up to 100× faster than Hive for SQL, and more than 100× faster than Hadoop for machine learning. More fundamentally, this research represents an important step towards a unified architecture for efficiently combining complex analytics and relational query processing.

We have open sourced Shark at shark.cs.berkeley.edu. The latest stable release implements most of the techniques discussed in this paper, and more advanced features such as PDE and data copartitioning will be incorporated soon. We have also worked with two Internet companies as early users. They report speedups of 40–100× on real queries, consistent with our results.

10 Acknowledgments

We thank Cliff Engle, Harvey Feng, Shivaram Venkataraman, Ram Sriharsha, Tim Tully, Denny Britz, Antonio Lupher, Patrick Wendell, Paul Ruan, Jason Dai, Shane Huang, and other colleagues in the AMPLab for their work on Shark. We also thank Andy Pavlo and his colleagues for making their benchmark dataset and queries available. This research is supported in part by NSF CISE Expeditions award CCF-1139158 and DARPA XData Award FA8750-12-2-0331, and gifts from Amazon Web Services, Google, SAP, Blue Goji, Cisco, Clearstory Data, Cloudera, Ericsson, Facebook, General Electric, Hortonworks, Huawei, Intel, Microsoft, NetApp, Oracle, Quanta, Samsung, Splunk, VMware and Yahoo!, and by a Google PhD Fellowship.

11 References

[1] https://github.com/cloudera/impala.
[2] http://hadoop.apache.org/.
[3] http://aws.amazon.com/elasticmapreduce/.
[4] A. Abouzeid et al. HadoopDB: an architectural hybrid of MapReduce and DBMS technologies for analytical workloads. VLDB, 2009.
[5] S. Agarwal et al. Re-optimizing data-parallel computing. In NSDI’12.
[6] G. Ananthanarayanan et al. PACMan: Coordinated memory caching for parallel jobs. In NSDI, 2012.
[7] R. Avnur and J. M. Hellerstein. Eddies: continuously adaptive query processing. In SIGMOD, 2000.
[8] S. Babu. Towards automatic optimization of MapReduce programs. In SoCC’10.
[9] A. Behm et al. ASTERIX: towards a scalable, semistructured data platform for evolving-world models. Distributed and Parallel Databases, 29(3):185–216, 2011.
[10] V. Borkar et al. Hyracks: A flexible and extensible foundation for data-intensive computing. In ICDE’11.
[11] Y. Bu et al. HaLoop: efficient iterative data processing on large clusters. Proc. VLDB Endow., 2010.
[12] R. Chaiken et al. SCOPE: easy and efficient parallel processing of massive data sets. VLDB, 2008.
[13] B. Chattopadhyay et al. Tenzing: a SQL implementation on the MapReduce framework. PVLDB, 4(12):1318–1327, 2011.
[14] S. Chen. Cheetah: a high performance, custom data warehouse on top of MapReduce. VLDB, 2010.
[15] C. Chu et al. Map-reduce for machine learning on multicore. Advances in Neural Information Processing Systems, 19:281, 2007.
[16] J. Cohen, B. Dolan, M. Dunlap, J. Hellerstein, and C. Welton. MAD skills: new analysis practices for big data. VLDB, 2009.
[17] J. Dean and S. Ghemawat. MapReduce: Simplified data processing on large clusters. In OSDI, 2004.
[18] X. Feng et al. Towards a unified architecture for in-RDBMS analytics. In SIGMOD, 2012.
[19] B. Guffler et al. Handling data skew in MapReduce. In CLOSER’11.
[20] A. Hall et al. Processing a trillion cells per mouse click. VLDB.
[21] B. Hindman et al. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI’11.
[22] M. Isard et al. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS, 2007.
[23] M. Isard et al. Quincy: Fair scheduling for distributed computing clusters. In SOSP’09, 2009.
[24] M. Isard and Y. Yu. Distributed data-parallel computing using a high-level programming language. In SIGMOD, 2009.
[25] N. Kabra and D. J. DeWitt. Efficient mid-query re-optimization of sub-optimal query execution plans. In SIGMOD, 1998.
[26] Y. Kwon et al. SkewTune: mitigating skew in MapReduce applications. In SIGMOD’12, 2012.
[27] Y. Low et al. Distributed GraphLab: a framework for machine learning and data mining in the cloud. VLDB, 2012.
[28] G. Malewicz et al. Pregel: a system for large-scale graph processing. In SIGMOD, 2010.
[29] S. Melnik et al. Dremel: interactive analysis of web-scale datasets. Proc. VLDB Endow., 3:330–339, Sept 2010.
[30] K. Ousterhout et al. The case for tiny tasks in compute clusters. In HotOS’13.
[31] A. Pavlo et al. A comparison of approaches to large-scale data analysis. In SIGMOD, 2009.
[32] M. Stonebraker et al. C-Store: a column-oriented DBMS. In VLDB’05.
[33] M. Stonebraker et al. MapReduce and parallel DBMSs: friends or foes? Commun. ACM.
[34] A. Thusoo et al. Hive: a petabyte scale data warehouse using Hadoop. In ICDE, 2010.
[35] Transaction Processing Performance Council. TPC BENCHMARK H.
[36] T. Urhan, M. J. Franklin, and L. Amsaleg. Cost-based query scrambling for initial delays. In SIGMOD, 1998.
[37] C. Yang et al. Osprey: Implementing MapReduce-style fault tolerance in a shared-nothing distributed database. In ICDE, 2010.
[38] M. Zaharia et al. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In EuroSys’10, 2010.
[39] M. Zaharia et al. Resilient distributed datasets: a fault-tolerant abstraction for in-memory cluster computing. NSDI, 2012.
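The per-task overheads quoted in the Task Scheduling Cost discussion of Section 7 (about 5 ms per task in Spark, versus 5–10 s of startup delay in Hadoop) can be turned into a rough cost model. The Python sketch below is illustrative only. It assumes that overhead is a fixed cost paid once per task, that work splits evenly across tasks, and that tasks run in full waves across the cores. The 100-core, one-minute job is a hypothetical workload, not the Figure 12 experiment.

```python
import math

SPARK_OVERHEAD_MS = 5        # "about 5 ms of overhead per task" (Section 7)
HADOOP_OVERHEAD_MS = 5_000   # low end of Hadoop's 5-10 s task startup delay


def overhead_fraction(task_ms: float, overhead_ms: float) -> float:
    """Share of a slot's time spent launching rather than running a task,
    assuming a fixed overhead is paid once per task."""
    return overhead_ms / (task_ms + overhead_ms)


def job_time_ms(total_work_ms: float, n_tasks: int, cores: int, overhead_ms: float) -> float:
    """Idealized job time: work is split evenly into n_tasks, which run in
    ceil(n_tasks / cores) waves; each task pays overhead_ms before running.
    Skew, stragglers, and data transfer are ignored."""
    waves = math.ceil(n_tasks / cores)
    return waves * (total_work_ms / n_tasks + overhead_ms)


if __name__ == "__main__":
    for task_ms in (50, 100):
        frac = overhead_fraction(task_ms, SPARK_OVERHEAD_MS)
        print(f"Spark, {task_ms} ms tasks: {frac:.1%} overhead")
    print(f"Hadoop, 100 ms tasks: {overhead_fraction(100, HADOOP_OVERHEAD_MS):.1%} overhead")

    # Hypothetical job: one minute of work on each of 100 cores.
    work_ms, cores = 100 * 60_000, 100
    for n in (100, 1_000, 5_000):
        spark_s = job_time_ms(work_ms, n, cores, SPARK_OVERHEAD_MS) / 1000
        hadoop_s = job_time_ms(work_ms, n, cores, HADOOP_OVERHEAD_MS) / 1000
        print(f"{n:>5} tasks: Spark {spark_s:.3f} s, Hadoop {hadoop_s:.1f} s")
```

In this toy model, a 5 ms overhead costs under 10% even for 50 ms tasks. By contrast, with a 5 s launch delay the Hadoop estimate grows from 65 s to 310 s as the task count rises from 100 to 5,000, while the Spark estimate stays near 60 s. The model deliberately ignores skew, which is the force that pushes toward launching more tasks.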
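A small simulation can check the skew argument in Section 7: 100 versus 1000 reduce tasks on 100 cores, with one task 10× slower than the average. The sketch uses greedy list scheduling, in which each task goes to whichever core frees up first, as a stand-in for a pull-based task scheduler. The durations are made up. The slow task's position in the launch order is varied because a scheduler cannot know in advance which task will be slow.

```python
import heapq


def makespan(durations, cores):
    """Job completion time under greedy list scheduling: tasks are taken in
    order, and each goes to the core that becomes free earliest."""
    free_at = [(0, core) for core in range(cores)]  # (time core is free, core id)
    heapq.heapify(free_at)
    for d in durations:
        t, core = heapq.heappop(free_at)
        heapq.heappush(free_at, (t + d, core))
    return max(t for t, _ in free_at)


def straggler_slowdown(n_tasks, cores=100, work_per_core_ms=1_000, factor=10):
    """Slowdown versus a perfectly balanced job when one task takes `factor`
    times the average task length. Returns (slow task first, slow task last)."""
    avg = cores * work_per_core_ms // n_tasks
    slow = factor * avg
    ideal = work_per_core_ms
    first = makespan([slow] + [avg] * (n_tasks - 1), cores)
    last = makespan([avg] * (n_tasks - 1) + [slow], cores)
    return first / ideal, last / ideal


if __name__ == "__main__":
    for n in (100, 1_000):
        first, last = straggler_slowdown(n)
        print(f"{n:>5} tasks: {first:.1f}x (slow task launched first), "
              f"{last:.1f}x (launched last)")
```

With 100 tasks, the single slow task sets the job time at 10× the balanced case no matter when it starts. With 1000 tasks, the same relative slowdown costs 1.1× if the slow task starts early and 1.9× if it starts last, because the other cores absorb the remaining work. Running many more tasks than cores only pays off when per-task overhead is small, which is where the 5 ms figure matters.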
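The Elasticity paragraph in Section 7.2 describes two behaviours: pending work spreads onto whatever nodes are present, and a removed node is treated as failed. The sketch below illustrates both under an assumed hypothetical pull-based pool. The ElasticTaskPool class and its methods are illustrative names, not Shark or Spark APIs. The sketch only re-queues the task that was running on a removed node; a real engine would also have to recompute or replicate any data that node held.

```python
from collections import deque


class ElasticTaskPool:
    """Hypothetical pull-based scheduler: an idle node takes the next pending
    task; removing a node is handled like a failure, so its running task is
    put back at the front of the queue for the remaining nodes."""

    def __init__(self, tasks):
        self.pending = deque(tasks)
        self.running = {}  # node -> task it is running, or None if idle
        self.done = []

    def add_node(self, node):
        self.running[node] = None
        self._dispatch(node)  # a new node picks up pending work immediately

    def remove_node(self, node):
        task = self.running.pop(node)
        if task is not None:
            self.pending.appendleft(task)  # treat as failed: re-run elsewhere
            for other in self.running:
                self._dispatch(other)

    def finish(self, node):
        task = self.running[node]
        if task is None:
            raise ValueError(f"node {node!r} is idle")
        self.done.append(task)
        self.running[node] = None
        self._dispatch(node)

    def _dispatch(self, node):
        if self.running[node] is None and self.pending:
            self.running[node] = self.pending.popleft()


if __name__ == "__main__":
    pool = ElasticTaskPool(["t0", "t1", "t2", "t3"])
    pool.add_node("a")      # a runs t0
    pool.add_node("b")      # b runs t1
    pool.remove_node("a")   # t0 goes back to the queue
    pool.add_node("c")      # scale up: c picks up t0
    for node in ("b", "c", "b", "c"):
        pool.finish(node)
    print(pool.done)
```

Every task completes exactly once even though the node running t0 left mid-query. No plan was re-selected; the new node simply pulled the re-queued task.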
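The Multitenancy paragraph in Section 7.2 notes that, with fine-grained tasks, a newly arrived query can get nodes simply by waiting for the running query's current tasks to finish. The sketch below assumes the simplest fair-sharing rule: each freed slot goes to whichever job with pending work has the fewest running tasks. It is a simplification for illustration. The fair schedulers cited in [38, 23] add further mechanisms, for example delay scheduling for data locality in [38]. The job names and slot counts are hypothetical.

```python
def pick_job(running, pending):
    """Simple fair sharing: give a freed slot to the job with pending tasks
    that currently has the fewest running tasks (None if nothing is pending)."""
    candidates = [job for job, n in pending.items() if n > 0]
    if not candidates:
        return None
    return min(candidates, key=lambda job: running[job])


def release_slot(running, pending, finished_job):
    """A task of finished_job completes; hand its slot to the next job."""
    running[finished_job] -= 1
    job = pick_job(running, pending)
    if job is not None:
        running[job] += 1
        pending[job] -= 1
    return job


if __name__ == "__main__":
    # A large query holds all 10 slots when a short ad-hoc query arrives.
    running = {"historical": 10, "adhoc": 0}
    pending = {"historical": 100, "adhoc": 20}
    for _ in range(6):
        release_slot(running, pending, "historical")  # no preemption needed
    print(running)
```

After five of the historical query's tasks finish, the two queries hold five slots each, and the sixth freed slot goes back to the historical query, so the split stays even. Nothing is cancelled. The ad-hoc query only waits for task completions, which is cheap when tasks last well under a second.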
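The comparison with DryadLINQ in Section 8 turns on what PDE does with the map-output statistics it gathers. Beyond sizing the reduce stage, PDE can change the plan, for example the join algorithm, and (per the skew discussion in Section 7) it can perform skew-aware partition selection. The sketch below is a hypothetical illustration of both decisions, not Shark's implementation. The 10 MiB broadcast threshold, the per-bucket size lists, and the largest-first greedy packing of buckets onto reducers are all assumptions chosen for clarity.

```python
import heapq

BROADCAST_LIMIT = 10 * 2**20  # hypothetical threshold: 10 MiB


def choose_join(left_buckets, right_buckets, limit=BROADCAST_LIMIT):
    """Pick a join strategy after the map stage from observed output sizes
    (bytes per bucket): broadcast the smaller side if it is small enough,
    otherwise shuffle both sides."""
    left, right = sum(left_buckets), sum(right_buckets)
    if min(left, right) <= limit:
        return "broadcast left" if left <= right else "broadcast right"
    return "shuffle"


def coalesce_buckets(bucket_sizes, n_reducers):
    """Skew-aware grouping of fine-grained buckets into reducers: place the
    largest remaining bucket on the least-loaded reducer (greedy LPT).
    Returns (bucket index -> reducer, largest reducer load)."""
    loads = [(0, r) for r in range(n_reducers)]
    heapq.heapify(loads)
    assignment = {}
    for b in sorted(range(len(bucket_sizes)), key=lambda i: -bucket_sizes[i]):
        load, r = heapq.heappop(loads)
        assignment[b] = r
        heapq.heappush(loads, (load + bucket_sizes[b], r))
    return assignment, max(load for load, _ in loads)


if __name__ == "__main__":
    mib = 2**20
    print(choose_join([1 * mib] * 4, [50 * mib] * 4))   # small left side
    print(choose_join([50 * mib] * 4, [50 * mib] * 4))  # both sides large

    buckets = [50, 10, 10, 10, 10, 10]  # MiB per bucket; bucket 0 is skewed
    naive = max(sum(buckets[i:i + 2]) for i in range(0, len(buckets), 2))
    _, packed = coalesce_buckets(buckets, 3)
    print(f"contiguous ranges: max {naive} MiB; greedy packing: max {packed} MiB")
```

With one oversized bucket, splitting buckets into contiguous ranges puts it together with a neighbour (60 MiB on one reducer). Greedy packing leaves it alone (50 MiB, the size of the largest bucket, which no assignment can beat). Section 7 notes that, in most workloads, simply launching many more reduce tasks worked about as well as this kind of skew-aware selection.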