S-Store: Streaming Meets Transaction Processing


John Meehan(1), Nesime Tatbul(2,3), Stan Zdonik(1), Cansu Aslantas(1), Ugur Cetintemel(1), Jiang Du(4), Tim Kraska(1), Samuel Madden(3), David Maier(5), Andrew Pavlo(6), Michael Stonebraker(3), Kristin Tufte(5), Hao Wang(3)

(1) Brown University  (2) Intel Labs  (3) MIT  (4) University of Toronto  (5) Portland State University  (6) CMU

This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License. To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/3.0/. Obtain permission prior to any use beyond those covered by the license. Contact copyright holder by emailing info@vldb.org. Articles from this volume were invited to present their results at the 42nd International Conference on Very Large Data Bases, September 5th - September 9th 2016, New Delhi, India. Proceedings of the VLDB Endowment, Vol. 8, No. 13. Copyright 2015 VLDB Endowment 2150-8097/15/09.

ABSTRACT

Stream processing addresses the needs of real-time applications. Transaction processing addresses the coordination and safety of short atomic computations. Heretofore, these two modes of operation existed in separate, stove-piped systems. In this work, we attempt to fuse the two computational paradigms in a single system called S-Store. In this way, S-Store can simultaneously accommodate OLTP and streaming applications. We present a simple transaction model for streams that integrates seamlessly with a traditional OLTP system, and provides both ACID and stream-oriented guarantees. We chose to build S-Store as an extension of H-Store, an open-source, in-memory, distributed OLTP database system. By implementing S-Store in this way, we can make use of the transaction processing facilities that H-Store already provides, and we can concentrate on the additional features that are needed to support streaming. Similar implementations could be done using other main-memory OLTP platforms. We show that we can actually achieve higher throughput for streaming workloads in S-Store than an equivalent deployment in H-Store alone. We also show how this can be achieved within H-Store with the addition of a modest amount of new functionality. Furthermore, we compare S-Store to two state-of-the-art streaming systems, Esper and Apache Storm, and show how S-Store can sometimes exceed their performance while at the same time providing stronger correctness guarantees.

1. INTRODUCTION

A decade ago, the database research community focused attention on stream data processing systems. These systems [10, 16], including our own system, Aurora/Borealis [7, 8], were largely concerned with executing SQL-like operators on an unbounded and continuous stream of input data. The main optimization goal of these systems was reducing the latency of results, since they mainly addressed what might be called monitoring applications [28, 30]. To achieve this, they were typically run in main memory, thereby avoiding the extreme latency caused by disk access.

While essentially all of the monitoring applications that we encountered had a need for archival storage, the system-level support for this was limited and ad hoc. That is, the systems were largely not designed with storage in mind; it was tacked on after the fact. Thus, there was no support for things like ACID transactions, leaving applications open to potential inconsistencies with weak guarantees for isolation and recovery.

These first-generation streaming systems could be viewed as real-time analytics systems. After all, the input was made up of an infinite stream of new tuples. The notion of some of these tuples representing updates of previously viewed tuples (or causing updates to other stored data that is related) was not made explicit in the model. This is fine if time is the key. In this case, if each tuple is given a unique timestamp, the update pattern is append-only. However, there are cases when the identifying attribute is something else. Consider a stock ticker application in which stock symbol is the key. Here a new tuple for, say, IBM is really an update to the previously reported price. Traders want to see the current stock book as a consistent view of the 6000 stocks on the NYSE, with all prices reported in a consistent way. Thus, these applications introduce the need for shared mutable state in streaming systems.

We are beginning to see the rise of second-generation streaming systems [1, 2, 9, 32, 33, 37, 40]. These systems do not enforce a relational view on their users. Instead, they allow users to create their own operators that are invoked and managed by a common infrastructure. Note that it is reasonable to have libraries of common operators (including relational) that manipulate tables. The infrastructure enforces some model of failure semantics (e.g., at-least-once or exactly-once processing), but still ignores the needs of proper isolation and consistent storage in the context of updates.

Meanwhile, the advent of inexpensive, high-density RAM has led to a new generation of distributed on-line transaction processing (OLTP) systems that store their data in main memory, thereby enabling very high throughput with ACID guarantees for workloads with shared mutable state (e.g., [6, 18, 29]). However, these systems lack the notion of stream-based processing (e.g., unbounded data, push-based data arrival, ordered processing, windowing). Many applications that involve shared mutable state in fact need aspects of both streaming and transaction processing. In this paper, we propose to combine these two computational paradigms in a single system called S-Store.

1.1 Example Use Cases

Applications that benefit from this kind of hybrid system generally include those that use the streaming facilities to record persistent state or views in shared tables (in near real-time), and at the same time use the transactional facilities to ensure a consistent representation or summary of this state (e.g., dashboards or leaderboards [14]). We now describe two selected use cases as examples.

Real-Time Data Ingestion. An analytics warehouse must be updated periodically with recent activity. It was once the case that this was done once a day (typically at night) when there was little

to no load on the system. Nowadays, systems must be available at all times, and the latency window for loading new data is quickly shrinking. Also, new data must be added to the warehouse in a consistent fashion (e.g., groups of updates must be added atomically) [23]. This suggests that a transaction mechanism is needed. Even more interesting is the fact that incoming data is typically in different formats and is often dirty. ETL tools can address some of the problems of data cleaning and integration, but they work with files of bulk updates. This is slow and cumbersome, and cannot load the warehouse in near real time. Thus, there is a need for something similar to ETL that instead works on streaming data. S-Store is well-positioned to satisfy this need, and in fact is already being used for this purpose in the BigDAWG system [19].

Figure 1: FIX Trading Example

Shared Mutable State. S-Store is useful beyond real-time ETL, as illustrated in the example depicted in Figure 1. In the figure, rectangles represent transactions; oil drums represent stored, shared data; skinny arrows represent reads and writes of stored data; and block arrows represent streams. This example is based on customer experience at TIBCO StreamBase, Inc. [4]. It is a simplified version of intelligent order routing with FIX (Financial Information eXchange) data.

Notice that FIX data arrives on a stream and is processed by a transaction (Check and Debit Order Amount) that checks the buyer's account balance and puts a temporary hold on the funds involved in that transaction in the Buying Power database. When this is successful, the Venue Selection transaction determines to which exchange the order is to be sent. This can be a complex process that involves checking, e.g., the history of a particular exchange with the given security, and may involve retrieving data from other databases not shown in the figure. Thus, it is modeled as a separate transaction so that the Buying Power database is available to other transactions before the Venue Selection transaction is complete. Also, Venue Selection requires isolation, since it has to make its decision based on a consistent state (e.g., there may be other, independent OLTP transactions accessing the Customer Orders database as shown in the figure). The bold red arrow that connects these two transactions expresses a dependency between them which requires that, for a particular FIX input, Check and Debit Order Amount must precede Venue Selection. This illustrates the need for transaction ordering. Moreover, when Check and Debit Order Amount commits, Venue Selection needs to be triggered (push-based processing). At the bottom of the figure, the Update Order transaction takes input from the exchanges, and confirms or denies previously placed orders. In the case of a failed order, it will return funds to the customer's account. This can obviously conflict with new orders from the same customer. Thus, Check and Debit Order Amount and Update Order must both be transactions to guarantee consistency through isolation.

If we used only a pure stream processing system to implement this use case, we would be able to ensure ordering and push-based processing. However, the isolation requirements of the application would not be expressible. If we used a pure OLTP DBMS instead, we would be able to ensure isolation, but would be unable to take advantage of push-based processing. Transaction ordering would need to be managed at the client, requiring unnecessary context switches and a need to poll the interface for new data. Today, use cases like this are implemented with in-memory data structures, careful custom coding, and recovery based on replaying message logs. We believe that a platform like S-Store reduces user code complexity.

1.2 Contributions and Outline

This paper introduces the design and implementation of S-Store, a single system for processing streams and transactions with well-defined correctness guarantees. Our approach to building such a system is to start with a fully transactional OLTP main-memory database system and to integrate additional streaming functionality. By doing so, we are able to leverage infrastructure that already addresses many of the implementation complexities of transaction processing. This choice is very natural, since streaming systems largely run in main memory to achieve low latency. More specifically, this work makes the following key contributions:

Model. We define a novel, general-purpose computational model that allows us to seamlessly mix streaming transactions with ordinary OLTP transactions. Stream processing adds additional semantics to an OLTP engine's operating model. In particular, stream processing introduces the notion of order to the transaction mix. That is, it is possible to say that one transaction must precede another, something that is missing from the non-deterministic semantics of a standard transaction model. Further, since streams are unbounded and arrive on a continuous basis, there is a need to add the necessary primitives for bounding computation on streams, such as batch-based processing [10, 27] and windowing [12, 24]. Finally, streaming transactions support a push-based processing model, whereas OLTP transactions access state in a pull-based manner. Our hybrid model provides uniform access to state for all transactions.

Architecture and Implementation. We show how our hybrid computational model can be cleanly and efficiently implemented on top of a state-of-the-art main-memory OLTP engine (H-Store [29]). Our architectural extensions are general enough to be applied to any main-memory OLTP engine, and include: (i) streams and windows represented as time-varying state, (ii) triggers to enable push-based processing over such state, (iii) a streaming scheduler that ensures correct transaction ordering, and (iv) a variant on H-Store's recovery scheme that ensures exactly-once processing for streams. Note that the discussion in this paper is confined to the single-node case; multi-node S-Store is the topic for follow-on research.

Performance. We provide a detailed study of S-Store's performance characteristics, specifically the benefits of integrating transactional state processing with push-based processing. For streaming workloads that require transactional state, S-Store demonstrates improved throughput over both pure OLTP systems and pure streaming systems. In both cases, the advantage is a direct result of integrating state and processing, removing blocking during communication between the dataflow manager and the data-storage engine.

The rest of this paper is organized as follows: We first describe our computational model for transactional stream processing in Section 2. Section 3 presents the design and implementation of the S-Store system, which realizes this model on top of the H-Store main-memory OLTP system [29]. In Section 4, we present an experimental evaluation of S-Store in comparison to H-Store, as well as to two representative stream processing systems: Esper [3] (first generation) and Storm [37] (second generation). We discuss related work in Section 5, and finally conclude the paper with a summary and a discussion of future research directions in Section 6.
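The push-based triggering and per-input ordering seen in the FIX example can be illustrated with a toy dataflow driver: committing an upstream transaction pushes its output batch to the downstream transaction, so no client-side polling or client-managed ordering is needed. This is only a sketch under assumed names (`check_and_debit`, `venue_selection`, `run`); it is not S-Store's actual API.

```python
# Toy sketch of push-based, dependency-ordered execution, loosely modeled on
# the FIX example (Check and Debit Order Amount must precede Venue Selection
# for each input). All names here are illustrative, not from S-Store.
log = []

def check_and_debit(order_id):
    log.append(("check_and_debit", order_id))  # commit of this TE ...
    return order_id                            # ... produces the downstream batch

def venue_selection(order_id):
    log.append(("venue_selection", order_id))

# Dataflow edges: committing an upstream transaction pushes its output
# batch to each subscriber -- no client-side polling is needed.
subscribers = {check_and_debit: [venue_selection]}

def run(txn, batch):
    out = txn(batch)                           # execute and "commit" the TE
    for downstream in subscribers.get(txn, []):
        run(downstream, out)                   # push-based trigger

for order_id in (1, 2):
    run(check_and_debit, order_id)
# per-input ordering is preserved: C&D(1), Venue(1), C&D(2), Venue(2)
```

In a pure OLTP deployment, the `subscribers`/`run` logic would instead live in client code that polls for new data; the point of the hybrid model is to move this triggering inside the transactional engine.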

2. THE COMPUTATIONAL MODEL

In this section, we describe our computational model for transactional stream processing. This model allows us to support hybrid workloads (i.e., independent OLTP transactions and streaming transactions) with well-defined correctness guarantees. As we will discuss in more detail shortly, these guarantees include:

1. ACID guarantees for individual transactions (both OLTP and streaming)
2. Ordered Execution guarantees for dataflow graphs of streaming transactions
3. Exactly-Once Processing guarantees for streams (i.e., no loss or duplication)

S-Store acquires ACID guarantees from the traditional OLTP model (Sections 2.1 and 2.2), and adds ordered execution guarantees to capture stream-based processing semantics (Sections 2.3 and 2.4) and exactly-once processing guarantees for correctly recovering from failures (Section 2.5).

2.1 Overview

Our model adopts well-accepted notions of OLTP and stream processing, and fuses them into one coherent model. We assume that the reader is already familiar with the traditional notions, and strive to keep our model description brief and informal. We assume that both OLTP and streaming transactions can share state and at the same time produce correct results. S-Store supports three different kinds of state: (i) public tables, (ii) windows, and (iii) streams. Furthermore, we make a distinction between OLTP transactions that only access public tables, and streaming transactions that can access all three kinds of state.

For OLTP transactions, we simply adopt the traditional ACID model that has been well-described in previous literature [39]. A database consists of unordered, bounded collections (i.e., sets) of tuples. A transaction represents a finite unit of work (i.e., a finite sequence of read and write operations) performed over a given database. In order to maintain integrity of the database in the face of concurrent transaction executions and failures, each transaction is executed with ACID guarantees.

Each transaction (OLTP or streaming) has a definition and possibly many executions (i.e., instances). We assume that all transactions are predefined as stored procedures with input parameters. They are predefined because: (i) OLTP applications generally use a relatively small collection of transactions many times (e.g., Account Withdrawal), and (ii) streaming systems typically require predefined computations. Recall that it is the data that is sent to the query in streaming systems, in contrast to the standard DBMS model of sending the query to the data. The input parameters for OLTP transactions are assigned by the application when it explicitly invokes them ("pull"), whereas streaming transactions are invoked as new data becomes available on their input streams ("push").

As for granularity, the programmer determines the transaction boundaries. Coarse-grained transactions protect state for a longer period, but in so doing, other transactions may have to wait. Fine-grained transactions are in general preferred when they are safe, since they make results available to other transactions earlier. Said another way, in dataflow graphs with transactions, we can commit stable results when they are ready and then continue processing as required by the dataflow graph.

2.2 Streaming Transactions & Dataflow Graphs

Data Model. Our stream data model is very similar to many of the stream processing systems of a decade ago [7, 10, 16]. A stream is an ordered, unbounded collection of tuples. Tuples have a timestamp [10] or, more generally, a batch-id [12, 27] that specifies simultaneity and ordering. Tuples with the same batch-id b logically occur as a group at the same time and, thus, should be processed as a unit. Any output tuples produced as a result of this processing are also assigned the same batch-id b (yet they belong to a different stream). Furthermore, to respect the inherent stream order, batches of tuples on a given stream should be processed in increasing order of their batch-id's. This batch-based model is very much like the approaches taken by STREAM (group tuples by individual timestamps) [10], or more recently, by Spark Streaming (group tuples into "mini batches" of small time intervals) [40].

In our model, the above-described notion of a "batch" of tuples in a stream forms an important basis for transaction atomicity. A streaming transaction essentially operates over non-overlapping "atomic batches" of tuples from its input streams. Thus, an atomic batch corresponds to a finite, contiguous subsequence of a stream that must be processed as an indivisible unit. Atomic batches for input streams must be defined by the application programmer, and can be based on timestamps (like in [10, 40]) or tuple counts.

Processing Model. Stream processing systems commonly define computations over streams as dataflow graphs. Early streaming systems focused on relational-style operators as computations (e.g., Filter, Join), whereas current systems support more general user-defined computations [1, 2, 9, 32, 33, 37, 40]. Following this trend and consistent with our OLTP model, we assume that computations over streams are expressed as dataflow graphs of user-defined stored procedures. More formally, a dataflow graph is a directed acyclic graph (DAG), in which nodes represent streaming transactions (defined as stored procedures) or nested transactions (described in Section 2.4), and edges represent an execution ordering. If there is an edge between node Ti and node Tj, there is also a stream that is output for Ti and input for Tj. We say that Ti precedes Tj, denoted as Ti ≺ Tj.

Furthermore, given the unbounded nature of streams, all stream processing systems support windowing as a means to restrict state and computation for stateful operations (e.g., Join, Aggregate). A window is a finite, contiguous subsequence of a stream. Windows can be defined in many different ways [12, 24], but for the purposes of this work, we will restrict our focus to the most common type: sliding windows. A sliding window is a window which has a fixed size and a fixed slide, where the slide specifies the distance between two consecutive windows and must be less than or equal to the window size (if equal to the window size, it has been called a tumbling window). A sliding window is said to be time-based if its size and slide are defined in terms of tuple timestamps, and tuple-based if its size and slide are defined in terms of the number of tuples. Note that atomic batches and tumbling windows are similar in definition, but their use is orthogonal: batches are external to a streaming transaction T and are mainly used to set atomic boundaries for T's instances, whereas windows are internal to T and are used to bound computations defined inside T.

Atomic batches of tuples arrive on a stream at the input to a dataflow graph from push-based data sources. We adopt the data-driven execution model of streams, where arrival of a new atomic batch causes a new invocation for all the streaming transactions that are defined over the corresponding stream. We refer to the execution of each such transaction invocation as a transaction execution (TE). (In the rest of this paper, we use the terms "transaction" and "stored procedure" interchangeably to refer to the definition of a transaction, whereas we use the term "transaction execution" (TE) to refer to a specific invocation of that definition.)
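The count-based atomic batches and tuple-based sliding windows defined above can be sketched in a few lines, with plain lists standing in for streams. The helper names (`atomic_batches`, `sliding_windows`) are illustrative, not S-Store API.

```python
# Sketch: count-based atomic batches and a tuple-based sliding window,
# following the size/slide definitions in the text. Lists stand in for streams.
def atomic_batches(stream, batch_size):
    """Split a stream into non-overlapping, contiguous atomic batches."""
    return [stream[i:i + batch_size] for i in range(0, len(stream), batch_size)]

def sliding_windows(stream, size, slide):
    """Tuple-based sliding window: fixed size, fixed slide <= size.
    slide == size gives a tumbling window."""
    assert slide <= size
    return [stream[i:i + size] for i in range(0, len(stream) - size + 1, slide)]

s = [1, 2, 3, 4, 5]
atomic_batches(s, 2)      # [[1, 2], [3, 4], [5]]  -- non-overlapping units
sliding_windows(s, 2, 1)  # [[1, 2], [2, 3], [3, 4], [4, 5]]  -- overlap of 1
sliding_windows(s, 2, 2)  # [[1, 2], [3, 4]]  -- tumbling (slide == size)
```

Note how the tumbling case produces the same grouping as the batching helper: as the text says, the two notions are similar in definition but orthogonal in use, since batches bound a transaction's instances from the outside while windows bound computation inside it.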

Figure 2: Transaction Executions in a Dataflow Graph

A TE essentially corresponds to an atomic batch and its subsequent processing by a stored procedure. For example, in Figure 2, a dataflow graph with two stored procedures (i.e., T1 and T2) is defined above the dashed line labeled "Definition", but each of those is executed twice for two contiguous atomic batches on their respective input streams (i.e., s1.b1, s1.b2 for T1, and s2.b1, s2.b2 for T2), yielding a total of four TE's shown below the dashed line labeled "Execution" (i.e., T1,1, T1,2, T2,1, and T2,2). Note, s1.b2 denotes the second batch on stream s1, and T1,2 denotes the second execution of T1 on that batch.

Given a dataflow graph, it is also useful to distinguish between border transactions (those that ingest streams from the outside, e.g., T1 in Figure 2) and interior transactions (others, e.g., T2 in Figure 2). Border transactions are instantiated by each newly arriving atomic batch (e.g., s1.b1, s1.b2), and each such execution may produce a group of output stream tuples labeled with the same batch-id as the input that produced them (e.g., s2.b1, s2.b2, respectively). These output tuples become the atomic batch for the immediately downstream interior transactions, and so on.

Figure 2 also illustrates the different kinds of state accessed and shared by different transaction instances (shown below the dashed line labeled "State"). T1 takes as input the stream s1 and the window w1, and produces as output the stream s2, whereas T2 takes as input the stream s2 and produces as output the stream s3. Thus, TE's of T1 (i.e., T1,1 and T1,2) share access to s1, w1, and s2, whereas TE's of T2 (i.e., T2,1 and T2,2) do so for s2 and s3. Note, there are two ways to output final results of a dataflow graph (e.g., s3 in Figure 2): (i) write them to a public table, or (ii) push them to a sink outside the system (e.g., a TCP connection).

In order to ensure a correct execution, shared state accesses must be properly coordinated. We discuss this issue in more detail next.

2.3 Correct Execution for Dataflow Graphs

A standard OLTP transaction mechanism guarantees the isolation of a transaction's operations from others'. When a transaction T commits successfully, all of T's writes are installed and made public. During T's execution, all of T's writes remain private.

S-Store adopts such standard transaction semantics as a basic building block for its streaming transactions (thus ensuring ACID guarantees); however, the ordering of stored procedures in the dataflow graph as well as the inherent order in streaming data puts additional constraints on allowable transaction execution orders. As an example, consider again the dataflow graph shown in Figure 2. The four TE's illustrated in this example can be ordered in one of two possible ways: [T1,1, T2,1, T1,2, T2,2] or [T1,1, T1,2, T2,1, T2,2]. Any other orderings would not lead to a correct execution. This is due to the precedence relation between T1 and T2 in the graph as well as the ordering of the atomic batches on their input streams. This requirement is in contrast to most OLTP transaction processors, which would accept any serializable schedule (e.g., one that is equivalent to any of the 4! possible serial execution schedules if these were 4 independent transactions). Note that we make no ACID claims for the dataflow graph as a whole. The result of running a dataflow graph is to create an ordered execution of ACID transactions.

Furthermore, in streaming applications, the state of a window must be shared differently than other stored state. To understand this, consider again the simple dataflow graph shown in Figure 2. Let us assume for simplicity that the transaction input batch size for T1 is 1 tuple. Further, suppose that T1 constructs a window of size 2 that slides by 1 tuple, i.e., two consecutive windows in T1 overlap by 1 tuple. This means that window state will carry over from T1,1 to T1,2. For correct behavior, this window state must not be publicly shared with other transaction executions. That is, the state of a window can be shared among consecutive executions of a given transaction, but should not be made public beyond that. Returning to Figure 2, when T1,1 commits, the window in T1,1 will slide by one and will then be available to T1,2, but not to T2,1. This approach to window visibility is necessary, since it is this way of sharing window state that is the basis for continuous operation. Windows evolve and, in some sense, "belong" to a particular stored procedure. Thus, a window's visibility should be restricted to the transaction executions of its "owning" stored procedure.

We will now describe what constitutes a correct execution for a dataflow graph of streaming transactions more formally. Consider a dataflow graph D of n streaming transactions Ti, 1 ≤ i ≤ n. D is a directed acyclic graph G = (V, E), where V = {T1, ..., Tn} and E ⊆ V × V, where (Ti, Tj) ∈ E means that Ti must precede Tj (denoted as Ti ≺ Tj). Being a DAG, G has at least one topological ordering. A topological ordering of G is an ordering of its nodes Ti ∈ V such that for every edge (Ti, Tj) ∈ E we have i < j. Each topological ordering of G is essentially some permutation of V.

Without loss of generality: (i) let us focus on one specific topological ordering of G and call it O; (ii) for ease of notation, let us simply assume that O corresponds to the identity permutation such that it represents: T1 ≺ T2 ≺ ... ≺ Tn.

Ti represents a transaction definition Ti(si, wi, pi), where si denotes all stream inputs of Ti (at least one), wi denotes all window inputs of Ti (optional), and pi denotes all table partition inputs of Ti (optional). Similarly, Ti,j represents the j-th transaction execution of Ti as Ti,j(si.bj, wi, pi), where si.bj denotes the j-th atomic batches of all streams in si.

A dataflow graph D is executed in rounds of atomic batches 1 ≤ r < ∞, such that for any round r, atomic batch r from all streaming inputs into D generates a sequence of transaction executions Ti,r(si.br, wi, pi) for each Ti. Note that this execution generates an unbounded schedule. However, as of a specific round r = R, we generate a bounded schedule that consists of all R * n transaction executions: 1 ≤ r ≤ R, 1 ≤ i ≤ n, Ti,r(si.br, wi, pi).

In the traditional ACID model of databases, any permutation of these R * n transaction executions would be considered to be a valid/correct, serial schedule. In our model, we additionally have:

1. Dataflow graph order constraint: Consider the topological ordering O of G as we defined above. Then for any given execution round r, it must hold that: T1,r(s1.br, w1, p1) ≺ ... ≺ Tn,r(sn.br, wn, pn)
2. Stream order constraint: For any given transaction Ti, as of any execution round r, the following must hold: Ti,1(si.b1, wi, pi) ≺ ... ≺ Ti,r(si.br, wi, pi)
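These two ordering constraints can be checked mechanically for any bounded schedule. A sketch for the two-transaction graph of Figure 2, encoding each TE as a (transaction, round) pair; the helper names are illustrative, not from S-Store:

```python
# Sketch: checking a bounded schedule against the two ordering constraints
# above, for the two-transaction graph of Figure 2 (T1 precedes T2).
# A TE is encoded as a (transaction, round) pair, e.g. (1, 2) for T1,2.
def is_correct(schedule, order):
    """order maps each transaction to its position in the topological
    ordering O; schedule is a list of (txn, round) pairs."""
    pos = {te: k for k, te in enumerate(schedule)}
    for (ti, r) in schedule:
        # stream order constraint: Ti,r-1 must precede Ti,r
        if r > 1 and pos[(ti, r - 1)] > pos[(ti, r)]:
            return False
        # dataflow graph order constraint: Tj,r precedes Ti,r whenever Tj < Ti in O
        for tj in order:
            if order[tj] < order[ti] and pos[(tj, r)] > pos[(ti, r)]:
                return False
    return True

O = {1: 0, 2: 1}  # T1 precedes T2
is_correct([(1, 1), (2, 1), (1, 2), (2, 2)], O)  # True  (first valid ordering)
is_correct([(1, 1), (1, 2), (2, 1), (2, 2)], O)  # True  (second valid ordering)
is_correct([(1, 2), (1, 1), (2, 1), (2, 2)], O)  # False (violates stream order)
```

For this graph, only the two schedules listed in the text pass the check; the other permutations of the four TE's violate one of the two constraints, whereas a conventional OLTP serializability test would accept all of them.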

(1) follows from the definition of a dataflow graph, which specifies a precedence relation on its nodes, whereas (2) is to ensure that atomic batches of a given stream are processed in order.

Any bounded schedule of D that meets the above two ordering constraints is a correct schedule. If G has multiple topological orderings, then the dataflow graph order constraint must be relaxed to accept any of those orderings for any given execution round of D.

2.4 Correct Execution for Hybrid Workloads

S-Store's computational model allows OLTP and streaming transactions to co-exist as part of a common transaction execution schedule. This is particularly interesting if those transactions access shared public tables. Given our formal description of a correct schedule for a dataflow graph D that consists of streaming transactions, any OLTP transaction execution Ti,j(pi) (defined on one or more public table partitions pi) is allowed to interleave anywhere in such a schedule. The resulting schedule would still be correct.

We have also extended our transaction model to include nested transactions. Fundamentally, this allows the application programmer to build higher-level transactions out of smaller ones, giving her the ability to create coarser isolation units among stored procedures, as illustrated in Figure 3. In this example, two streaming transactions, T1 and T2, in a dataflow graph access a shared table partition p. T1 writes to the table and T2 reads from it. If another OLTP transaction also writes to p in a way that interleaves between T1 and T2, then T2 may get unexpected results. Creating a nested transaction with T1 and T2 as its children will isolate the behavior of T1 and T2 as a group from other transactions (i.e., other OLTP or streaming). Note that nested transactions also isolate multiple instances of a given streaming dataflow graph (or subgraph) from one another. We describe such a scenario in Section 4.1.1.

More generally, an S-Store nested transaction consists of two or more stored procedures with a partial order defined among them [36]. The stored procedures within a nested transaction must execute in a way that is consistent with that partial order. A nested transaction will commit if and only if all of its stored procedures commit. If one or more stored procedures abort, the whole nested transaction will abort.

Nested transactions fit into our formal model of streaming transactions in a rather straightforward way. More specifically, any streaming transaction Ti in dataflow graph D can be defined as a nested transaction that consists of children Ti1, ..., Tim. In this case, Ti1, ..., Tim must obey the partial order defined for Ti for every execution round r, 1 ≤ r < ∞. This means that no other streaming or OLTP transaction instance will be allowed to interleave with Ti1, ..., Tim for any given execution round.

2.5 Fault Tolerance

Like any ACID-compliant database, in the face of failure, S-Store must recover all of its state (including streams, windows, and public tables) such that any committed transactions (including OLTP and streaming) remain stable, and, at the same time, any uncommitted transactions are not allowed to have any effect on this state. A TE that had started but had not yet committed should be undone, and it should be reinvoked with the proper input parameters once the system is stable again. For a streaming TE, the invocation should also take the proper stream input from its predecessor.

In addition to ACID, S-Store strives to provide exactly-once processing guarantees for all streams in its database. This means that each atomic batch s.bj on a given stream s that is an input to a streaming transaction Ti is processed exactly once by Ti. Even if a failure happens and some TE's are undone/redone during recovery, the database state must be "equivalent" to one that is as if s were processed exactly once by Ti.

For example, consider the streaming transaction T1(s1, w1) in Figure 2. If a failure happens while TE T1,1(s1.b1, w1) is still executing, then: (i) T1,1 should be undone, i.e., any modifications that it may have made on s1, w1, and s2 should be undone; (ii) T1,1 should be reinvoked for the atomic batch s1.b1. Similarly, if a failure happens after TE T1,1(s1.b1, w1) has already committed, then all of its modifications on s1, w1, and s2 should be retained in the database. In both of these failure scenarios, the recovery mechanism should guarantee that s1.b1 is processed exactly once by T1 and the database state will reflect the effects of this execution.

Note that a streaming TE may have an external side effect other than modifying the database state (e.g., delivering an output tuple to a sink that is external to S-Store, as shown for s3 at the top part of Figure 2). Such a side effect may get executed multiple times due to failures. Thus, our exactly-once processing guarantee applies only to state that is internal to S-Store (e.g., if s3 were alternatively stored in an S-Store table, as shown at the bottom part of Figure 2). This is similar to other exactly-once processing systems such as Spark Streaming [40].

If the dataflow graph definition allows multiple TE orderings, or if the transactions within a dataflow graph contain any non-deterministic operations (e.g., use of a random number generator), we provide an additional recovery option that we call weak recovery. Weak recovery will produce a correct result in the sense that it will produce results that could have been produced if the failure had not occurred, but not necessarily the one that was in fact being produced. In other words, each atomic batch of each stream in the database will still be processed exactly once and the TE's will be ordered correctly (as described in Sections 2.3 and 2.4), but the final database state might look different than that of the original execution before the failure. This is because the new execution might follow a different (but valid) TE ordering, or a non-deterministic TE might behave differently every time it is invoked (even with the same input parameters and database state).

3. ARCHITECTURE & IMPLEMENTATION

We chose to build S-Store on top of the H-Store main-memory OLTP system [29]. This allows us to inherit H-Store's support for high-throughput transaction processing, thereby eliminating the need to replicate this complex functionality. We also receive associated functionality that will be important for streaming OLTP applications, including indexing, main-memory operation, and support for user-defined transactions.

In this section, we briefly describe the H-Store architecture and the changes required to incorporate S-Store's hybrid model described in the previous section. Nevertheless, we believe that the architectural features that we have added to H-Store are conceptually applicable to any main-memory OLTP system.

3.1 H-Store Overview

H-Store is an open-source, main-memory OLTP engine that was developed at Brown and MIT [29], and formed the basis for the design of the VoltDB NewSQL database system [6].

All transactions in H-Store must be predefined as stored procedures with input parameters. The stored procedure code is a mixture of SQL and Java. Transaction executions (TEs) are instantiated
Note that by binding input parameters of a stored procedure to real values such a TE Ti,j , once it commits, will likely modify the database and running it. In general, a given stored procedure definition will, state (streams, windows, or public tables). Thus, even if a failure over time, generate many TEs. TEs are submitted to H-Store, and 2138
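The two ordering rules for a correct schedule can be made concrete with a small sketch. This is our own illustration, not S-Store code; the function and its inputs are hypothetical names, and it assumes the simplification (also made by S-Store's scheduler, per Section 3.2.4) that one specific topological ordering of the dataflow graph is chosen up front:

```python
# Sketch of the two correctness rules for a schedule of TEs:
# (1) within each atomic batch, TEs must respect the dataflow precedence
#     (here: a chosen topological ordering of the graph's transactions);
# (2) each transaction must consume the atomic batches of its input
#     stream in order.

def is_correct_schedule(schedule, topo_order):
    """schedule: list of (transaction, batch_number) TEs in execution order.
    topo_order: one topological ordering of the dataflow graph."""
    rank = {t: i for i, t in enumerate(topo_order)}
    last_batch = {}   # highest batch number seen so far, per transaction
    last_rank = {}    # highest dataflow rank seen so far, per batch
    for txn, batch in schedule:
        # Rule (2): atomic batches of a given stream are processed in order.
        if batch < last_batch.get(txn, -1):
            return False
        last_batch[txn] = batch
        # Rule (1): within one batch, TEs respect the dataflow precedence.
        if rank[txn] < last_rank.get(batch, -1):
            return False
        last_rank[batch] = rank[txn]
    return True

# For a dataflow T1 -> T2: pushing batch 0 through both, then batch 1,
# is correct; running T2 before T1 on the same batch is not.
print(is_correct_schedule([("T1", 0), ("T2", 0), ("T1", 1), ("T2", 1)],
                          ["T1", "T2"]))  # True
print(is_correct_schedule([("T2", 0), ("T1", 0)], ["T1", "T2"]))  # False
```

Note that the sketch also accepts schedules such as T1(b1), T1(b2), T2(b1), since both rules still hold there.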

Figure 3: Nested Transactions

Figure 4: S-Store Architecture

H-Store follows a typical distributed DBMS architecture in which a client initiates the transaction in a layer (in H-Store, called the partition engine (PE)) that is responsible for managing transaction distribution, scheduling, coordination, and recovery. The PE manages the use of another layer (in H-Store, called the execution engine (EE)) that is responsible for the local execution of SQL queries. This layering is very much like the transaction manager / transaction coordinator division of labor in a standard distributed DBMS architecture.

A client program connects to the PE via a stored procedure execution request. If the stored procedure requires SQL processing, then the EE is invoked with these sub-requests.

An H-Store database is partitioned across multiple sites [34], where a site corresponds to a CPU core. The available DRAM for a node is divided equally among the partitions, and each stores a horizontal slice of the database. A transaction is executed on the sites that hold the data that it needs. If the data is partitioned carefully, most transactions will only need data from a single site. Single-sited transactions are run serially on that site, thereby eliminating the need for fine-grained locks and latches.

H-Store provides recovery through a checkpointing and command-logging mechanism [31]. Periodically, the system creates a persistent snapshot or checkpoint of the current committed state of the database. Furthermore, every time H-Store commits a transaction, it writes a command-log record containing the name of that stored procedure along with its input parameters. This command-log record must be made persistent before its transaction can commit. In order to minimize interactions with the slow persistent store, H-Store offers a group-commit mechanism.

On recovery, the system's state is restored to the latest snapshot, and the command-log is replayed. That is, each command-log record causes the system to re-execute the same stored procedures with the same arguments in the same order that it did before the failure. Note that an undo-log is unnecessary, as neither the previous checkpoint nor the command-log will contain uncommitted changes.

3.2 S-Store Extensions

The high-level architecture of S-Store, directly adapted from H-Store, is shown in Figure 4. S-Store makes a number of extensions to H-Store to enable stream processing in the engine (shown in boldface in Figure 4). These include management of: (i) inputs from streaming clients and dataflow graphs of stored procedures at the PE layer, (ii) triggers at both the PE and the EE layers, (iii) stream- and window-based queries at the EE layer, and (iv) in-memory stream and window state.

3.2.1 Streams

S-Store implements a stream as a time-varying H-Store table. Using this approach, stream state is persistent and recoverable. Since tables are unordered, the order of tuples in a stream is captured by timestamps. An atomic batch of tuples is appended to the stream table as it is placed on the corresponding stream, and conversely, an atomic batch of tuples is removed from the stream table as it is consumed by a downstream transaction in the dataflow. The presence of an atomic batch of tuples within a stream can activate either a SQL plan fragment or a downstream streaming transaction, depending on what "triggers" are attached to the stream (described in Section 3.2.2). In case of the latter, the current stream table serves as input for the corresponding downstream streaming transaction.

3.2.2 Triggers

Triggers enable the push-based, data-driven processing needed to implement S-Store dataflow graphs. A trigger is associated with a stream table or a window table. When new tuples are appended to such a table, downstream processing will be automatically activated. The alternative to triggers would be polling for newly-arriving tuples, which would reduce throughput.

There are two types of triggers in S-Store, reflecting the two-layer design of H-Store and of many other distributed database systems:

Partition engine (PE) triggers can only be attached to stream tables, and are used to activate downstream stored procedures upon the insertion and commit of a new atomic batch of tuples on the corresponding streams. As the name implies, PE triggers exist to create a push-based dataflow within the PE by eliminating the need to return back to the client to activate downstream stored procedures. In Figure 4, the horizontal arrows between stored procedures inside the PE layer denote PE triggers.

Execution engine (EE) triggers can be attached to stream or window tables, and are used to activate SQL queries within the EE. These triggers fire immediately upon the insertion of an atomic batch of tuples in the case of a stream, and upon the insertion of an atomic batch of tuples that also causes a window to slide in the case of a window. The SQL queries are executed within the same transaction instance as the batch insertion which triggered them, and can also activate further downstream EE triggers.
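As a rough illustration of the stream-table and PE-trigger mechanics just described, consider the following toy sketch. All class and function names here are our own inventions, not S-Store's API; the real mechanism lives inside the PE and EE layers of H-Store:

```python
# Toy model of a stream as a time-varying table with a PE-style trigger:
# committing an atomic batch push-activates the downstream stored
# procedure, instead of the client polling and re-invoking it.

class StreamTable:
    def __init__(self, name):
        self.name = name
        self.rows = []            # (timestamp, payload); order kept by timestamp
        self.pe_triggers = []     # downstream stored procedures to activate

    def attach_trigger(self, proc):
        self.pe_triggers.append(proc)

    def commit_batch(self, batch):
        self.rows.extend(batch)   # append the atomic batch to the stream table
        for proc in self.pe_triggers:
            proc(batch)           # push-based activation on commit

votes = StreamTable("votes")
validated = StreamTable("validated_votes")

def validate_votes(batch):
    # downstream SP: consume from 'votes', emit to 'validated_votes'
    validated.commit_batch([(ts, v) for ts, v in batch if v is not None])

votes.attach_trigger(validate_votes)
votes.commit_batch([(1, "A"), (2, None), (3, "B")])
print(validated.rows)  # [(1, 'A'), (3, 'B')]
```

In the real system, the downstream consumption would also delete the batch from the upstream stream table; garbage collection of expired stream tuples is automatic in S-Store.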

EE triggers are designed to eliminate unnecessary communication between the EE and PE layers, for example when the execution of downstream processing is conditional. In Figure 4, the horizontal arrows between SQL queries inside the EE layer denote EE triggers.

3.2.3 Windows

Windows are also implemented as time-varying H-Store tables. A window is processed only when a new complete window state is available. For a sliding window, a new full window becomes available every time that window has one slide-worth of new tuples. Therefore, when new tuples are inserted into a window, they are flagged as "staged" until the slide conditions are met. Staged tuples are not visible to any queries on the window, but are maintained within the window. Upon sliding, the oldest tuples within the window are removed, and the staged tuples are marked as active in their place. All window manipulation is done at the EE level, and output can be activated using an EE trigger.

Due to the invisible "staging" state of a window table, as well as the transaction isolation rules discussed earlier in Section 2.3, special scoping rules are enforced for window state. A window table must not, in general, be accessed by TE's other than those of the stored procedure that defined it. In fact, a window table must only be visible to consecutive TE's of the stored procedure that contains it. As a consequence, one is not allowed to define PE triggers on window state, but only EE triggers. In other words, windows must be contained within the TE's of single stored procedures and must not be shared across other stored procedures in the dataflow graph.

S-Store provides automatic garbage collection mechanisms for tuples that expire from stream or window state, after any triggers associated with them have all been fired and executed.

It should be noted that some optimizations, such as incremental window processing, have been left as future work.

3.2.4 Streaming Scheduler

Being an OLTP database that implements the traditional ACID model, the H-Store scheduler can execute transaction requests in any order. On a single H-Store partition, transactions run in a serial fashion by design [29]. H-Store serves transaction requests from its clients in a FIFO manner by default.

As we discussed in Section 2.3, streaming transactions and dataflow graphs require TE's for dependent stored procedures to be scheduled in an order that is consistent with the dataflow graph (i.e., not necessarily FIFO). This is, of course, true for other streaming schedulers, but here we must obey the rules defining correct schedules as stated earlier in Section 2.3. Additionally, as discussed in Section 2.4, the application can specify (by defining nested transactions) additional isolation constraints, especially when shared table state among streaming transactions is involved. The simplest solution is to require the TE's in a dataflow graph for a given input batch to always be executed in an order consistent with a specific topological ordering of that dataflow graph.

Although our ordering rules described earlier would allow transaction schedules that are "equivalent" to any topological ordering of the dataflow graph, our current scheduler implementation admits only one of them. We have found this approach to be practical in that it is amenable to a low-overhead implementation in H-Store and good enough to support all the S-Store use cases and benchmarks that we have so far studied (see Section 4). As we consider scaling to larger collections of workloads and nodes going forward, issues of fairness and locality may require more sophisticated approaches, such as flow-based scheduling [26].

3.2.5 Recovery Mechanisms

As described in Section 2.5, S-Store provides two different recovery options: (i) strong recovery, which is guaranteed to produce exactly the same state as was present before the failure (note that this guarantee is feasible only if the workload does not contain any non-determinism), and (ii) weak recovery, which will produce a legal state that could have existed, but is not necessarily the exact state lost. Both of these options leverage the periodic checkpointing and command-logging mechanisms of H-Store. However, they differ in terms of which transactions are recorded in the command-log during normal operation and how they are replayed during crash recovery.

Strong Recovery. S-Store's strong recovery is very similar to H-Store's recovery mechanism. All committed transactions (both OLTP and streaming) are recorded in the command-log along with their input arguments. When a failure occurs, the system replays the command-log starting from the latest snapshot. The log is replayed in the order in which the transactions appear, which is the same as the order in which they were originally committed. This guarantees that the reads-from and writes-to relationships between the transactions are strictly maintained.

There is one variation on H-Store's recovery, however. Before the log replay, we must first disable all PE triggers so that the execution of a stored procedure does not redundantly trigger the execution of its successor(s) in the dataflow graph. Because every transaction is logged in strong recovery, failing to do this would create duplicate invocations, and thus potentially incorrect results. Once triggers are disabled, the snapshot is applied, and recovery from the command-log can begin.

When recovery is complete, we turn PE triggers back on. At that point, we also check if there are any stream tables that contain tuples in them. For such streams, PE triggers will be fired to activate their respective downstream transactions. Once those transactions have been queued, the system can resume normal operation.

Weak Recovery. In weak recovery, the command-log need not record all stored procedure invocations, but only the ones that ingest streams from the outside (i.e., border transactions). We then use a technique similar to upstream backup [25] to re-invoke the other previously committed stored procedures (i.e., interior transactions). In upstream backup, the data at the inputs to a dataflow graph are cached so that in the event of a failure, the system can replay them in the same way that it did on first receiving them in the live system. Because the streaming stored procedures in an S-Store dataflow have a well-defined ordering, the replay will necessarily create a correct execution schedule. While transactions may not be scheduled in the exact order that took place on the original run, some legal transaction order is ensured.

When recovering using weak recovery, we must first apply the snapshot, as usual. However, before applying the command-log, S-Store must first check existing streams for data recovered by the snapshot, and fire any PE triggers associated with those streams. This ensures that interior transactions that were run post-snapshot but not logged are re-executed. Once these triggers have been fired, S-Store can begin replaying the log. Unlike for strong recovery, we do not need to turn off PE triggers during weak recovery. In fact, we rely on PE triggers for the recovery of all interior transactions, as these are not recorded in the command-log. Results are returned through committed tables.

Weak recovery is a light-weight alternative to strong recovery, since it need not log all committed transactions. Section 4.2.3 provides an experimental comparison of our strong and weak recovery mechanisms.
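The window "staging" behavior described in Section 3.2.3 can be sketched as follows. This is a toy model under our own assumptions (tuple-based window, names invented), not S-Store's EE code:

```python
# Toy sliding window with staging: newly inserted tuples stay staged and
# invisible until one slide-worth has accumulated; then the oldest active
# tuples are evicted, the staged tuples become active, and an EE-trigger-
# style callback fires once a complete window is available.

class SlidingWindow:
    def __init__(self, size, slide):
        self.size, self.slide = size, slide
        self.active, self.staged = [], []

    def insert(self, tup, ee_trigger=None):
        self.staged.append(tup)               # staged: invisible to queries
        if len(self.staged) >= self.slide:    # slide condition met
            self.active.extend(self.staged[:self.slide])
            del self.staged[:self.slide]
            self.active = self.active[-self.size:]   # evict oldest tuples
            if ee_trigger and len(self.active) == self.size:
                ee_trigger(self.active)       # fire only on a full window

w = SlidingWindow(size=3, slide=2)
seen = []
for i in range(6):
    w.insert(i, ee_trigger=lambda win: seen.append(list(win)))
print(seen)  # [[1, 2, 3], [3, 4, 5]]
```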
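The difference between what the two logging modes record, and how weak recovery regenerates interior transactions via PE triggers at replay time, can be sketched schematically. This is toy code with an assumed structure (a linear three-procedure dataflow), not S-Store's implementation:

```python
# Schematic comparison of strong vs. weak command-logging for a dataflow
# T1 -> T2 -> T3, where only T1 is a border transaction ingesting input
# from outside the system.

dataflow = ["T1", "T2", "T3"]         # linear dataflow; T1 is the border SP

def run_batch(batch_id, log, weak):
    for sp in dataflow:               # each SP commits once per input batch
        if not weak or sp == dataflow[0]:
            log.append((sp, batch_id))   # weak mode logs border TEs only

strong_log, weak_log = [], []
for b in range(4):                    # process 4 input batches
    run_batch(b, strong_log, weak=False)
    run_batch(b, weak_log, weak=True)
print(len(strong_log), len(weak_log))  # 12 4

def weak_replay(log):
    # Replaying a logged border TE re-fires PE triggers, which regenerate
    # the interior TEs (T2, T3) inside the engine without extra log records.
    replayed = []
    for sp, b in log:
        i = dataflow.index(sp)
        replayed.extend((d, b) for d in dataflow[i:])
    return replayed

assert weak_replay(weak_log) == strong_log   # same TEs, far fewer log writes
```

The threefold reduction in log records here is the same effect that, in Section 4.2.3's experiments, lets weak recovery approach group-commit-like throughput for trigger-heavy dataflows.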

Figure 5: Leaderboard Maintenance Benchmark

Table 1: Guarantees vs Max Tput (Leaderboard Maintenance)

System          | ACID | Order | Exactly-Once | Max Tput (batches/sec)
H-Store (async) |  ✓   |  ×    |  ×           | 5,300
H-Store (sync)  |  ✓   |  ✓    |  ×           | 210
Esper + VoltDB  |  ✓   |  ✓    |  ×           | 570
Storm + VoltDB  |  ✓   |  ✓    |  ✓           | 600
S-Store         |  ✓   |  ✓    |  ✓           | 2,200

4. EXPERIMENTS

In this section, we present the results of our experimental study that evaluates S-Store with respect to existing alternatives in OLTP and stream processing. First, we demonstrate the benefits of integrating state management with push-based processing in Section 4.1. Specifically, we compare S-Store to H-Store, Esper, and Storm in terms of overall throughput on a transactional stream processing workload. Then, Section 4.2 further explores a number of micro-benchmarks that focus on evaluating specific architectural features of S-Store in comparison to its base system H-Store (i.e., EE triggers, PE triggers, and recovery modes).

To properly evaluate streaming workloads, we record throughput in terms of "input batches per second". This number represents the number of input batches that are processed to completion, regardless of the number of transactions executed. In order to simplify comparison to other systems, these experiments set the batch size to be a single tuple. For example, if any system processes 1,000 tuples / sec, we consider it to be processing 1,000 batches / sec.

All experiments were run on a cluster of machines using Intel Xeon E7-4830 processors running at 2.13 GHz. Each machine contains a total of 64 cores and 264 GB of memory. Because we focus on single-node S-Store deployments in this paper, and due to the partitioned architecture of S-Store, effectively only a single core is used for data access. In order to create data isolation for an apples-to-apples comparison, we limit data access to a single core on all featured systems. The experiments were run using a single non-blocking client which asynchronously sends requests to the system. Command-logging was enabled unless otherwise stated.

4.1 State-of-the-Art Comparison

In order to provide the best comparison between S-Store and state-of-the-art systems, we chose to implement a Leaderboard Maintenance benchmark that exercises all of the architectural additions of S-Store described in Section 3. We measure S-Store's performance against a main-memory OLTP system (H-Store [29]), a traditional single-node CEP engine (Esper [3]), and a modern distributed streaming system (Storm [37]).

4.1.1 Leaderboard Maintenance Benchmark

Consider a TV game-show in which viewers vote for their favorite candidate. Leaderboards are periodically updated with the number of votes each candidate has received.

Each viewer may cast a single vote via text message. Suppose the candidate with the fewest votes will be removed from the running every 20,000 votes, as it has become clear that s/he is the least popular. When this candidate is removed, votes submitted for him or her will be deleted, effectively returning the votes to the people who cast them. Those votes may then be re-submitted for any of the remaining candidates. This continues until a single winner is declared. During the course of the voting, each incoming vote needs to be validated and recorded. Furthermore, several leaderboards are maintained: one representing the top-3 candidates, another for the bottom-3 candidates, and a third one for the top-3 trending candidates of the last 100 votes. With each incoming vote, these leaderboards are updated with new statistics regarding the number of votes each candidate has received.

As shown in Figure 5, the dataflow graph contains three separate stored procedures: one to validate and insert a new vote, a second to maintain the leaderboards, and a third to delete a candidate if necessary. In order to ensure the correctness of the result in the presence of shared tables, as well as to maintain consistency of the tables across the dataflow graph, these three stored procedures must execute in sequence for each new vote.

4.1.2 OLTP Systems (H-Store)

As discussed at the beginning of Section 2, S-Store provides three primary guarantees: ACID, ordered execution, and exactly-once processing. When evaluating S-Store against an OLTP system (H-Store), it is important to consider which of these guarantees are being provided.

By default, H-Store provides only one of the three processing guarantees of S-Store: ACID. H-Store has no ordering guarantees, as it has no concept of a dataflow graph. It can instead choose any serializable transaction schedule (Section 3.1). In fact, we have previously shown that, in a workload in which multiple stored procedures within a dataflow share state like the one in Figure 5, H-Store may produce incorrect results [14]. H-Store also does not guarantee that a dataflow will be fully processed exactly once in the event of a system failure (again due to the lack of a dataflow graph concept).

Because ordering guarantees are not considered, H-Store can asynchronously queue transactions for the engine to process. Thus, the client can send a transaction request and immediately send another without waiting for the response. The queue provides the system with a continuous supply of work, meaning H-Store is almost constantly doing transactional work. As a result, H-Store is able to process an impressive 5,300 input batches per second, as can be seen in Table 1.

By comparison, S-Store is able to achieve 2,200 input batches per second, while providing all three correctness guarantees. The primary performance difference lies in the ordered execution guarantee. To provide it, S-Store's scheduler must determine the proper order in which to run the transactions in its queue (discussed in Section 3.2.4). This scheduling does reduce the number of transactions per second that S-Store is able to process, but it is necessary to ensure correct results.
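The benchmark's three-step dataflow can be sketched in heavily condensed form. All names below are our own, the 20,000-vote round is shrunk to 5, and the return of a deleted candidate's votes to their voters is omitted; the real benchmark runs these steps as stored procedures in sequence per vote:

```python
# Condensed sketch of the Leaderboard Maintenance dataflow:
# validate -> update leaderboards -> maybe delete a candidate.

VOTES_PER_ROUND = 5            # stand-in for the benchmark's 20,000
tally = {"A": 0, "B": 0, "C": 0}
seen_voters = set()
total = 0

def validate_and_insert(voter, candidate):
    # SP 1: one vote per viewer, and only for a candidate still in the running
    if voter in seen_voters or candidate not in tally:
        return False
    seen_voters.add(voter)
    return True

def update_leaderboards(candidate):
    # SP 2: update vote statistics (top-3 / bottom-3 views would read `tally`)
    global total
    tally[candidate] += 1
    total += 1

def maybe_delete_candidate():
    # SP 3: every VOTES_PER_ROUND votes, remove the least popular candidate
    if total % VOTES_PER_ROUND == 0 and len(tally) > 1:
        del tally[min(tally, key=tally.get)]

ballots = ["A", "A", "B", "B", "C", "A", "B", "A", "B", "A"]
for i, cand in enumerate(ballots):
    if validate_and_insert("viewer%d" % i, cand):   # the three SPs must run
        update_leaderboards(cand)                   # in sequence for each
        maybe_delete_candidate()                    # new vote
print(tally)  # {'A': 5}
```

Running the three steps as one ordered unit per vote is exactly the constraint that S-Store's dataflow graph (and, in Section 4.1.2, H-Store's synchronous client-side workaround) must enforce.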

It is possible to execute the Leaderboard Maintenance benchmark on H-Store in a way that provides ordering guarantees. This is accomplished by designing a pseudo-"dataflow-graph" within the client. The parameters of a downstream procedure depend on the result from an upstream procedure, and transaction ordering must be ensured by the client. As a result, all procedures are forced to be invoked synchronously, meaning that a response must be received before the next request can be made.

This method ensures that the end results of the benchmark are correct, but performance suffers severely in the process. H-Store is only able to process 210 input batches per second when ordering is enforced by the client (see Table 1). Because all transaction calls are synchronous, H-Store's transaction queue never holds more than one transaction at a time. As a result, the client and the PE of H-Store must constantly wait for each other, severely hindering performance. S-Store, on the other hand, provides all three correctness guarantees while maintaining reasonable throughput.

4.1.3 Streaming Systems (Esper and Storm)

To compete with pure streaming systems, S-Store's performance must be comparable to both first-generation, single-node CEP engines and second-generation, distributed real-time streaming systems. We chose Esper and Storm as representative systems for their respective categories.

As further discussed in Section 5, neither Esper nor Storm is transactional. In order to provide comparable (though not comprehensive) guarantees to S-Store, only serialized tuple processing was allowed. All of Esper's default delivery ordering guarantees remain activated, meaning each tuple must run to completion before the next tuple may begin processing. For the Storm implementation, we opted to use Trident [5], an extension of Storm that supports stateful stream processing and exactly-once semantics. Data durability in both systems is provided by command-logging each of the three atomic processing units in the dataflow graph.

On stateless, pure streaming workloads that do not require transactional guarantees, both Esper and Storm would easily outperform S-Store. However, shared state management is key to many workloads, including our Leaderboard Maintenance benchmark. Like many stream processing systems, both Esper and Storm rely on external data storage for durable, shared state.

We added VoltDB [6], a main-memory, transactional database, as the backend for both Esper and Storm. VoltDB is an optimized, commercial version of H-Store, making the comparison with S-Store fair. Esper and Storm serve as the driving push-based engines, choosing when to access state based on the results received from the database. To maximize VoltDB's potential and batch requests from Esper / Storm to the database, we compile the three operations in Leaderboard Maintenance as VoltDB stored procedures. Each streaming system sends stored procedure requests via JDBC. Command-logging was unavailable in the open-source version of VoltDB, so asynchronous command logging was implemented in Esper and Storm.

After adding VoltDB, both Esper and Storm with Trident provide comparable guarantees to S-Store, outlined in Table 1. Esper (+VoltDB) provides two of the three processing guarantees of S-Store (ACID and ordered execution), but has no support for exactly-once semantics. Storm with Trident (+VoltDB) provides all three correctness guarantees.

As shown in Table 1, both Esper and Storm with Trident achieve roughly 600 batches per second, with data access being the significant bottleneck. At all times, either Esper or Storm is waiting for VoltDB, or vice-versa. Because tuples must be processed sequentially, only a single transaction request can be sent to VoltDB at a time, and the database must at a minimum wait for a full round-trip to and from the streaming system before it can process more work. Meanwhile, Esper and Storm must wait for VoltDB to process each transaction request before evaluating the response and continuing to process the dataflow graph.

By contrast, S-Store processes 2,200 batches per second. S-Store is able to handle multiple asynchronous transaction requests from the client and still preserve the tuple processing order. This is because all of the transaction ordering is handled directly by the S-Store partition engine. By combining push-based semantics and fully-integrated state management, S-Store avoids the costly blocking communication between the streaming system and the database.

4.2 Micro-Benchmarks

A number of micro-experiments were performed to evaluate the optimizations achieved by S-Store over its predecessor, H-Store, in the presence of transactional stream processing workloads. For the experiments in Sections 4.2.1 and 4.2.2, command-logging was disabled to emphasize the feature being measured.

4.2.1 Execution Engine Triggers

Figure 6: Execution Engine Triggers. (a) EE Trigger Micro-Benchmark; (b) EE Trigger Result.

In this experiment, we evaluate the benefit of S-Store's EE triggers. The micro-benchmark contains a single stored procedure that consists of a sequence of SQL statements (Figure 6(a)). In S-Store, these SQL statements can be activated using EE triggers such that all execution takes place inside the EE layer. H-Store, on the other hand, must submit the set of SQL statements (an insert and a delete) for each query as a separate execution batch from PE to EE. Figure 6(a) illustrates the case for 3 streams and 3 queries. S-Store's EE triggers enable it to trade off trigger execution cost for a reduction in the number of PE-to-EE round-trips (e.g., 2 triggers instead of 2 additional round-trips). Note also that the DELETE statements are not needed in S-Store, since garbage collection on streams is done automatically as part of our EE trigger implementation.

Figure 6(b) shows how maximum throughput varies with the number of EE triggers. S-Store outperforms H-Store in all cases, and its relative performance further increases with the number of EE triggers, reaching up to a factor of 2.5x for 9 triggers. This trend continues as more EE triggers are added.

Figure 7: Partition Engine Triggers. (a) PE Trigger Micro-Benchmark; (b) PE Trigger Result.

4.2.2 Partition Engine Triggers

This experiment compares the performance of S-Store's PE triggers to an equivalent implementation in H-Store, which has no such trigger support in its PE. As illustrated in Figure 7(a), the micro-benchmark consists of a dataflow graph with a number of identical stored procedures (SPs). Each SP removes tuples from its input stream, and then inserts these tuples into its output stream. We assume that the dataflow graph must execute in exact sequential order. In H-Store, the scheduling request for a new transaction must come from the client, and because the dataflow order of these transactions must be maintained, transactions cannot be submitted asynchronously. Serializing transaction requests severely limits H-Store's performance, as the engine is unable to perform meaningful work while it waits for a client request (as discussed in Section 4.1.2). In S-Store, a PE trigger can activate the next transaction directly within the PE and can prioritize these triggered transactions ahead of the current scheduling queue using its streaming scheduler. Thus, S-Store is able to maintain dataflow order while both avoiding blockage of transaction executions and reducing the number of round-trips to the client layer.

Figure 7(b) shows how throughput (plotted in log-scale) changes with increasing dataflow graph size (shown as number of PE triggers for S-Store). H-Store's throughput tapers due to the PE's need to wait for the client to determine which transaction to schedule next. S-Store is able to process roughly an order of magnitude more input batches per second thanks to its PE triggers. Our experiments show that this benefit is independent of the number of triggers.

4.2.3 Recovery Mechanisms

As described earlier in Sections 2.5 and 3.2.5, S-Store provides two methods of recovery. Strong recovery requires every committed transaction to be written to the command-log. Weak recovery, on the other hand, is a version of upstream backup in which only committed border transactions are logged, and PE triggers allow interior transactions to be automatically activated during log replay. We now investigate the performance differences between these two methods, both during normal operation and during recovery.

For the command-logging experiment, we use the same micro-benchmark presented in Section 4.2.2 (Figure 7(a)), using a dataflow with a variable number of SPs. Ordinarily in H-Store, higher throughput is achieved during logging by group-committing transactions, writing their log records to disk in batches. In S-Store, we have found that for trigger-heavy workloads, weak recovery can accomplish a similar run-time effect to the use of group commit. As shown in Figure 8(a), without group commit, logging quickly becomes a bottleneck in the strong recovery case. Each committed transaction is logged, so throughput quickly degrades as the number of transactions in the dataflow graph increases. By contrast, weak recovery logs only the committed border transactions, allowing up to 4x the throughput, as it writes a smaller fraction of log records to disk.

For the recovery experiment, we ran 5,000 input batches through the same PE micro-benchmark, recording logs for both weak and strong recovery. We then measured the amount of time it took S-Store to recover from scratch using each command-log.

As shown in Figure 8(b), weak recovery not only achieves better throughput during normal operation, but it also provides lower recovery time. Typically during recovery, the log is read by the client and transactions are submitted sequentially to the engine. Each transaction must be confirmed as committed before the next can be sent. Because weak recovery activates interior transactions within the engine, the transactions can be confirmed without a round-trip to the client. As a result, recovery time stays roughly constant for weak recovery, even for dataflow graphs with larger numbers of stored procedures. For strong recovery, recovery time increases linearly with the size of the dataflow graph.

As previously stated, we expect the need for recovery to be rare, and thus prioritize throughput at run time over total recovery time. However, in real-time systems in which recovery time can be crucial, weak recovery can provide a significant performance boost while also improving run-time throughput.

5. RELATED WORK

In the early 2000's, there was a lot of interest in the database community in stream processing. The main goal of this work was to process continuous queries with low latency as data streamed into the system. This was largely inspired by the emergence of sensor-based applications. Many academic prototypes (Aurora / Borealis [7, 8], STREAM [10], TelegraphCQ [16], NiagaraCQ [17]) were built, and several commercial products were spawned as a result of this work (e.g., TIBCO StreamBase, CISCO Truviso, SAP Coral8 / ESP, IBM InfoSphere Streams, Microsoft StreamInsight, Oracle CEP, Esper). With the exception of STREAM and Coral8, these systems did not support an explicit notion of transactions. STREAM did not directly claim to have transactions, but its execution model was based on logical timestamps which could be interpreted as transaction IDs. Batches of tuples with the same timestamp were executed atomically. While this could be used to provide isolation, recovery was not discussed. Furthermore, modeling transactions as the execution of an entire query graph did not allow finer-grained transaction definitions. Similarly, Coral8 provided so-called "atomic bundles" as a configurable isolation/recovery unit embedded in its execution model, but did not provide any transactional guarantees beyond "at least once" for processing events. Furthermore, none of these early systems considered integrating stream processing with traditional OLTP-style query processing.

Fault tolerance issues have been investigated as stream processing systems have been moved into distributed settings [8, 16]. A few fundamental models and algorithms have been established by

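To make the weak-versus-strong recovery distinction concrete, the following toy Python model contrasts the two logging policies on a linear dataflow. This is only an illustrative sketch under our own assumptions (a four-SP pass-through dataflow, hypothetical function names), not S-Store's actual implementation: strong recovery logs every committed transaction, while weak recovery logs only the border transaction and lets replay re-derive the interior ones, mimicking what PE triggers do engine-side.

```python
# Illustrative sketch (not S-Store code): strong command-logging logs every
# committed transaction; weak, upstream-backup-style logging records only
# committed border transactions, and replay re-activates interior
# transactions the way PE triggers would. All names here are hypothetical.

def run_dataflow(batch, num_sps, log, strong):
    """Push one input batch through a linear dataflow of identical SPs."""
    for sp in range(num_sps):
        batch = [t for t in batch]          # each SP passes its tuples through
        is_border = (sp == 0)               # only the first SP reads external input
        if strong or is_border:
            log.append((sp, tuple(batch)))  # command-log this transaction
    return batch

def replay(log, num_sps, strong):
    """Rebuild the dataflow's work from the command-log after a crash."""
    out = []
    for sp, batch in log:
        if strong:
            out.append((sp, batch))         # every transaction re-submitted as logged
        else:
            # weak recovery: replaying the border transaction deterministically
            # re-activates the interior transactions engine-side
            b = list(batch)
            for s in range(num_sps):
                out.append((s, tuple(b)))
    return out

strong_log, weak_log = [], []
for batch in ([1, 2], [3, 4], [5, 6]):
    run_dataflow(batch, num_sps=4, log=strong_log, strong=True)
    run_dataflow(batch, num_sps=4, log=weak_log, strong=False)

print(len(strong_log))  # 12 log records: 3 batches x 4 SPs
print(len(weak_log))    # 3 log records: border transactions only
assert replay(weak_log, 4, strong=False) == replay(strong_log, 4, strong=True)
```

The sketch captures why weak recovery writes a smaller fraction of log records at run time yet reconstructs the same committed work on replay; it does not model the round-trip and confirmation costs that the experiments above measure.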
[Figure 8: Recovery Mechanisms. (a) Logging; (b) Recovery]

There have also been several efforts in addressing specific transactional issues that arise in stream processing settings. For example, Golab et al. have studied the concurrency control problem that arises when a sliding window is advanced (write) while it is being accessed by a query (read) [22]. This work proposes sub-windows to be used as atomic access units and two new isolation levels that are stronger than conflict serializability. Such a problem never arises in S-Store, since window state is accessed by a single TE at a time (and never by TEs of different SPs). As another example, Wang et al. have considered concurrency issues that arise when adding active rule support to CEP engines in order to monitor and react to streaming outputs [38]. In this case, the rules may require accessing state shared with other queries or rules. This work defines a stream transaction as a sequence of system state changes that are triggered by a single input event, and proposes a timestamp-based notion of correctness enforced through appropriate scheduling algorithms. S-Store investigates transactional stream processing in a more general context than active CEP engines.
Botan et al.'s work was the first to recognize the need for an explicit transaction model to support queries across both streaming and stored data sources [13]. This work proposed to extend the traditional page model [39] to include streams of events (as time-varying relations) and continuous queries (as a series of one-time queries activated by event arrivals). As a result, each one-time query execution corresponds to a sequence of read/write operations, and operations from one or more such sequences can be grouped into transaction units based on the application semantics. Transactions must then be executed in a way to ensure conflict serializability and event arrival ordering. Thus, this work focused on the correct ordering of individual read/write operations for a single continuous query, and not so much on transaction-level ordering for complex dataflow graphs as we do.
Recently, a new breed of stream processors has emerged. Unlike the majority of the earlier-generation systems, these do not adopt a select-project-join operator environment. Instead, they expect the user to supply their own operators (UDFs), and the system controls their execution in a scalable fashion over a cluster of compute nodes. Typically, these systems provide fault tolerance and recoverability, but do not support fully-ACID transactions. Essentially, they all aim at providing a MapReduce-like framework for real-time computations over streaming data. Representatives include Storm [37], Spark Streaming [40], Samza [2], Naiad [32], Flink [1], MillWheel [9], and S4 [33].
Storm provides two types of semantic guarantees: at-least-once and at-most-once. For at-least-once, each tuple is assigned a unique message-id and its lineage is tracked. For each output tuple t that is successfully delivered by a topology, a backflow mechanism is used to acknowledge the tasks that contributed to t with the help of a dedicated acker bolt. The data source must hold the tuple until a positive ack is received and the tuple can be removed (similar to upstream backup [25]). If an ack is not received within a given timeout period, then the source will replay the tuple again. Storm can only provide the weaker at-most-once semantics when the ack mechanism is disabled. Trident provides a higher-level programming abstraction over Storm which provides a stronger, exactly-once processing guarantee based on automatic replication [5]. While these guarantees ensure some level of consistency against failures, they are not sufficient to support atomicity and isolation as in the case of ACID guarantees. Furthermore, Storm focuses on purely streaming topologies and thus lacks support for dealing with persistent state and OLTP transactions.
Spark Streaming extends the Spark batch processing engine with support for discretized streams (D-Streams) [40]. Analytical computations are divided into a series of stateless, deterministic transformations over small batches of input tuples. Like STREAM, tuples are processed atomically within each of these batches. All state in Spark Streaming is stored in in-memory data structures called Resilient Distributed Datasets (RDDs). RDDs are partitioned and immutable. Like Storm+Trident, Spark Streaming provides exactly-once consistency semantics. However, the RDD-based state management model incurs high overhead for transactional workloads that require many fine-grained update operations (due to maintaining a large number of RDDs and managing their lineage).
Several of the new-generation streaming systems adopt a stateful dataflow model with support for in-memory state management. SEEP decouples a streaming operator's state from its processing logic, thereby making state directly manageable by the system via a well-defined set of primitive scale-out and fault-tolerance operations [20]. SEEP has also been extended to support iterative cyclic computations [21]. Naiad extends the MapReduce model with support for structured cycles and streaming [32]. Naiad's timely dataflow model uses logical timestamps for coordination. Samza isolates multiple processors by localizing their state and disallowing them from sharing data, unless data is explicitly written to external storage [2]. Like S-Store, all of these systems treat state as mutable and explicitly manageable, but since they all focus on analytical and cyclic dataflow graphs, they do not provide inherent support for transactional access to shared state; thus, their consistency guarantees are weaker than S-Store's.
Microsoft Trill is a new analytics engine that supports a diverse spectrum of queries (including streaming, historical, and progressive/exploratory) with real-time to offline latency requirements [15]. Trill is based on a tempo-relational query model that incrementally processes events in batches organized as columns. Trill's adaptive batching and punctuation mechanisms enable trading off throughput for latency in case of higher loads. Both Trill and S-Store target hybrid workloads that include streaming, strive to maximize throughput while controlling latency, and are capable of in-memory processing of events in adjustable batch granularity.

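The batching tradeoff mentioned in connection with Trill can be illustrated with a toy cost model. This is purely a sketch with hypothetical constants, not a measurement of Trill, S-Store, or any real system: a fixed per-batch overhead (scheduling, commit, punctuation handling) is amortized over more events as batches grow, while the first event in a batch waits longer for the batch to fill.

```python
# Toy model of batch-granularity tradeoffs (hypothetical constants; not
# measurements of any real system). Larger batches amortize fixed per-batch
# overhead (higher throughput) but delay the earliest events in each batch
# (higher latency).

PER_BATCH_OVERHEAD = 1.0   # fixed cost paid once per batch
PER_EVENT_COST = 0.01      # marginal cost per event

def cost_per_event(batch_size):
    """Amortized processing cost per event; lower cost means higher throughput."""
    return (PER_BATCH_OVERHEAD + PER_EVENT_COST * batch_size) / batch_size

def worst_case_queueing_delay(batch_size, arrival_interval=0.1):
    """The first event in a batch waits for the batch to fill before processing."""
    return (batch_size - 1) * arrival_interval

for k in (1, 10, 100):
    print(k, round(cost_per_event(k), 3), worst_case_queueing_delay(k))
# growing k drives amortized cost down while queueing delay grows linearly
```

Adaptive batching, as described for Trill, amounts to moving along this curve at run time: shrink batches under light load to cut latency, grow them under heavy load to sustain throughput.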
However, S-Store focuses more on OLTP settings with shared mutable state, whereas Trill focuses more on OLAP settings with read-mostly state. Therefore, S-Store pays more attention to providing correctness guarantees in the face of concurrent access, processing dependencies, and failures without sacrificing performance.

6. SUMMARY & FUTURE DIRECTIONS
This paper has defined a new model of transactions for stream processing. We have presented the design and implementation of a novel system called S-Store that seamlessly combines OLTP transaction processing with our transactional stream processing model. We have also shown how this symbiosis can be implemented in the context of a main-memory OLTP DBMS in a straightforward way. S-Store is shown to outperform H-Store, Esper, and Storm on a streaming workload that requires transactional state access, while at the same time providing stronger correctness guarantees.
Future work includes extending S-Store to operate on multiple nodes. We plan to address a number of research issues including data and workload partitioning, distributed recovery, and distributed transaction scheduling. We also plan to investigate handling of dynamic and hybrid (OLTP+streaming) workloads.

Acknowledgments. We thank Richard Tibbets for sharing his experience about StreamBase use cases, as well as Chenggang Wu and Hong Quach for their contributions. This research was funded in part by the Intel Science and Technology Center for Big Data, by the NSF under grants NSF IIS-1111423 and NSF IIS-1110917, and by the Maseeh Professorship in Emerging Technologies.

7. REFERENCES
[1] Apache Flink. http://flink.apache.org/.
[2] Apache Samza. http://samza.apache.org/.
[3] Esper. http://www.espertech.com/esper/.
[4] TIBCO StreamBase. http://www.streambase.com/.
[5] Trident Tutorial. http://storm.apache.org/documentation/Trident-tutorial.html.
[6] VoltDB. http://www.voltdb.com/.
[7] D. Abadi et al. Aurora: A New Model and Architecture for Data Stream Management. VLDB Journal, 12(2):120-139, 2003.
[8] D. Abadi et al. The Design of the Borealis Stream Processing Engine. In CIDR, pages 277-289, 2005.
[9] T. Akidau et al. MillWheel: Fault-Tolerant Stream Processing at Internet Scale. PVLDB, 6(11):1033-1044, 2013.
[10] A. Arasu et al. STREAM: The Stanford Data Stream Management System. In Data Stream Management: Processing High-Speed Data Streams, 2004.
[11] M. Balazinska et al. Fault-tolerance in the Borealis Distributed Stream Processing System. ACM TODS, 33(1):3:1-3:44, 2008.
[12] I. Botan et al. SECRET: A Model for Analysis of the Execution Semantics of Stream Processing Systems. PVLDB, 3(1):232-243, 2010.
[13] I. Botan et al. Transactional Stream Processing. In EDBT, pages 204-215, 2012.
[14] U. Cetintemel et al. S-Store: A Streaming NewSQL System for Big Velocity Applications (Demonstration). PVLDB, 7(13):1633-1636, 2014.
[15] B. Chandramouli et al. Trill: A High-Performance Incremental Query Processor for Diverse Analytics. PVLDB, 8(4):401-412, 2014.
[16] S. Chandrasekaran et al. TelegraphCQ: Continuous Dataflow Processing for an Uncertain World. In CIDR, 2003.
[17] J. Chen et al. NiagaraCQ: A Scalable Continuous Query System for Internet Databases. In SIGMOD, pages 379-390, 2000.
[18] C. Diaconu et al. Hekaton: SQL Server's Memory-Optimized OLTP Engine. In SIGMOD, pages 1243-1254, 2013.
[19] A. Elmore et al. A Demonstration of the BigDAWG Polystore System (Demonstration). PVLDB, 8(12):1908-1919, 2015.
[20] R. C. Fernandez et al. Integrating Scale-out and Fault-tolerance in Stream Processing using Operator State Management. In SIGMOD, pages 725-736, 2013.
[21] R. C. Fernandez et al. Making State Explicit for Imperative Big Data Processing. In USENIX ATC, pages 49-60, 2014.
[22] L. Golab et al. On Concurrency Control in Sliding Window Queries over Data Streams. In EDBT, pages 608-626, 2006.
[23] L. Golab and T. Johnson. Consistency in a Stream Warehouse. In CIDR, pages 114-122, 2011.
[24] L. Golab and T. Ozsu. Issues in Data Stream Management. ACM SIGMOD Record, 32(2):5-14, 2003.
[25] J.-H. Hwang et al. High-Availability Algorithms for Distributed Stream Processing. In ICDE, pages 779-790, 2005.
[26] M. Isard et al. Quincy: Fair Scheduling for Distributed Computing Clusters. In SOSP, pages 261-276, 2009.
[27] N. Jain et al. Towards a Streaming SQL Standard. PVLDB, 1(2):1379-1390, 2008.
[28] T. Johnson et al. Query-aware Partitioning for Monitoring Massive Network Data Streams. In SIGMOD, pages 1135-1146, 2008.
[29] R. Kallman et al. H-Store: A High-Performance, Distributed Main Memory Transaction Processing System. PVLDB, 1(2):1496-1499, 2008.
[30] A. Lerner and D. Shasha. The Virtues and Challenges of Ad Hoc + Streams Querying in Finance. IEEE Data Engineering Bulletin, 26(1):49-56, 2003.
[31] N. Malviya et al. Rethinking Main Memory OLTP Recovery. In ICDE, pages 604-615, 2014.
[32] D. G. Murray et al. Naiad: A Timely Dataflow System. In SOSP, pages 439-455, 2013.
[33] L. Neumeyer et al. S4: Distributed Stream Computing Platform. In KDCloud, pages 170-177, 2010.
[34] A. Pavlo et al. Skew-Aware Automatic Database Partitioning in Shared-Nothing, Parallel OLTP Systems. In SIGMOD, pages 61-72, 2012.
[35] M. A. Shah et al. Highly Available, Fault-tolerant, Parallel Dataflows. In SIGMOD, pages 827-838, 2004.
[36] A. Silberschatz et al. Database System Concepts. McGraw-Hill, 2010.
[37] A. Toshniwal et al. Storm @Twitter. In SIGMOD, pages 147-156, 2014.
[38] D. Wang et al. Active Complex Event Processing over Event Streams. PVLDB, 4(10):634-645, 2011.
[39] G. Weikum and G. Vossen. Transactional Information Systems: Theory, Algorithms, and the Practice of Concurrency Control and Recovery. Morgan Kaufmann, 2001.
[40] M. Zaharia et al. Discretized Streams: Fault-tolerant Streaming Computation at Scale. In SOSP, pages 423-438, 2013.