Massively Parallel Databases and MapReduce Systems

Timely and cost-effective analytics over “big data” has emerged as a key ingredient for success in many businesses, scientific and engineering disciplines, and government endeavors. Web clicks, social media, scientific experiments, and datacenter monitoring are among data sources that generate vast amounts of raw data every day. The need to convert this raw data into useful information has spawned considerable innovation in systems for large-scale data analytics, especially over the last decade. This monograph covers the design principles and core features of systems for analyzing very large datasets using massively-parallel computation and storage techniques on large clusters of nodes. We first discuss how the requirements of data analytics have evolved since the early work on parallel database systems. We then describe some of the major technological innovations that have each spawned a distinct category of systems for data analytics. Each unique system category is described along a number of dimensions including data model and query interface, storage layer, execution engine, query optimization, scheduling, resource management, and fault tolerance. We conclude with a summary of present trends in large-scale data analytics.

Foundations and Trends® in Databases
Vol. 5, No. 1 (2012) 1–104
© 2013 S. Babu and H. Herodotou
DOI: 10.1561/1900000036

Shivnath Babu, Duke University
Herodotos Herodotou, Microsoft Research

Contents

1 Introduction
  1.1 Requirements of Large-scale Data Analytics
  1.2 Categorization of Systems
  1.3 Categorization of System Features
  1.4 Related Work
2 Classic Parallel Database Systems
  2.1 Data Model and Interfaces
  2.2 Storage Layer
  2.3 Execution Engine
  2.4 Query Optimization
  2.5 Scheduling
  2.6 Resource Management
  2.7 Fault Tolerance
  2.8 System Administration
3 Columnar Database Systems
  3.1 Data Model and Interfaces
  3.2 Storage Layer
  3.3 Execution Engine
  3.4 Query Optimization
  3.5 Scheduling
  3.6 Resource Management
  3.7 Fault Tolerance
  3.8 System Administration
4 MapReduce Systems
  4.1 Data Model and Interfaces
  4.2 Storage Layer
  4.3 Execution Engine
  4.4 Query Optimization
  4.5 Scheduling
  4.6 Resource Management
  4.7 Fault Tolerance
  4.8 System Administration
5 Dataflow Systems
  5.1 Data Model and Interfaces
  5.2 Storage Layer
  5.3 Execution Engine
  5.4 Query Optimization
  5.5 Scheduling
  5.6 Resource Management
  5.7 Fault Tolerance
  5.8 System Administration
6 Conclusions
  6.1 Mixed Systems
  6.2 Memory-based Systems
  6.3 Stream Processing Systems
  6.4 Graph Processing Systems
  6.5 Array Databases
References

S. Babu and H. Herodotou. Massively Parallel Databases and MapReduce Systems. Foundations and Trends® in Databases, vol. 5, no. 1, pp. 1–104, 2012. DOI: 10.1561/1900000036.

1 Introduction

Organizations have always experienced the need to run data analytics tasks that convert large amounts of raw data into the information required for timely decision making. Parallel databases like Gamma [75] and Teradata [188] were some of the early systems to address this need. Over the last decade, more and more sources of large datasets have sprung up, giving rise to what is popularly called big data. Web clicks, social media, scientific experiments, and datacenter monitoring are among such sources that generate vast amounts of data every day.

Rapid innovation and improvements in productivity necessitate timely and cost-effective analysis of big data. This need has led to considerable innovation in systems for large-scale data analytics over the last decade. Parallel databases have added techniques like columnar data storage and processing [39, 133]. Simultaneously, new distributed compute and storage systems like MapReduce [73] and Bigtable [58] have been developed. This monograph is an attempt to cover the design principles and core features of systems for analyzing very large datasets. We focus on systems for large-scale data analytics, namely, the field that is called Online Analytical Processing (OLAP) as opposed to Online Transaction Processing (OLTP).

We begin in this chapter with an overview of how we have organized the overall content. The overview first discusses how the requirements of data analytics have evolved since the early work on parallel database systems. We then describe some of the major technological innovations that have each spawned a distinct category of systems for data analytics. The last part of the overview describes a number of dimensions along which we will describe and compare each of the categories of systems for large-scale data analytics.

The overview is followed by four chapters that each discuss one unique category of systems in depth. The content in the following chapters is organized based on the dimensions that will be identified in this chapter. We then conclude with a summary of present trends in large-scale data analytics.

1.1 Requirements of Large-scale Data Analytics

The Classic Systems Category: Parallel databases, which constitute the classic system category that we discuss, were the first systems to make parallel data processing available to a wide class of users through an intuitive high-level programming model. Parallel databases were based predominantly on the relational data model. The declarative SQL was used as the query language for expressing data processing tasks over data stored as tables of records.

Parallel databases achieved high performance and scalability by partitioning tables across the nodes in a shared-nothing cluster. Such a horizontal partitioning scheme enabled relational operations like filters, joins, and aggregations to be run in parallel over different partitions of each table stored on different nodes.

Three trends started becoming prominent in the early 2000s that raised questions about the superiority of classic parallel databases:

• More and more companies started to store as much data as they could collect. The classic parallel databases of the day posed major hurdles in terms of scalability and total cost of ownership as the need to process these ever-increasing data volumes arose.

• The data being collected and stored by companies was diverse in structure. For example, it became a common practice to collect highly structured data such as sales data and user demographics along with less structured data such as search query logs and web page content. It was hard to fit such diverse data into the rigid data models supported by classic parallel databases.

• Business needs started to demand shorter and shorter intervals between the time when data was collected (typically in an OLTP system) and the time when the results of analyzing the data were available for manual or algorithmic decision making.

These trends spurred two types of innovations: (a) innovations aimed at addressing the deficiencies of classic parallel databases while preserving their strengths such as high performance and declarative query languages, and (b) innovations aimed at creating alternate system architectures that can support the above trends in a cost-effective manner. These innovations, together with the category of classic parallel database systems, give the four unique system categories for large-scale data analytics that we will cover. Table 1.1 lists the system categories and some of the systems that fall under each category.

1.2 Categorization of Systems

The Columnar Systems Category: Columnar systems pioneered the concept of storing tables by collocating entire columns together instead of collocating rows as done in classic parallel databases. Systems with columnar storage and processing, such as Vertica [133], have been shown to use CPU, memory, and I/O resources more efficiently in large-scale data analytics compared to row-oriented systems. Some of the main benefits come from reduced I/O in columnar systems by reading only the needed columns during query processing. Columnar systems are covered in Chapter 3.
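
The I/O argument for columnar storage can be illustrated with a toy sketch. The table, its column names, and the byte encoding below are all hypothetical and chosen only for illustration; real columnar systems add compression and many other techniques that are not modeled here. The sketch compares how many bytes a scan touches when a query needs a single column.

```python
# Toy comparison of bytes scanned by a row layout versus a column layout.
# Hypothetical table sales(order_id, customer, region, amount); the encoding is illustrative only.
import struct

ROWS = [
    (1, "alice", "east", 120.0),
    (2, "bob", "west", 75.5),
    (3, "carol", "east", 210.25),
    (4, "dave", "north", 33.0),
]
COLUMNS = ("order_id", "customer", "region", "amount")


def encode(value):
    """Serialize one value: 8-byte integers and floats, length-prefixed UTF-8 strings."""
    if isinstance(value, int):
        return struct.pack("<q", value)
    if isinstance(value, float):
        return struct.pack("<d", value)
    data = value.encode("utf-8")
    return struct.pack("<I", len(data)) + data


def row_store(rows):
    """Row layout: all fields of a row are stored contiguously."""
    return [b"".join(encode(v) for v in row) for row in rows]


def column_store(rows):
    """Column layout: all values of a column are stored contiguously."""
    return {name: [encode(row[i]) for row in rows] for i, name in enumerate(COLUMNS)}


def bytes_scanned_row(rows_encoded):
    """A row layout reads every full row, whichever columns the query needs."""
    return sum(len(r) for r in rows_encoded)


def bytes_scanned_columnar(cols_encoded, needed):
    """A column layout reads only the columns named by the query."""
    return sum(len(v) for name in needed for v in cols_encoded[name])


def total_amount(cols_encoded):
    """SELECT SUM(amount) evaluated directly on the column layout."""
    return sum(struct.unpack("<d", v)[0] for v in cols_encoded["amount"])


if __name__ == "__main__":
    print("row layout bytes:", bytes_scanned_row(row_store(ROWS)))
    print("column layout bytes:", bytes_scanned_columnar(column_store(ROWS), ["amount"]))
    print("sum(amount):", total_amount(column_store(ROWS)))
```

In this sketch the aggregate over amount touches only the amount column, while the row layout has to read every field of every row to answer the same query.
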
The MapReduce Systems Category: MapReduce is a programming model and an associated implementation of a run-time system that was developed by Google to process massive datasets by harnessing a very large cluster of commodity nodes [73]. Systems in the classic category have traditionally struggled to scale to such levels. MapReduce systems pioneered the concept of building multiple standalone scalable distributed systems, and then composing two or more of these systems together in order to run analytic tasks on large datasets. Popular systems in this category, such as Hadoop [14], store data in a standalone block-oriented distributed file-system, and run computational tasks in another distributed system that supports the MapReduce programming model. MapReduce systems are covered in Chapter 4.

Category    Example Systems in this Category
Classic     Aster nCluster [25, 92], DB2 Parallel Edition [33], Gamma [75], Greenplum [99], Netezza [116], SQL Server Parallel Data Warehouse [177], Teradata [188]
Columnar    Amazon RedShift [12], C-Store [181], Infobright [118], MonetDB [39], ParAccel [164], Sybase IQ [147], VectorWise [206], Vertica [133]
MapReduce   Cascading [52], Clydesdale [123], Google MapReduce [73], Hadoop [192, 14], HadoopDB [5], Hadoop++ [80], Hive [189], JAQL [37], Pig [94]
Dataflow    Dremel [153], Dryad [197], Hyracks [42], Nephele [34], Pregel [148], SCOPE [204], Shark [195], Spark [199]

Table 1.1: The system categories that we consider, and some of the systems that fall under each category.

The Dataflow Systems Category: Some deficiencies in MapReduce systems were identified as these systems were used for a large number of data analysis tasks. The MapReduce programming model is too restrictive to express certain data analysis tasks easily, e.g., joining two datasets together. More importantly, the execution techniques used by MapReduce systems are suboptimal for many common types of data analysis tasks such as relational operations, iterative machine learning, and graph processing. Most of these problems can be addressed by replacing MapReduce with a more flexible dataflow-based execution model that can express a wide range of data access and communication patterns. Various dataflow-based execution models have been used by the systems in this category, including directed acyclic graphs in Dryad [197], serving trees in Dremel [153], and bulk synchronous parallel processing in Pregel [148]. Dataflow systems are covered in Chapter 5.

Other System Categories: It became clear over time that new systems can be built by combining design principles from different system categories. For example, techniques used for high-performance processing in classic parallel databases can be used together with techniques used for fine-grained fault tolerance in MapReduce systems [5]. Each system in this coalesced category exposes a unified system interface that provides a combined set of features that are traditionally associated with different system categories. We will discuss coalesced systems along with the other system categories in the respective chapters.

The need to reduce the gap between the generation of data and the generation of analytics results over this data has required system developers to constantly raise the bar in large-scale data analytics. On one hand, this need saw the emergence of scalable distributed storage systems that provide various degrees of transactional capabilities. Support for transactions enables these systems to serve as the data store for online services while making the data available concurrently in the same system for analytics. The same need has led to the emergence of parallel database systems that support both OLTP and OLAP in a single system. We put both types of systems into the category called mixed systems because of their ability to run mixed workloads (workloads that contain transactional as well as analytics tasks) efficiently. We will discuss mixed systems in Chapter 6 as part of recent trends in massively parallel data analytics.
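
The earlier remark in this section that joins are awkward to express in the MapReduce programming model can be made concrete with a small sketch. The code below is a single-process stand-in and does not use the API of Hadoop or any other real system; run_mapreduce, join_map, join_reduce, and the sample records are hypothetical names and data introduced only for illustration. It shows the common reduce-side pattern, in which each record must carry a tag naming its source dataset so that the reduce function can separate the two inputs again.

```python
# Single-process sketch of a reduce-side join under the map/shuffle/reduce model.
# All names and data are illustrative assumptions, not a real MapReduce API.
from collections import defaultdict
from itertools import product


def run_mapreduce(records, map_fn, reduce_fn):
    """Apply map_fn to every record, group the emitted values by key, then reduce each group."""
    groups = defaultdict(list)
    for record in records:
        for key, value in map_fn(record):
            groups[key].append(value)
    output = []
    for key in sorted(groups):
        output.extend(reduce_fn(key, groups[key]))
    return output


def join_map(record):
    """Emit the join key with a (source tag, payload) pair so the reducer can tell inputs apart."""
    source, key, payload = record
    yield key, (source, payload)


def join_reduce(key, tagged_values):
    """Split the values by source tag and emit every matching pair for this key."""
    left = [payload for source, payload in tagged_values if source == "R"]
    right = [payload for source, payload in tagged_values if source == "S"]
    for l_payload, r_payload in product(left, right):
        yield (key, l_payload, r_payload)


R = [("R", 1, "r1"), ("R", 2, "r2"), ("R", 3, "r3")]
S = [("S", 1, "s1"), ("S", 1, "s1b"), ("S", 3, "s3"), ("S", 4, "s4")]
joined = run_mapreduce(R + S, join_map, join_reduce)

if __name__ == "__main__":
    for row in joined:
        print(row)
```

The tagging and regrouping is bookkeeping that the programmer writes by hand here, whereas a declarative query language or a dataflow operator would express the same join directly.
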
1.3 Categorization of System Features

We have selected eight key system features along which we will describe and compare each of the four categories of systems for large-scale data analytics.

Data Model and Interfaces: A data model provides the definition and logical structure of the data, and determines in which manner data can be stored, organized, and manipulated by the system. The most popular example of a data model is the relational model (which uses a table-based format), whereas most systems in the MapReduce and Dataflow categories permit data to be in any arbitrary format stored in flat files. The data model used by each system is closely related to the query interface exposed by the system, which allows users to manage and manipulate the stored data.

Storage Layer: At a high level, a storage layer is simply responsible for persisting the data as well as providing methods for accessing and modifying the data. However, the design, implementation and features provided by the storage layer used by each of the different system categories vary greatly, especially as we start comparing systems across the different categories. For example, classic parallel databases use integrated and specialized data stores that are tightly coupled with their execution engines, whereas MapReduce systems typically use an independent distributed file-system for accessing data.

Execution Engine: When a system receives a query for execution, it will typically convert it into an execution plan for accessing and processing the query’s input data. The execution engine is the entity responsible for actually running a given execution plan in the system and generating the query result. In the systems that we consider, the execution engine is also responsible for parallelizing the computation across large-scale clusters of machines, handling machine failures, and setting up inter-machine communication to make efficient use of the network and disk bandwidth.

Query Optimization: In general, query optimization is the process a system uses to determine the most efficient way to execute a given query by considering several alternative, yet equivalent, execution plans.
The techniques used for query optimization in the systems we consider are very different in terms of: (i) the space of possible execution plans (e.g., relational operators in databases versus configuration parameter settings in MapReduce systems), (ii) the type of query optimization (e.g., cost-based versus rule-based), (iii) the type of cost modeling technique (e.g., analytical models versus models learned using machine-learning techniques), and (iv) the maturity of the optimization techniques (e.g., fully automated versus manual tuning).

Scheduling: Given the distributed nature of most data analytics systems, scheduling the query execution plan is a crucial part of the system. Systems must now make several scheduling decisions, including scheduling where to run each computation, scheduling inter-node data transfers, as well as scheduling rolling updates and maintenance tasks.

Resource Management: Resource management primarily refers to the efficient and effective use of a cluster’s resources based on the resource requirements of the queries or applications running in the system. In addition, many systems today offer elastic properties that allow users to dynamically add or remove resources as needed according to workload requirements.

Fault Tolerance: Machine failures are relatively common in large clusters. Hence, most systems have built-in fault tolerance functionalities that allow them to continue providing services, possibly with graceful degradation, in the face of undesired events like hardware failures, software bugs, and data corruption. Examples of typical fault tolerance features include restarting tasks that fail due to either application or hardware failures, recovering data after machine failure or corruption, and using speculative execution to avoid stragglers.

System Administration: System administration refers to the activities where additional human effort may be needed to keep the system running smoothly while the system serves the needs of multiple users and applications. Common activities under system administration include performance monitoring and tuning, diagnosing the cause of poor performance or failures, capacity planning, and system recovery from permanent failures (e.g., failed disks) or disasters.

1.4 Related Work

This monograph is related to a few surveys done in the past.
Lee and others have done a recent survey that focuses on parallel data processing with MapReduce [136]. In contrast, we provide a more comprehensive and in-depth coverage of systems for large-scale data analytics, and also define a categorization of these systems. Empirical comparisons have been done in the literature among different systems that we consider. For example, Pavlo and others have compared the performance of both classic parallel databases and columnar databases with the performance of MapReduce systems [166].

Tutorials and surveys have appeared in the past on specific dimensions along which we describe and compare each of the four categories of systems for large-scale data analytics. Recent tutorials include one on data layouts and storage in MapReduce systems [79] and one on programming techniques for MapReduce systems [174]. Kossmann’s survey on distributed query processing [128] and Lu’s survey on query processing in classic parallel databases [142] are also related.

2 Classic Parallel Database Systems

The 1980s and early 1990s were a period of rapid strides in the technology for massively parallel processing. The initial drivers of this technology were scientific and engineering applications like weather forecasting, molecular modeling, oil and gas exploration, and climate research. Around the same time, several businesses started to see value in analyzing the growing volumes of transactional data. Such analysis led to a class of applications, called decision support applications, which posed complex queries on very large volumes of data. Single-system databases could not handle the workload posed by decision support applications. This trend, in turn, fueled the need for parallel database systems.

Three architectural choices were explored for building parallel database systems: shared memory, shared disk, and shared nothing. In the shared-memory architecture, all processors share access to a common central memory and to all disks [76]. This architecture has limited scalability because access to the memory quickly becomes a bottleneck. In the shared-disk architecture, each processor has its private memory, but all processors share access to all disks [76]. This architecture can become expensive at scale because of the complexity of connecting all processors to all disks.

The shared-nothing architecture has proved to be the most viable at scale over the years. In the shared-nothing architecture, each processor has its own private memory as well as disks. Figure 2.1 illustrates the shared-nothing architecture used in parallel database systems. Note that the only resource shared among the processors is the communication network.

Figure 2.1: Shared-nothing architecture for parallel processing.

A number of research prototypes and industry-strength parallel database systems have been built using the shared-nothing architecture over the last three decades. Examples include Aster nCluster [25], Bubba [41], Gamma [75], Greenplum [99], IBM DB2 Parallel Edition [33], Netezza [116], Oracle nCUBE [48], SQL Server Parallel Data Warehouse [177], Tandem [85], and Teradata [188].

2.1 Data Model and Interfaces

Most parallel database systems support the relational data model. A relational database consists of relations (or, tables) that, in turn, consist of tuples. Every tuple in a table conforms to a schema which is defined by a fixed set of attributes [76].

This feature has both advantages and disadvantages. The simplicity of the relational model has historically played an important role in the success of parallel database systems. A well-defined schema helps with cost-based query optimization and helps keep data error-free in the face of data-entry errors by humans or bugs in applications. At the same time, the relational data model has been criticized for its rigidity. For example, initial application development time can be longer due to the need to define the schema upfront. Features such as support for JSON and XML data as well as schema evolution reduce this disadvantage [71].

Structured Query Language (SQL) is the declarative language most widely used for accessing, managing, and analyzing data in parallel database systems. Users can specify an analysis task using an SQL query, and the system will optimize and execute the query. As part of SQL, parallel database systems also support (a) user-defined functions, user-defined aggregates, and stored procedures for specifying analysis tasks that are not easily expressed using standard relational-algebra operators, and (b) interfaces (e.g., ODBC, JDBC) for accessing data from higher-level programming languages such as C++ and Java or graphical user interfaces.

2.2 Storage Layer

The relational data model and SQL query language have the crucial benefit of data independence: SQL queries can be executed correctly irrespective of how the data in the tables is physically stored in the system. There are two noteworthy aspects of physical data storage in parallel databases: (a) partitioning, and (b) assignment. Partitioning a table S refers to the technique of distributing the tuples of S across disjoint fragments (or, partitions). Assignment refers to the technique of distributing these partitions across the nodes in the parallel database system.

2.2.1 Table Partitioning

Table partitioning is a standard feature in database systems today [115, 155, 185, 186]. For example, a sales records table may be partitioned horizontally based on value ranges of a date column. One partition may contain all sales records for the month of January, another partition may contain all sales records for February, and so on.
A table can also be partitioned vertically with each partition containing a subset of columns in the table. Vertical partitioning is more common in columnar database systems compared to the classic parallel database systems. We will cover vertical partitioning in Chapter 3. Hierarchical combinations of horizontal and vertical partitioning may also be used.

Table 2.1 lists various uses of table partitioning. Apart from giving major performance improvements, partitioning simplifies a number of common administrative tasks in database systems [9, 201].

Uses of Table Partitioning in Database Systems
  Efficient pruning of unneeded data during query processing
  Parallel data access (partitioned parallelism) during query processing
  Reducing data contention during query processing and administrative tasks. Faster data loading, archival, and backup
  Efficient statistics maintenance in response to insert, delete, and update rates. Better cardinality estimation for subplans that access few partitions
  Prioritized data storage on faster/slower disks based on access patterns
  Fine-grained control over physical design for database tuning
  Efficient and online table and index defragmentation at the partition level

Table 2.1: Uses of table partitioning in database systems

The growing usage of table partitioning has been accompanied by efforts to give applications and users the ability to specify partitioning conditions for tables that they derive from base data. SQL extensions from database vendors now enable queries to specify how derived tables are partitioned (e.g., [92]).

The many uses of table partitioning have created a diverse mix of partitioning techniques used in parallel database systems. We will illustrate these techniques with an example involving four tables: R(a, rdata), S(a, b, sdata), T(a, tdata), U(b, udata). Here, a is an integer attribute and b is a date attribute. These two attributes will be used as join keys in our examples. rdata, sdata, tdata, and udata are respectively the data specific to each of the tables R, S, T, and U.

Figure 2.2: A hierarchical partitioning of table S

Figure 2.2 shows an example partitioning for table S. S is range-partitioned on ranges of attribute a into four partitions S1-S4. Partition S1 consists of all tuples in S with 0 ≤ a < 20, S2 consists of all tuples in S with 20 ≤ a < 40, and so on. Each of S1-S4 is further range-partitioned on ranges of attribute b. Thus, for example, partition S11 consists of all tuples in S with 0 ≤ a < 20 and 01-01-2010 ≤ b < 02-01-2010.

Range partitioning is one among multiple partitioning techniques that can be used [53, 76, 108]:

• Hash partitioning, where tuples are assigned to partitions based on the result of a hash function applied to one or more attributes.

• List partitioning, where the unique values of one or more attributes in each partition are specified. For example, a list partitioning for the example table S may specify that all tuples with a ∈ {1, 2, 3} should be in partition S1.

• Random (round-robin) partitioning, where tuples are assigned to partitions in a random (round-robin) fashion.

• Block partitioning, where each consecutive block of tuples (or bytes) written to a table forms a partition. For example, partition S1 may consist of the first 1000 tuples inserted into S, S2 may consist of the next 1000 tuples inserted into S, and so on.

Figure 2.3: Partitioning of tables R, S, T, U. Dotted lines show partitions with potentially joining records

Figure 2.3 shows how the partitioning of table S can be interpreted as a two-dimensional partitioning. The figure also shows partitions for tables R, T, and U. The dotted lines in the figure show the join relationships between pairs of partitions. These relationships become important when we talk about assignment in §2.2.2.

Figure 2.3 also shows how the sizes of partitions in a table may not be uniform. Such skewed partition sizes can arise for a number of reasons. Hash or equi-range partitioning produces skewed partition sizes if the attribute(s) used in the partitioning function has a skewed distribution. Skewed partition sizes may also come from data loading and archival needs. For example, Table U in Figure 2.3 is partitioned using unequal ranges on b. 10-day ranges are used for recent partitions of U. Older data is accessed less frequently, so older 10-day partitions of U are merged into monthly ones to improve query performance and archival efficiency.
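
The partitioning techniques described above can each be viewed as a function from a tuple (or from its insertion position) to a partition number. The following is a minimal sketch under stated assumptions: partitions are numbered from 0 (so S1 is partition 0), the dates in the running example are read as month-day-year, the CRC32 hash is an arbitrary stand-in for whatever hash function a real system uses, and all function names and sample values are hypothetical.

```python
# Minimal sketch of the partitioning techniques as partition-number functions.
# Partition numbers start at 0; names, bounds, and sample data are illustrative assumptions.
import bisect
import zlib
from collections import Counter
from datetime import date


def range_partition(value, upper_bounds):
    """Range partitioning: upper_bounds holds the sorted, exclusive upper bound of every partition but the last."""
    return bisect.bisect_right(upper_bounds, value)


def hash_partition(value, num_partitions):
    """Hash partitioning: a deterministic hash of the attribute value selects the partition."""
    return zlib.crc32(str(value).encode("utf-8")) % num_partitions


def list_partition(value, value_lists, default=None):
    """List partitioning: each partition enumerates the attribute values it holds."""
    for partition, values in value_lists.items():
        if value in values:
            return partition
    return default


def round_robin_partition(position, num_partitions):
    """Round-robin partitioning: the i-th inserted tuple goes to partition i mod n."""
    return position % num_partitions


def block_partition(position, block_size):
    """Block partitioning: each consecutive run of block_size inserted tuples forms a partition."""
    return position // block_size


def hierarchical_partition(a, b, a_bounds, b_bounds):
    """Two-level range partitioning, first on attribute a and then on attribute b."""
    return range_partition(a, a_bounds), range_partition(b, b_bounds)


A_BOUNDS = [20, 40, 60]                          # 0 <= a < 20, 20 <= a < 40, 40 <= a < 60, a >= 60
B_BOUNDS = [date(2010, 2, 1), date(2010, 3, 1)]  # assumed month boundaries for attribute b

# A skewed set of values for attribute a: most values fall into the first range.
skewed_a = [1, 2, 3, 5, 8, 13, 21, 34, 55, 70]
range_sizes = Counter(range_partition(a, A_BOUNDS) for a in skewed_a)
round_robin_sizes = Counter(round_robin_partition(i, 4) for i in range(len(skewed_a)))

if __name__ == "__main__":
    print("range partition sizes:", dict(range_sizes))
    print("round-robin partition sizes:", dict(round_robin_sizes))
```

With the skewed sample values, range partitioning places six of the ten tuples in the first partition, while round-robin partitioning spreads the same tuples almost evenly. The price of the even spread is that round-robin placement carries no information about attribute values, so it cannot be used to prune partitions during query processing.
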

2.2.2 Partition Assignment

Partition assignment is the task of deciding which node or nodes in the parallel database system should store each partition of the tables in the database. Three factors are usually considered as part of partition assignment: degree of declustering, collocation, and replication.

Degree of Declustering

The degree of declustering for a table specifies the number of nodes that store one or more partitions of the table. With full declustering, the degree of declustering is equal to the number of nodes in the system [75]. That is, tuples in a fully declustered table are spread over all nodes in the system. Otherwise, the table is said to be partially declustered.

Partial declustering is typically accompanied by the creation of nodegroups [33]. (Nodegroups are called relation clusters in [114].) A nodegroup is a collection of nodes in the parallel database system. Each nodegroup can be referenced by name. Consider the following example (with syntax borrowed from [33]) for a parallel database system with 10 nodes numbered 1-10:

    CREATE NODEGROUP GROUP_A ON NODES(1 TO 10);
    CREATE NODEGROUP GROUP_B ON NODES(2, 4, 6, 8, 10);
    CREATE TABLE R (a integer, rdata char[100]) IN GROUP_A
        PARTITIONING KEY (a) USING HASHING;
    CREATE TABLE S (a integer, b integer, sdata char[100]) IN GROUP_B
        PARTITIONING KEY (a) USING HASHING;
    CREATE TABLE T (a integer, tdata char[100]) IN GROUP_B
        PARTITIONING KEY (a) USING HASHING;

In this example, the database administrator chose to create two nodegroups, GROUP_A and GROUP_B, for the cluster of 10 nodes in the parallel database system. GROUP_A consists of all 10 nodes while GROUP_B consists of 5 nodes. Partitions of table R are stored on GROUP_A, so table R is fully declustered. Partitions of tables S and T are stored on GROUP_B, so these tables are partially declustered.

Full declustering can benefit query processing over very large tables. For example, table R may be very large.
In this case, a query that does

20.2.2. Storage Layer 17 grouping and aggregation on R.a can be processed in parallel using all 10 nodes, without needing to move any data among the nodes. Collocation It is sometimes beneficial to have selective overlap among the nodes on which the partitions of two or more tables are stored. Consider the example in §2.2.2 where partitions of tables S and T are both stored on the nodegroup Group_B. Also, note that both tables are hash partitioned on the respective attribute a. If the same hash function and number of partitions are chosen for both tables, then there will be a one-to-one correspondence between the partitions of both tables that will join with one another. In this case, it is possible to collocate the joining partitions of both tables. That is, any pair of joining partitions with be stored on the same node of the nodegroup. The advantage of collocation is that tables can be joined without the need to move any data from one node to another. However, col- location of joining tables can be nontrivial when there are complex join relationships. Consider our four example tables and their join re- lationships shown in Figure 2.3. In this context, consider the following example three-way-join query that joins tables R, S, and U . Select * From R, S, U Where R.a = S.a and S.b = U.b Suppose each partition can be stored only on one node in the parallel database system. In this case, the only way to collocate all pairs of joining partitions in the three tables is to store all partitions on a single node of the system. Such an assignment—where all three tables have a degree of declustering equal to one—would be a terrible waste of the resources in the parallel database. Replication As we saw in the above example, the flexibility of assignment is limited if tuples in a table are stored only once in the system. This problem can

21.18 Classic Parallel Database Systems be addressed by replicating tuples on multiple nodes. Replication can be done at the level of table partitions, which will address the problem in the above example. For example, if table U is small, then partitions of U can be replicated on all nodes where partitions of S are stored. Replication can be done at the table level such that different replicas are partitioned differently. For example, one replica of the table may be hash partitioned while another may be range partitioned. Apart from performance benefits, replication also helps reduce unavailability or loss of data when faults arise in the parallel database system (e.g., a node fails permanently or becomes disconnected temporarily from other nodes due to a network failure). The diverse mix of partitioning, declustering, collocation, and repli- cation techniques available can make it confusing for users of parallel database systems to identify the best data layout for their workload. This problem has motivated research on automated ways to recommend good data layouts based on the workload [151, 169]. 2.3 Execution Engine To execute a SQL query quickly and efficiently, a parallel database system has to break the query into multiple tasks that can be exe- cuted across the nodes in the system. The system’s execution engine is responsible for orchestrating this activity. In most parallel database systems, each submitted query is handled by a coordinator task. The coordinator first invokes a query optimizer in order to generate a execution plan for the query. An execution plan in a parallel database system is composed of operators that support both intra-operator and inter-operator parallelism, as well as mechanisms to transfer data from producer operators to consumer operators. The plan is broken down into schedulable tasks that are run on the nodes in the system. 
The coordinator task is responsible for checking whether the plan completed successfully, and if so, transferring the results produced by the plan to the user or application that submitted the query.

In this section, we will describe the components of a parallel execution plan. The next two sections will discuss respectively the techniques used to select a good execution plan and to schedule the tasks in the plan.

2.3.1 Parallel Query Execution

Consider a query that joins two tables R and S based on the equi-join condition R.a = S.a. In §2.2.2, we introduced a collocated join operator that can perform the join if tables R and S are both partitioned and the partitions are assigned such that any pair of joining partitions is stored on the same node. A collocated join operator is often the most efficient way to perform the join because it performs the join in parallel while avoiding the need to transfer data between nodes. However, a collocated join is only possible if the partitioning and assignment of the joining tables are planned in advance.

Suppose the tables R and S are partitioned identically on the joining key, but the respective partitions are not collocated. In this case, a directed join operator can be used to join the tables. The directed join operator transfers each partition of one table (say, R) to the node where the joining partition of the other table is stored. Once a partition from R is brought to where the joining partition in S is stored, a local join can be performed.

Directed joins are also possible when the tables are not partitioned identically. For example, directed joins are possible for R ⋈ S, T ⋈ S, and U ⋈ S in Figure 2.3 despite the complex partitioning techniques used. Compared to a collocated join, a directed join incurs the cost of transferring one of the tables across the network.

If the tables R and S are not partitioned identically on the joining attribute, then two other types of parallel join operators can be used to perform the join: the repartitioned join operator and the broadcast join (or fragment-and-replicate join) operator. The repartitioned join operator simply repartitions the tuples in both tables using the same partitioning condition (e.g., hash). Joining partitions are brought to the same node where they can be joined.
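
The repartitioned join just described can be sketched in a few lines of Python. This is a minimal single-process simulation under stated assumptions: nodes are modeled as dictionary entries, the join key is assumed to be the first field of every tuple, and the function names (repartition, local_hash_join, repartitioned_join) are hypothetical rather than the API of any real system.

```python
from collections import defaultdict


def repartition(table, num_nodes):
    """Route each tuple to a node by hashing its join key (first field)."""
    nodes = defaultdict(list)
    for row in table:
        nodes[hash(row[0]) % num_nodes].append(row)
    return nodes


def local_hash_join(r_rows, s_rows):
    """Join the tuples that ended up on one node, building a hash table on R."""
    build = defaultdict(list)
    for r_row in r_rows:
        build[r_row[0]].append(r_row)
    return [r_row + s_row for s_row in s_rows for r_row in build.get(s_row[0], [])]


def repartitioned_join(r, s, num_nodes):
    """Repartition both inputs with the same function, then join node by node."""
    r_parts = repartition(r, num_nodes)
    s_parts = repartition(s, num_nodes)
    result = []
    for node in range(num_nodes):  # each iteration stands in for one node's work
        result.extend(local_hash_join(r_parts.get(node, []), s_parts.get(node, [])))
    return result


# Illustrative data: R(a, rdata) and S(a, b, sdata).
R = [(1, "r1"), (2, "r2"), (3, "r3")]
S = [(1, 10, "s1"), (1, 11, "s1b"), (3, 12, "s3"), (4, 13, "s4")]
joined = sorted(repartitioned_join(R, S, num_nodes=3))
print(joined)
```

Because both tables are routed with the same hash function, tuples with equal join keys always meet on the same node, so the union of the local joins equals the full join result.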
The repartitioned join operator incurs the cost of transferring both tables across the network. The broadcast join operator transfers one table (say, R) in full to every node where any partition of the other table is stored. The join is then performed locally. This operator incurs a data transfer cost of size_of(R) × d, where size_of(R) is the size of R and d is the degree of declustering of S. Broadcast join operators are typically used when one table is very small.

As discussed in §2.2.1, partition sizes may be skewed due to a number of reasons. The most common case is when one or both tables contain joining keys with a skewed distribution. The load imbalance created by such skew can severely degrade the performance of join operators such as the repartitioned join. This problem can be addressed by identifying the skewed join keys and handling them in special ways. Suppose a join key with value v has a skewed distribution among tuples of a table R that needs to be joined with a table S. In the regular repartitioned join, all tuples in R and S with join key equal to v will be processed by a single node in the parallel database system. Instead, the tuples in R with join key equal to v can be further partitioned across multiple nodes. The correct join result will be produced as long as the tuples in S with join key equal to v are replicated across the same nodes. In this fashion, the resources in multiple nodes can be used to process the skewed join keys [77].

While our discussion focused on the parallel execution of joins, the same principles apply to the parallel execution of other relational operators like filtering and group-by. The approach used here to extract parallelism is to partition the input into multiple fragments, and to process these fragments in parallel. This form of parallelism is called partitioned parallelism [76].

Two other forms of parallelism are also employed commonly in execution plans in parallel database systems: pipelined parallelism and independent parallelism.
A query execution plan may contain a sequence of operators linked together by producer-consumer relationships where all operators can be run in parallel as data flows continuously across every producer-consumer pair. This form of parallelism is called pipelined parallelism.

Independent parallelism refers to the parallel execution of independent operators in a query plan. For example, consider the following query that joins the four tables R, S, T, and U:

    Select *
    From R, S, T, U
    Where R.a = S.a and S.a = T.a and S.b = U.b

This query can be processed by a plan where R is joined with T, S is joined with U, and then the results of both these joins are joined together to produce the final result. In this plan, R ⋈ T and S ⋈ U can be executed independently in parallel.

2.3.2 Abstractions for Data Transfer

In a parallel query execution plan, interprocess data transfer may be needed between producer and consumer operators or between the different phases of a single operator (e.g., between the partition and join phases of a repartitioned join operator). Straightforward techniques for interprocess data transfer include materializing all intermediate tuples to disk or directly using the operating system's interprocess communication (IPC) mechanisms for each point-to-point data transfer. Such techniques, while easy to implement, have some drawbacks. They may incur high overhead or complicate the problem of finding good execution plans and schedules.

Parallel database systems have developed rich abstractions for interprocess data transfers in parallel query execution plans. We will describe three representative abstractions: (a) split tables from Gamma [75], (b) table queues from IBM DB2 Parallel Edition [33], and (c) exchange operators from Volcano [97].

Tuples output by an operator in a query plan in Gamma are routed to the correct destination by looking up a split table [75]. Thus, a split table maps tuples to destination tasks. For example, a split table that is used in conjunction with a repartitioned join of tables R and S may route tuples from both tables based on a common hash partitioning function.

IBM DB2 Parallel Edition's table queues are similar to split tables but provide richer functionality [33]. Table queues provide a buffering mechanism for tuples output by a producer task before the tuples are processed by a consumer task.
Table queues also enable the rate at which the producer produces tuples to be controlled, based on the rate at which the consumer can process tuples. A table queue can have multiple producer tasks as well as consumer tasks. Furthermore, table queues can implement multiple communication patterns such as broadcasting versus directed (i.e., does a tuple produced by a producer task go to all consumer tasks or to a specific one?).

The exchange operator has a more ambitious goal beyond serving as the abstraction of interprocess data transfer in Volcano's query execution plans [97]. Exchange is a meta-operator that encapsulates all issues related to parallel execution (e.g., partitioning, producer-consumer synchronization) and drives parallel execution in Volcano. The benefit is that query execution plans in Volcano can leverage different forms of parallelism while still using the simpler implementations of relational operators from centralized databases.

2.4 Query Optimization

The following steps are involved in the execution of a SQL query in a parallel database system:

• Plan selection: Choosing an efficient execution plan for the query.
• Scheduling: Generating executable tasks for the operators and data transfers in the plan, and choosing the nodes where these tasks will run.
• Resource allocation: Determining how much CPU, memory, and I/O (both local I/O and network I/O) resources to allocate to the executable tasks in the plan.

Plan selection, which is commonly referred to as query optimization, is the focus of this section. Scheduling and resource allocation will be covered respectively in the next two sections. Plan selection for a query mainly involves the following steps:

• Ordering and choosing join operators.
• Access path selection for the tables.
• Choosing the appropriate cost metric (e.g., completion time versus total resource usage) based on which the best plan for the query can be determined.

The early approaches to plan selection in a parallel database system used a two-phase approach [113, 104]. First, a cost-based query optimizer, like the System R optimizer developed for centralized database systems [26, 173], was used to find the best serial plan [113]. Problems like join ordering and access path selection were addressed as part of the selection of the best serial plan. Then, a post-optimization phase was applied to transform this serial plan into a parallel execution plan. Decisions made in the post-optimization phase included which attributes to repartition tables on such that data transfer costs were minimized [104].

While the two-phase approach showed initial promise, especially in shared-memory systems, it quickly became apparent that this approach can miss good plans in shared-nothing systems. For example, consider the four-way join example query introduced in §2.3.1. The best serial plan for the query may use the left-deep serial join order ((R ⋈ S) ⋈ T) ⋈ U. Since a serial plan assumes that the plan will run on a single node where all the data resides, the plan's cost is influenced primarily by factors like sort orders and the presence of indexes. However, the best join order for parallel execution could be different and bushy: (R ⋈ T) ⋈ (S ⋈ U). Such a plan can be very efficient if R and T (and respectively, S and U) are stored in a collocated fashion. Recall that collocated joins are very efficient because they can leverage parallelism without incurring data transfer costs for the join processing.

As the above example shows, parallel database systems select query execution plans from a large plan space.
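
To give a feel for how much larger the space of join trees becomes once bushy trees are allowed, the Python sketch below counts join trees for n tables. It relies on standard combinatorial counts that are an addition here, not taken from the text: n! left-deep orders and (2n-2)!/(n-1)! ordered bushy trees, both of which include plans that contain cross products. The function names are hypothetical.

```python
from math import factorial


def left_deep_orders(n):
    """Number of left-deep join orders over n tables (permutations of the tables)."""
    return factorial(n)


def bushy_trees(n):
    """Number of ordered binary join trees with n labeled leaves.

    Equals Catalan(n - 1) * n!, which simplifies to (2n - 2)! / (n - 1)!.
    """
    return factorial(2 * n - 2) // factorial(n - 1)


for n in (4, 6, 8):
    print(n, left_deep_orders(n), bushy_trees(n))
```

For the four tables R, S, T, and U, this gives 24 left-deep orders against 120 bushy trees, before any choice of join operators or data transfer strategies is taken into account.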
The different forms of parallelism (partitioned, pipelined, and independent) motivate searching for the best plan from a larger set of operators as well as join trees compared to query execution in a centralized database system. The use of semi-joins and partition-wise joins further increases the size of the plan space for parallel execution plans [63, 108].

Semi-joins can act as size reducers (similar to a selection) such that the total size of tuples that need to be transferred is reduced. The use of semi-joins is based on the following set of equivalences for the equi-join of two tables R and S on the join key a, where ⋈_a denotes the join on a and ⋉_a denotes the semi-join on a:

    R ⋈_a S ≡ (R ⋉_a S) ⋈_a S
            ≡ R ⋈_a (S ⋉_a R)
            ≡ (R ⋉_a S) ⋈_a (S ⋉_a R)

The use of a semi-join is beneficial if the cost to produce and transfer it is less than the cost of transferring the whole table to do the join. However, now the plan space is significantly larger because of the many combinations in which joins and semi-join reducers can be used in a plan [63].

Recall from §2.2.1 that many types of partitioning techniques are used in parallel database systems. These techniques further increase the search space during plan selection over partitioned tables. Consider an example query Q1 over the partitioned tables R, S, and T shown in Figure 2.3.

    Q1: Select *
        From R, S, T
        Where R.a = S.a and S.a = T.a and S.b ≥ 02-15-10 and T.a < 25

Use of filter conditions for partition pruning: The partitions T4-T8 and S11, S21, S31, S41 can be pruned from consideration because it is clear from the partitioning conditions that tuples in these partitions will not satisfy the filter conditions. Partition pruning can speed up query performance drastically by eliminating unnecessary table and index scans as well as reducing data transfers, memory needs, disk spills, and resource-contention-related overheads.

Use of join conditions for partition pruning: Based on a transitive closure of the filter and join conditions, partition pruning can also eliminate partitions S32, S33, S42, S43, R3, R4, and U1.

The use of partition pruning will generate a plan like Q1P1 shown in Figure 2.4. In such plans, the leaf operators logically append together (i.e., UNION ALL) the unpruned partitions for each table. Each unpruned partition is accessed using regular table or index scans. The appended partitions are joined using one of the parallel join operators such as the collocated or repartitioned join operators.
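
Pruning with a filter condition can be sketched as a range-overlap test between the filter and each partition's range. In the Python sketch below, the partition boundaries are hypothetical (the real ranges are those drawn in Figure 2.3, which are not reproduced here), and prune_partitions is an illustrative helper that handles only a filter of the form a < upper_bound.

```python
def prune_partitions(partitions, upper_bound):
    """Return names of partitions that may hold tuples with value < upper_bound.

    Each partition covers the half-open range [low, high) of the partitioning
    attribute, so it can be skipped whenever low >= upper_bound.
    """
    return [name for name, (low, _high) in partitions.items() if low < upper_bound]


# Hypothetical ranges on T.a: T1 = [0, 10), T2 = [10, 20), ..., T8 = [70, 80).
t_partitions = {f"T{i + 1}": (10 * i, 10 * (i + 1)) for i in range(8)}

# Filter condition from Q1: T.a < 25.
unpruned = prune_partitions(t_partitions, upper_bound=25)
print(unpruned)
```

Under these assumed ranges, only the first three partitions can contain tuples with T.a < 25, so the remaining five never need to be scanned.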

Figure 2.4: Q1P1 and Q1P2 are plans that can be generated by optimizers in parallel database systems for our example query Q1. IS and TS are respectively index and table scan operators. HJ and MJ are respectively parallel join operators based on hash and merge. Union is a bag union operator.

Partition-aware join path selection: Depending on the data properties and data layout in the parallel database system, a plan like Q1P2 shown in Figure 2.4 can significantly outperform plan Q1P1 [108]. Q1P2 exploits certain properties arising from partitioning in the given setting:

• Tuples in partition R1 can join only with S12 ∪ S13 and T1 ∪ T2. Similarly, tuples in partition R2 can join only with S22 ∪ S23 and T3. Thus, the full R ⋈ S ⋈ T join can be broken up into smaller and more efficient partition-wise joins where the best join order for R1 ⋈ (S12 ∪ S13) ⋈ (T1 ∪ T2) can be different from that for R2 ⋈ (S22 ∪ S23) ⋈ T3. One likely reason is a change in the data properties of tables S and T over time, causing variations in statistics across partitions.
• The best choice of join operators for R1 ⋈ (S12 ∪ S13) ⋈ (T1 ∪ T2) may differ from that for R2 ⋈ (S22 ∪ S23) ⋈ T3, e.g., due to storage or physical design differences across partitions (e.g., an index created on one partition but not on another).

Apart from the large size of the plan space, an additional challenge while selecting parallel query plans comes from the cost model used to estimate the cost of plans. The cost metric introduced by the classic System R query optimizer measures the total amount of work done during plan execution [173].

In a serial plan execution, the total work also corresponds to the latency of plan execution. (Latency measures the time to complete the plan execution.) However, the total work and latency metrics are different in the execution of a parallel plan. Unfortunately, the latency metric violates a fundamental assumption made in the dynamic-programming-based plan selection algorithm introduced by the System R query optimizer [93]. (Example 3 in [93] gives an illustration of how the latency metric violates the principle of optimality needed by the dynamic-programming-based plan selection algorithm in System R.) The optimal parallel execution plan for the latency metric may not be composed of optimal subplans for the same metric.

Because of the complexity posed by the plan space and cost metrics, a variety of plan selection algorithms have been proposed for parallel database systems. Deterministic algorithms proposed include those based on dynamic programming [93], flow optimization [74], as well as greedy approaches [143]. Randomized algorithms proposed include those based on simulated annealing [134] and genetic search [179].

Finally, errors can arise while estimating plan execution costs because of reasons such as unknown data properties or uncertain resource availability. This problem is more acute in parallel database systems than in centralized database systems. Adaptive plan selection has been proposed as a way to address this problem [103].

2.5 Scheduling

As part of the execution of a parallel query plan, the plan is broken down into a set of executable tasks. These tasks are scheduled to run on the nodes of the parallel database system. The tasks usually form a directed acyclic graph (DAG) based on producer-consumer relationships. Subject to the possibility of pipelined parallel execution, a task is ready to run after all its ancestor tasks in the DAG are complete.
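
The readiness rule for tasks in the DAG can be sketched as follows. This Python sketch ignores pipelined execution and node placement, and it assumes that the DAG is given as a mapping from each task to the tasks that produce its inputs; the task names and the functions ready_tasks and schedule_in_waves are hypothetical. Checking direct producers is sufficient here because a producer is itself only run once its own producers have completed.

```python
def ready_tasks(dag, completed):
    """Return the not-yet-run tasks whose producer tasks have all completed."""
    return sorted(
        task
        for task, producers in dag.items()
        if task not in completed and all(p in completed for p in producers)
    )


def schedule_in_waves(dag):
    """Group tasks into waves; tasks within one wave can run independently."""
    completed, waves = set(), []
    while len(completed) < len(dag):
        wave = ready_tasks(dag, completed)
        if not wave:
            raise ValueError("no runnable task: the graph contains a cycle")
        waves.append(wave)
        completed.update(wave)
    return waves


# Tasks for the bushy plan (R join T) join (S join U) discussed in §2.3.1.
dag = {
    "scan_R": [], "scan_S": [], "scan_T": [], "scan_U": [],
    "join_RT": ["scan_R", "scan_T"],
    "join_SU": ["scan_S", "scan_U"],
    "join_final": ["join_RT", "join_SU"],
}
waves = schedule_in_waves(dag)
print(waves)
```

The second wave contains both two-way joins, which corresponds to the independent parallelism described in §2.3.1.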
Two common approaches are used to decide which node to schedule a task on. In the first approach, this decision is made during plan selection. The decision is often simplified because shared-nothing parallel database systems are predominantly based on function shipping