Integrating Real-Time and Batch Processing in a Polystore

John Meehan, Stan Zdonik, Shaobo Tian, Yulong Tian (Brown University; {john,sbz}@cs.brown.edu)
Nesime Tatbul (Intel Labs & MIT; tatbul@csail.mit.edu)
Adam Dziedzic, Aaron Elmore (University of Chicago; {ady,aelmore}@cs.uchicago.edu)

978-1-5090-3525-0/16/$31.00 ©2016 IEEE

Abstract

This paper describes a stream processing engine called S-Store and its role in the BigDAWG polystore. Fundamentally, S-Store acts as a frontend processor. It accepts input from multiple sources, cleans it to eliminate errors (data cleaning), and translates it into a form that can be efficiently ingested into BigDAWG. S-Store also acts as an intelligent router that sends input tuples to the appropriate components of BigDAWG. All updates to S-Store's shared memory are made in a transactionally consistent (ACID) way, thereby eliminating new errors caused by non-synchronized reads and writes. The ability to migrate data from component to component of BigDAWG is crucial. We describe a migrator from S-Store to Postgres that we have implemented as a first proof of concept, and we report some interesting results, obtained with this migrator, that impact the evaluation of query plans.

I. INTRODUCTION

Big data problems are commonly characterized along multiple dimensions of complexity, including volume, velocity, and variety. Earlier system solutions focused on each individual dimension separately, targeting different classes of computations or data types (e.g., batch/OLAP [1] vs. real-time/streaming [2], graphs [3] vs. arrays [4]). This led to a heterogeneous ecosystem that has become difficult for its users to manage in terms of programming effort and performance optimization. Furthermore, large-scale big data applications rarely involve a single type of data or computation (e.g., [5]). As a result, integrated architectures (e.g., [6], [7]) and new hybrid systems (e.g., [8], [9], [10], [11]) have started emerging.

The polystore architecture and its first reference implementation, BigDAWG, represent a comprehensive solution for federated querying over multiple storage engines [12]. Each engine may have a different data and query model or storage format, and each is optimized for a different type of workload. One of the main design principles of BigDAWG is that it tightly integrates real-time and batch processing, enabling seamless, high-performance querying over both fresh and historical data. In this paper, we describe how we realize this principle using a novel transactional stream processing system called S-Store [13]. We illustrate several important roles that a streaming system such as S-Store can play in the heterogeneous setting of a polystore architecture such as BigDAWG. We also provide an overview of our ongoing research and preliminary results in this area.

We envision an architecture where all new data enters the polystore as a stream. Streams can arrive from multiple sources, at high rates, and must be reliably and scalably ingested into the system on a continuous basis. During this ingestion phase, various transformations that prepare the data for more sophisticated querying and storage can be applied. For example, raw input streams may be merged, ordered, cleaned, normalized, formatted, or enriched with existing metadata. The resulting streams can then be used as inputs for immediate, real-time analytics (e.g., detecting real-time alerts). They can also be loaded into one or more backend storage systems for longer-term, batch analytics. Meanwhile, the polystore continues to process interactive queries that may involve both newly ingested and older data. It is therefore highly important that these queries see a complete and consistent view of the data in a timely manner.

We believe that a stateful stream processing system with transactional guarantees and multi-node scalability support is a good fit for the real-time processing requirements of the polystore system discussed above. S-Store is the first such system, and we have been building it at the ISTC for Big Data [13], [14], [15]. It has been designed for streaming applications with shared mutable state, such as real-time ETL. Each ETL workflow can be represented as a dataflow graph, with ACID transactions as nodes and the streams flowing between them as edges. Input streams are chunked into well-defined atomic batches and processed through these dataflows in an orderly and fault-tolerant manner. The resulting output batches can then be incrementally loaded into backend stores with transactional guarantees.

Furthermore, S-Store has its own in-memory storage engine that can handle ad hoc queries and traditional OLTP transactions over shared tables. This provides consistent and fast access to the most recent data and to materialized views derived from it. S-Store's fast transactional store also enables unique optimizations in the BigDAWG polystore, such as caching and anti-caching. Similarly, BigDAWG's extensible, island-based architecture enables the use of S-Store together with other messaging or streaming systems in a federated manner if needed.

In the rest of this paper, we first provide a more detailed background on BigDAWG, S-Store, and the basic querying and migration primitives that we implemented to integrate them. We then discuss our ongoing research topics and long-term motivations. Finally, we present preliminary experimental results evaluating the initial performance of the querying and migration primitives.

II. BACKGROUND

A. BigDAWG Architecture

The BigDAWG polystore system unifies multiple systems with varying use cases and storage requirements under a single architecture [12]. It is founded on the idea that "one size does not fit all." Unifying many different storage engines is therefore the best method of handling a variety of specific tasks. The architecture of BigDAWG is illustrated in Figure 1.

Because these systems use very different query languages, BigDAWG uses a construct called islands of information. An island is a front-facing abstraction consisting of a query language, a data model, and shims (a set of connections to the underlying storage engines). Each island represents a category of database systems. For example, a relational island may contain traditional database systems such as Postgres or MySQL, while an array island may contain multi-dimensional array databases such as SciDB [4]. Individual systems may be included in multiple islands if they fall under multiple categories.

Fig. 1. BigDAWG 1.0 Architecture

Individual systems under BigDAWG may migrate data between one another using a cast operator. These operators transform the data from the host system into a serialized binary format, which can then be interpreted by the destination system. The cast operator provides an efficient way of moving data to a storage engine that may be better suited to carrying out a given operation. For instance, if the user is looking to join a table from Postgres with a SciDB array, it may be most efficient to migrate the array into Postgres (transforming it in the process) and perform the join there.

Streaming systems play a unique role in BigDAWG, as they manage any stream data that is pushed to the polystore. They perform continuous queries on all new data, and push information and notifications to a variety of output sources. All streaming systems fall under a streaming island, which is further described in Section III-A. We believe that streaming systems also have an important role in data ingestion in the general case, which we describe in Section III-B.

B. S-Store

Traditional stream processing systems were first created over a decade ago to handle ever-changing ordered data in near-real-time [16], [17], [18]. These systems chained variants of standard relational operators that had been altered to handle unbounded streams. Due to the real-time nature of these systems, latency was the highest priority, since the value of the results often degraded with time. As a result, disk access was minimized wherever possible, and in our view early streaming systems did not properly address storage-related issues. Streaming applications often require support for storage and historical queries. In that case, an additional data storage engine must be used to obtain strong data consistency guarantees.

S-Store satisfies both requirements, providing low-latency, push-based processing seamlessly integrated with ACID data management. To accomplish this, S-Store ensures that all state (stream state, windows, or relational tables) may only be accessed within the context of a transaction. Streaming workloads are divided into dataflow graphs. These are directed acyclic graphs of disparate stored procedures (SPs), which are atomic units of processing attached to both an input and an output stream.

A transaction execution (TE) is defined as the execution of a stored procedure on an incoming atomic batch of input tuples. Each batch carries its own unique batch-id, which determines the order in which batches can be processed. When a transaction execution commits, its output tuples are given the same batch-id and are placed onto its output stream. They can then serve as an input batch for the downstream stored procedure of the dataflow graph.

A very simple dataflow graph is shown in Figure 2. Here we have two stored procedures, SP 1 and SP 2, linked by a stream. SP 1 takes as input batch b1, labeled A(b1), and starts a transaction. This transaction produces B(b1) as an output. SP 2 then takes B(b1) as input and executes a transaction that produces C(b1) as an output. A(b1), B(b1), and C(b1) all have the same batch-id (b1), but they contain different tuples (transformations of the original batch).

Fig. 2. A Simple S-Store Dataflow Graph

S-Store's contributions are best summarized through the combination of its three data processing guarantees:
1. ACID guarantees for individual transactions (both OLTP and streaming). Like conventional OLTP systems, each transaction (OLTP or streaming) takes the database from one consistent state to another.
2. Ordered Execution guarantees for dataflow graphs of streaming transactions. Streaming data contains an inherent order, and processing streaming data requires multiple consecutive steps. Streaming transactions must be scheduled in a way that preserves those orderings.
3. Exactly-Once Processing guarantees for streams (i.e., no loss or duplication). This is particularly relevant in the case of failure. It ensures that the same stored procedure does not execute on the same batch multiple times, and that no transactions are lost in the failure.

Because lightweight transactions are a must for a streaming system with ACID guarantees, S-Store is built on top of H-Store, a high-throughput main-memory OLTP engine [19]. S-Store inherits the distributed, shared-nothing architecture of H-Store, as well as its command log recovery mechanism. All state (including streams and windows) is implemented as relational tables. Dataflow graphs of stored procedures are implemented via partition engine triggers, or PE triggers. When a new batch of tuples is inserted onto a stream with a PE trigger, a new transaction execution of the downstream stored procedure is invoked on that batch. A streaming scheduler ensures that S-Store's ordering guarantees are maintained while processing as many transactions in parallel as possible.

S-Store fills two roles within the BigDAWG polystore. As a streaming system that ingests data and runs continuous queries, S-Store clearly fits under the streaming island. However, because S-Store is SQL-based at its core, it can also serve as a main-memory OLTP engine for BigDAWG under the relational island. S-Store therefore uses shims to connect to both islands, as illustrated in Figure 1.

C. Example Use-Cases

1) MIMIC: MIMIC II is an ICU data set containing clinical data obtained from hospitals and physiological vital sign data for ICU patients [5]. Due to the diverse nature of the data set, MIMIC II is one of the initial use cases for the BigDAWG polystore system.

One particularly interesting aspect of this data set is the time-series signal data representing patient vital signs. If this signal data is captured and analyzed in real time, it is possible to perform interactive queries that simulate emergency situations. Using S-Store, it is trivial to construct a dataflow graph capable of detecting unusual shifts or patterns in the signal data. For instance, if the weighted average of a patient's Pulmonary Arterial Pressure (PAP) falls below a specific threshold, S-Store can create an alert for medical professionals. More complicated queries, such as detecting irregular patterns in ECG signal data, are also possible.

In addition to these real-time alerts, S-Store can clean and format incoming tuples for future ingestion into a long-term storage engine. Time-series data is often best stored in an array database such as SciDB, because the patient, the type of waveform, and the time information can each be treated as a dimension. S-Store can transform incoming tuples to suit the needs of SciDB. It can then use BigDAWG's migration functionality to bulk load those tuples into disk-based array storage.

2) TPC-DI: Data integration, often referred to as Extraction-Transformation-Loading (ETL), is a requirement for any database system that ingests new information. ETL processes are primarily responsible for three steps:
1) the extraction of data from a variety of sources;
2) the transformation of raw source data to match the structure of the data in the target system;
3) the loading of the altered data into the target system [20].

Traditionally, data integration involves loading flat files into a database system, so that large quantities of data are collected before being bulk loaded into the system. However, there are obvious latency benefits to ingesting the data as it arrives instead.

TPC-DI is a data integration benchmark created to measure the performance of various enterprise-level integration solutions [22]. The benchmark mimics a retail brokerage firm. It focuses on extracting and combining data from a variety of sources and source formats (e.g., CSV, XML), transforming them into one unified data model, and loading the results into a data store.

While TPC-DI is designed with traditional ETL in mind, it can easily be modified to represent a streaming ETL workload instead. Take, for example, the queries associated with ingesting the DimTrade table (Figure 3(a)). If these tuples arrive in batches on a regular basis rather than as a full flat file, then the queries can be modeled as a streaming workload. Ordinarily, running the workload as a single transaction in a shared-nothing database would require one large distributed transaction. This is because the queries involved retrieve data from multiple tables, each of which must be partitioned on a different key. However, S-Store can divide the process into five operations (Figure 3(b)) and perform them incrementally. Each tuple is then processed in five smaller single-sited transactions, while the result remains correct. This gives quicker access to incremental portions of the end result and provides opportunities for parallelism, while keeping correctness intact.

Fig. 3. Ingestion of DimTrade Table in TPC-DI Benchmark: (a) Pseudo-SQL of DimTrade Ingestion [21]; (b) Dataflow Graph of DimTrade Ingestion
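The dataflow model from Section II-B can be illustrated with a small, single-process Python sketch. It covers stored procedures chained by streams, batches tagged with a batch-id, downstream execution triggered by each new output batch, and the ordered-execution and exactly-once guarantees. It is a hypothetical illustration, not S-Store's API or implementation. `StoredProcedure`, `Dataflow`, and `push` are made-up names, and the `downstream` field only mimics a PE trigger. A real system would run each step as an ACID transaction with durable command logging rather than updating in-memory dictionaries.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Set, Tuple

Row = Tuple  # hypothetical: a tuple of field values


@dataclass
class StoredProcedure:
    """Hypothetical stand-in for a stored procedure (SP) in a dataflow graph."""
    name: str
    transform: Callable[[List[Row]], List[Row]]
    # Mimics a PE trigger: the SP invoked on this SP's output stream.
    downstream: Optional["StoredProcedure"] = None


@dataclass
class Dataflow:
    """Toy executor: ordered, exactly-once execution per (SP, batch-id)."""
    committed: Set[Tuple[str, int]] = field(default_factory=set)
    next_batch: Dict[str, int] = field(default_factory=dict)
    outputs: Dict[str, List[Tuple[int, List[Row]]]] = field(default_factory=dict)

    def push(self, sp: StoredProcedure, batch_id: int, rows: List[Row]) -> None:
        # Exactly-once: a batch this SP already committed (e.g., replayed
        # after a failure) is ignored instead of being executed again.
        if (sp.name, batch_id) in self.committed:
            return
        # Ordered execution: batch-ids must arrive in sequence, starting at 1.
        expected = self.next_batch.get(sp.name, 1)
        if batch_id != expected:
            raise ValueError(f"{sp.name}: got batch {batch_id}, expected {expected}")
        # One "transaction execution": nothing is recorded unless transform succeeds.
        out = sp.transform(rows)
        self.committed.add((sp.name, batch_id))
        self.next_batch[sp.name] = batch_id + 1
        self.outputs.setdefault(sp.name, []).append((batch_id, out))
        # The output keeps the same batch-id and feeds the downstream SP.
        if sp.downstream is not None:
            self.push(sp.downstream, batch_id, out)


# Usage with two chained SPs as in Figure 2 (the transforms are made up).
sp2 = StoredProcedure("SP2", lambda rows: [r for r in rows if r[1] > 0])
sp1 = StoredProcedure("SP1", lambda rows: [(r[0], r[1] * 2) for r in rows], sp2)
flow = Dataflow()
flow.push(sp1, 1, [("a", 1), ("b", -3)])
flow.push(sp1, 1, [("a", 1), ("b", -3)])  # duplicate delivery: skipped
flow.push(sp1, 2, [("c", 5)])
print(flow.outputs["SP2"])
```

Keying the deduplication set on (SP, batch-id) rather than on the batch alone matters here. The same batch-id legitimately flows through every SP in the graph, so each SP must be allowed to commit it exactly once.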

III. ONGOING RESEARCH

Both S-Store and BigDAWG are ongoing research projects with several areas of active development. Below we describe some areas of future work.

A. The Role of Streaming Island in BigDAWG

As with other data types, BigDAWG must be able to manage incoming streaming data, and it should provide the user with a unified method of querying those data streams. In addition to S-Store, BigDAWG should be able to support other contemporary streaming data management systems such as Spark Streaming [23] or Apache Storm [24]. As with other categories of data types, BigDAWG needs a streaming island to manage the unique needs of streaming data.

Due to the nature of streaming data, the streaming island must be substantially different from the islands described previously. While most islands are pull-based, the streaming island is inherently push-based. Multiple data sources can be connected to this streaming island, as well as multiple stream ingestion systems. One of the primary functions of BigDAWG's streaming island should be to direct streaming data into the proper ingestion system(s). In this way, the streaming island serves as a publish-subscribe messaging module. It could perhaps be partially implemented using an engine that specializes in scalable messaging, such as Apache Kafka [25].

The second function required of the streaming island is the ability to view and pass on results from continuous queries. To propagate the push-based nature of streams, the streaming island must be able to trigger other operations, including pull-based operations from non-streaming systems. One simple example of such an operation is a user-facing alert. Take, for instance, a MIMIC medical application that monitors heart rate in real time. If conditions indicating abnormalities in the heart rate are met, the streaming application may need to send an alert to a doctor.

In addition to this push-based functionality, non-streaming systems may need to poll the results of a continuous query at any time. The streaming island should facilitate this as well. It can do so either by temporarily storing the results of the query, or by simply passing the pull request through to the appropriate streaming system.

B. Streaming ETL

A polystore such as BigDAWG provides an opportunity to reconsider the entire data ingestion process. Historically, data ingestion and ETL have been an under-served part of data storage and analytics. We believe that there are improvements to be made by integrating a streaming system into the process. Typically, ETL is performed in large batches. Data is collected throughout the day and stored in flat files to be loaded all at once. A series of operations is then performed on this incoming data to mold it into the data schema of the target system. One obvious drawback of this approach is that the data is not available on the target system until an entire batch has been collected, which can take hours or even days [20]. If a streaming system is used to model the ETL process instead, the data cleaning and migration can be done as new data arrives, rather than waiting to complete everything in bulk.

Streaming ETL divides the data into a sequence of well-defined chunks of configurable size. Each chunk should be processed and loaded as an atomic unit; that is, partial chunks should not be visible to backend engines for querying. Furthermore, chunks should be durably maintained at least until their backend loading has successfully completed and been acknowledged by the target system. Transformations on the chunks may also require reading and writing shared tables concurrently. Last but not least, correct streaming semantics require that chunks be processed exactly once and in the right order. Transactional processing support is therefore crucial for streaming ETL.

Due to its transactional properties, S-Store in particular is very well suited to streaming ETL. Data ingestion naturally involves shared, mutable state, since many operations must reference previously ingested state. For instance, one common operation in the relational data transformation process is to look up and populate a foreign key reference to a related table. For this operation to succeed, the foreign key row must already be populated in the secondary table. To guarantee correctness, all state reads and writes must be done through ACID transactions.

S-Store is scalable, and can thus serve as a streaming ingestion engine for many diverse systems under the BigDAWG polystore. As data is transformed, it can be incrementally migrated in batches to the appropriate engine using BigDAWG's cast operators. From beginning to end, the entire process can be done as push-based operations, meaning that the destination systems are automatically fed new data as it becomes available.

Fig. 4. Streaming ETL Example

C. Cross-System Data Movement and Caching

Cross-system data storage and management is an obvious challenge in polystores. Suppose the same data items are needed in separate queries, each of which is best handled by a different system. Where should those data items then be located for the best possible performance?
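A toy cost model helps show why the answer depends on the workload. The following Python sketch is purely illustrative and rests on several assumptions. It models a single data item that starts in Postgres and uses made-up unit costs (the `UnitCosts` values are placeholders, not measurements). It compares three placement strategies: querying the item across systems, caching a replica in S-Store, or migrating it into S-Store. The function names and the 'S'/'P'/'W' trace encoding are hypothetical. The model also ignores operation order, concurrency, and the consistency protocol.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class UnitCosts:
    """Hypothetical per-operation costs; real values would have to be measured."""
    local_op: float = 1.0       # read or write inside the system holding the data
    remote_op: float = 20.0     # the same operation issued across systems
    copy_item: float = 25.0     # ship the data item to the other system once
    refresh_copy: float = 25.0  # re-sync a cached copy after the original changes


def plan_costs(trace: str, c: UnitCosts = UnitCosts()) -> Dict[str, float]:
    """Total cost of serving an access trace under three placement strategies.

    The data item starts in Postgres. Trace symbols (hypothetical encoding):
    'S' = read by an S-Store query, 'P' = read by a Postgres query,
    'W' = write by a Postgres query. Only the counts matter in this toy model.
    """
    if set(trace) - {"S", "P", "W"}:
        raise ValueError("trace may only contain 'S', 'P', and 'W'")
    s, p, w = trace.count("S"), trace.count("P"), trace.count("W")
    return {
        # Leave the data in Postgres: every S-Store read crosses systems.
        "cross_system": s * c.remote_op + (p + w) * c.local_op,
        # Cache a copy in S-Store: every Postgres write must refresh the copy.
        "replication": c.copy_item + (s + p + w) * c.local_op + w * c.refresh_copy,
        # Move the data into S-Store: Postgres reads and writes now cross systems.
        "migration": c.copy_item + s * c.local_op + (p + w) * c.remote_op,
    }


def best_plan(trace: str, c: UnitCosts = UnitCosts()) -> str:
    """Name of the cheapest strategy for the trace under the given costs."""
    costs = plan_costs(trace, c)
    return min(costs, key=lambda name: costs[name])


for trace in ["S", "SSSSSP", "SSSSSSSSWW"]:
    print(trace, best_plan(trace), plan_costs(trace))
```

With these made-up numbers, each trace favors a different strategy:
- A single S-Store read is cheapest when the data stays put.
- Several S-Store reads plus an occasional Postgres read favor replication.
- S-Store reads mixed with a few Postgres writes favor migration.

A real cost model would need to quantify exactly this kind of crossover.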

Streaming ETL is a strong example of such a problem. Consider a situation in which S-Store is performing streaming ETL for Postgres (illustrated in Figure 4). Ideally, S-Store would perform transformations on incoming data independently of Postgres. However, as mentioned in Section III-B, the transformation process will frequently require referencing existing data within the target system. An S-Store query that requires Postgres data can be executed in one of a few possible ways:
(i) Cross-System Query: The required data remains in Postgres. The S-Store query must be executed as a cross-system transaction that accesses the target data a single time and immediately forgets it once the transaction commits. This is very expensive, especially if a similar query will be run in the near future.
(ii) Replication: The required data is cached in S-Store from Postgres. After the copy is made, the S-Store query can be run locally and is inexpensive. However, keeping the S-Store cached copy correct is expensive whenever the required data is modified by a Postgres query.
(iii) Migration: The required data is moved into S-Store (and removed from Postgres). The S-Store query can be run locally and is inexpensive, especially when the same data is queried repeatedly. However, if a Postgres query requires access to the data, it will need to be run as a cross-system query.

As this example illustrates, there are three primary solutions to the data locality problem: replication, migration, and cross-system querying. Each solution comes with benefits and drawbacks, and the optimal approach will always depend on the specific case. One future avenue of research is to explore these trade-offs. We also plan to develop a cost model that quantifies the options and informs a query planner about which approach is ideal for a given situation.

IV. INITIAL RESULTS

The introduction of streaming ETL and cross-system caching raises important questions. What is the cost of moving data between systems? Is it more expensive to periodically update a copy of the data in another system, or to pull data across systems each time it is needed? Does pushing some of the query planning into the island level improve cross-system query performance?

To compare potential query plans, we constructed a simple experiment that compares two query plans for a UNION query. Assume that S-Store is being used as an ingestion engine for the ORDERS table of the TPC-C workload [26], eventually migrating tuples into Postgres.¹ A user wishes to run a full table scan on the ORDERS table using the simple query "SELECT * FROM ORDERS". Because S-Store is ingesting tuples into the ORDERS table and incrementally sending them to Postgres, part of the ORDERS table lives in each system. To accomplish a full table scan on ORDERS, the tuples in S-Store must be combined with the tuples from Postgres. This effectively turns the query into "(SELECT * FROM S-Store.ORDERS) UNION (SELECT * FROM Postgres.ORDERS)". There are two methods of executing the UNION query (illustrated in Figure 5(a)):
i. migrate the data from S-Store to Postgres, and perform the UNION in Postgres, or
ii. pull all resulting tuples into the Relational Island, and perform the UNION there.

This experiment was run on an Intel® Xeon® processor with 40 cores running at 2.20 GHz. S-Store was deployed in single-node mode. Migration from S-Store to Postgres is implemented as a binary-to-binary migration. We assume that the pipe connection between the two systems is already open, so pipe setup time is excluded from the migration results. Queries are submitted to both systems via JDBC. The ORDERS table contained a total of 200,000 tuples. A percentage of these were stored in S-Store and the rest in Postgres.

Fig. 5. Migration Evaluation for UNION Queries: (a) Migration Query Plan vs. UNION in Island Query Plan; (b) Comparison for UNION Queries

As can be seen in Figure 5(b), the most efficient query plan depends on the amount of data held in the S-Store and Postgres tables. The cost of migrating tuples from S-Store to Postgres increases linearly with the number of tuples being transferred. If 25% or fewer of the tuples are held in the S-Store table, it is more efficient to migrate the tuples into Postgres and do the UNION there. However, when the query is executed this way, the data from S-Store is effectively moved twice: once to Postgres, and then again to the client. Thus, if more than 25% of the tuples are in S-Store, it becomes faster to transfer all results to the Relational Island instead and do the UNION there. This has the added benefit of being able to pull results from both systems in parallel. As a result, the optimal query performance for this approach falls at a 50/50 data distribution between S-Store and Postgres.

There are additional aspects to consider with these preliminary results. For instance, if the query is repeated on a regular basis, it becomes more efficient to migrate the tuples into Postgres, even if the initial migration is more expensive. In the case of streaming ETL, incremental loading is an effective method of spreading the cost of the migration over time. It also gives Postgres queries quicker access to the data.

Also note that the UNION operator is relatively inexpensive to perform. When the UNION is done within the Relational Island, result sets only need to be stored long enough to be concatenated and sent to the client. More complicated query plans, including joins and nesting, will increase the complexity of processing required within the Relational Island. It is likely more efficient to perform complex queries within a mature, specialized system such as Postgres, even if this means migrating large amounts of data. We will explore these kinds of issues in more detail as part of our ongoing research.

¹ While TPC-DI is a more realistic workload for streaming ETL than TPC-C, results were unavailable as of this writing.

V. RELATED WORK

BigDAWG has many parallels with federated database systems like Garlic [27]. For example, in both cases, schema mapping and data movement between sites are important features. The main difference is that in a federated database, each site (component) was autonomous. Each site had a different owner with her own set of policies. It would not be possible to permanently copy data from one system to another. BigDAWG is really a database built out of heterogeneous databases. There is a single owner who determines things like data placement across systems.

ETL systems have been around for many years. S-Store should be responsible for this important function in BigDAWG. Traditional ETL is typically done as a series of batch processing steps, each of which dumps its results to a file that is accessed by the next element in the pipeline [20]. Writing these files is very slow. S-Store processes tuples as they arrive and pushes intermediate results downstream to the next element without writing them to a file. This results in near-real-time ETL.

Integrating real-time and batch processing has become an important need. Big data companies have adopted several alternative architectures for it, such as the lambda [6] and kappa [7] architectures. In lambda, the same input data is fed in parallel to a throughput-optimized batch layer and a latency-optimized real-time layer. Their results are then made available to the applications via a serving layer. Kappa, in contrast, feeds the input only to a streaming system, followed by a serving layer. That serving layer supports both real-time and batch processing, the latter by replaying historical data from a logging system such as Kafka [25]. Fernandez et al. also propose Liquid, an extended architecture similar to kappa [11].

Our polystore architecture is similar to kappa and Liquid in that all new input is handled by a streaming system, but our serving layer consists of a more heterogeneous storage system. Also, our streaming system, S-Store, is a transactional streaming system with its own native storage, which facilitates ETL.

A number of systems have been explicitly designed for handling hybrid workloads that include real-time processing. Examples include Spark Streaming [23], Microsoft Trill [9], and Google Dataflow [10]. These all support batch, micro-batch, and streaming workloads in general, but in a more homogeneous setting than S-Store. Also, being analytical systems, they provide weaker transactional guarantees than S-Store.

VI. CONCLUSIONS

In this paper, we described a polystore called BigDAWG and the role of a streaming engine in BigDAWG. We also described S-Store, a particular streaming engine that has a sophisticated model of shared memory. S-Store supports transactional guarantees, making the system much more reliable for managing shared state. This is important so that inconsistent updates do not work their way into the other storage systems that constitute BigDAWG.

We have briefly described how S-Store can also act as an ETL system, providing services such as data cleaning, data integration, and efficient data loading. Data movement between the component systems of a polystore is quite fundamental, so we have concentrated on the topic of data migration in this paper. We have shown a simple experiment that supports the idea that the cost of data migration depends strongly on the amount of data that is moved.

VII. ACKNOWLEDGMENTS

This research was funded in part by the Intel Science and Technology Center for Big Data, and by the NSF under grant NSF IIS-1111423.

REFERENCES

[1] "Apache Hadoop," http://hadoop.apache.org.
[2] "Apache Storm," http://storm.apache.org.
[3] G. Malewicz et al., "Pregel: A System for Large-scale Graph Processing," in SIGMOD, 2010.
[4] M. Stonebraker et al., "SciDB: A Database Management System for Applications with Complex Analytics," Computing in Science and Engineering, vol. 15, no. 3, pp. 54–62, 2013.
[5] PhysioNet, "MIMIC II Data Set," https://physionet.org/mimic2/.
[6] "Lambda Architecture," http://lambda-architecture.net.
[7] "Kappa Architecture," https://www.oreilly.com/ideas/questioning-the-lambda-architecture.
[8] "Apache Spark," http://spark.apache.org.
[9] B. Chandramouli et al., "Trill: A High-Performance Incremental Query Processor for Diverse Analytics," PVLDB, vol. 8, no. 4, pp. 401–412, 2014.
[10] T. Akidau et al., "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing," PVLDB, vol. 8, no. 12, pp. 1792–1803, 2015.
[11] R. C. Fernandez et al., "Liquid: Unifying Nearline and Offline Big Data Integration," in CIDR, 2015.
[12] A. Elmore, J. Duggan, M. Stonebraker, M. Balazinska, U. Cetintemel, V. Gadepally, J. Heer, B. Howe, J. Kepner, T. Kraska, S. Madden, D. Maier, T. Mattson, S. Papadopoulos, J. Parkhurst, N. Tatbul, M. Vartak, and S. Zdonik, "A Demonstration of the BigDAWG Polystore System," The Proceedings of the VLDB Endowment (PVLDB), vol. 8, no. 12, August 2015.
[13] J. Meehan, N. Tatbul, S. Zdonik, C. Aslantas, U. Cetintemel, J. Du, T. Kraska, S. Madden, D. Maier, A. Pavlo, M. Stonebraker, K. Tufte, and H. Wang, "S-Store: Streaming Meets Transaction Processing," PVLDB, vol. 8, no. 13, pp. 2134–2145, 2015.
[14] U. Cetintemel, J. Du, T. Kraska, S. Madden, D. Maier, J. Meehan, A. Pavlo, M. Stonebraker, E. Sutherland, N. Tatbul, K. Tufte, H. Wang, and S. Zdonik, "S-Store: A Streaming NewSQL System for Big Velocity Applications (Demonstration)," in International Conference on Very Large Data Bases (VLDB'14), Hangzhou, China, September 2014.
[15] N. Tatbul et al., "Handling Shared, Mutable State in Stream Processing with Correctness Guarantees," IEEE Data Engineering Bulletin, to appear.
[16] D. Abadi et al., "Aurora: A New Model and Architecture for Data Stream Management," VLDB Journal, vol. 12, no. 2, 2003.
[17] A. Arasu et al., "STREAM: The Stanford Data Stream Management System," in Data Stream Management: Processing High-Speed Data Streams, 2004.
[18] S. Chandrasekaran et al., "TelegraphCQ: Continuous Dataflow Processing for an Uncertain World," in CIDR, 2003.
[19] R. Kallman et al., "H-Store: A High-Performance, Distributed Main Memory Transaction Processing System," PVLDB, vol. 1, no. 2, 2008.
[20] P. Vassiliadis, "A Survey of Extract-Transform-Load Technology," IJDWM, vol. 5, no. 3, pp. 1–27, 2009.
[21] Transaction Processing Performance Council (TPC), "TPC Benchmark DI (Version 1.1.0)," http://www.tpc.org/tpcdi/, Nov. 2014.
[22] M. Poess, T. Rabl, H.-A. Jacobsen, and B. Caufield, "TPC-DI: The First Industry Benchmark for Data Integration," Proc. VLDB Endow., vol. 7, no. 13, pp. 1367–1378, Aug. 2014.
[23] M. Zaharia, T. Das, H. Li, T. Hunter, S. Shenker, and I. Stoica, "Discretized Streams: Fault-tolerant Streaming Computation at Scale," in SOSP, 2013, pp. 423–438.
[24] A. Toshniwal, S. Taneja, A. Shukla, K. Ramasamy, J. M. Patel, S. Kulkarni, J. Jackson, K. Gade, M. Fu, J. Donham, N. Bhagat, S. Mittal, and D. V. Ryaboy, "Storm @Twitter," in SIGMOD, 2014, pp. 147–156.
[25] J. Kreps, N. Narkhede, and J. Rao, "Kafka: A Distributed Messaging System for Log Processing," in NetDB Workshop, 2011.
[26] The Transaction Processing Council, "TPC-C Benchmark (Revision 5.9.0)," http://www.tpc.org/tpcc/, 2007.
[27] M. J. Carey, L. M. Haas, P. M. Schwarz, M. Arya, W. Cody, R. Fagin, M. Flickner, A. W. Luniewski, W. Niblack, D. Petkovic et al., "Towards heterogeneous multimedia information systems: The garlic approach," in Research Issues in Data Engineering, 1995: Distributed Object Management, Proceedings. RIDE-DOM'95. Fifth International Workshop on. IEEE, 1995, pp. 124–131.