The Case for a Signal-Oriented Data Stream Management System

Sensors capable of sensing phenomena at high data rates—on theorder of tens to hundreds of thousands of samples per second—areuseful in many industrial, civil engineering, scientific, networking,and medical applications. In these applications, high-rate streams ofdata produced by sensors must be processed and analyzed using acombination of both event-stream and signal-processing operations.This paper motivates the need for a data management and continu-ous query processing architecture that integrates these two differentdesired classes of functions into a single, unified software system.The key goals of such a system include: the ability to treat a se-quence of samples that constitute a “signal segment” as a basic datatype; ease of writing arbitrary event-stream and signal-processingfuncti

1. The Case for a Signal-Oriented Data Stream Management System Position Paper Lewis Girod, Yuan Mei, Ryan Newton, Stanislav Rost, Arvind Thiagarajan, Hari Balakrishnan, Samuel Madden MIT CSAIL Email: ABSTRACT like MATLAB. Signal processing operations in these external languages are usually coded in terms of operations on arrays, Sensors capable of sensing phenomena at high data rates—on the whereas most SPEs represent data streams as sequences of tuples. order of tens to hundreds of thousands of samples per second—are These sequences need be packed into and unpacked from arrays useful in many industrial, civil engineering, scientific, networking, and be passed back and forth. The conversion overheads imposed and medical applications. In these applications, high-rate streams of by this mismatch also limit the performance of existing SPEs data produced by sensors must be processed and analyzed using a when performing signal processing operations, constraining the combination of both event-stream and signal-processing operations. applicability of these existing systems to lower rate domains. This paper motivates the need for a data management and continu- Another option for building signal processing applications is to ous query processing architecture that integrates these two different use a graphical modeling package such as Simulink or LabVIEW. desired classes of functions into a single, unified software system. These systems, which offer high-level programming languages The key goals of such a system include: the ability to treat a se- (sometimes data-flow oriented), lack stream processing capabili- quence of samples that constitute a “signal segment” as a basic data ties, database-like optimization features, distributed execution, and type; ease of writing arbitrary event-stream and signal-processing the ability to integrate naturally with relational data stored on disk. functions; the ability to process several million samples per second It is both inconvenient and inefficient to just use one of these signal on conventional PC hardware; and the ability to distribute applica- processing systems as a front- or back-end to an conventional tion code across both PCs and sensor nodes. stream processor because many applications require alternating sequences of event stream and signal processing operations. 1. INTRODUCTION This paper describes the motivation and high-level architecture There is a need for data management and continuous query of a combined event-stream and signal-processing system that we processing systems that integrate high data rate event-stream and are building as part of the WaveScope project. The project’s com- signal-processing operations into a single system. This need is ponents include: evident in a large number of signal-oriented streaming applica- • A programming language, WaveScript, that allows users to tions, including preventive maintenance of industrial equipment; express signal processing programs as declarative queries detection of fractures and ruptures in pipelines, airplane wings, or over streams of data. buildings; in situ animal behavior studies using acoustic sensing; network traffic analysis; and medical applications such as anomaly • A high-performance execution engine that runs on multipro- detection in electrocardiogram signals. cessor PCs. These target applications use a variety of embedded sensors, • A distributed execution engine that executes programs writ- each sampling at fine resolution and producing data at rates as ten in WaveScript over both conventional PCs across a net- high as hundreds of thousands of samples per second. In most work and between PCs and embedded sensor nodes. applications, processing and analyzing these streaming sensor samples requires non-trivial event-stream and signal-oriented WaveScript includes several noteworthy features. Its data model analysis. In many cases, signal processing is application-specific, introduces a new basic data type, the signal segment. A signal seg- and hence requires some amount of user defined code. Current ment is a sequence of isochronous (i.e., sampled regularly in time) general-purpose data management systems fail to provide the data values (samples) from a signal that can be manipulated as a right features for these applications: stream processing engines batch. WaveScript natively supports a set of operations over signal (SPEs) such as Aurora [8], STREAM [23], and TelegraphCQ [9] segments. These include various transforms and spectral analyses, as well as more recent commercial offerings (e.g., StreamBase, filtering, resampling, and decimation operations. Another important Coral8) handle event processing over streaming data, but don’t feature of WaveScope is that users express both queries and user- provide a convenient way to write user-defined custom code to defined functions (UDFs) in the same high-level language (Wave- handle signal processing operations. In particular, they suffer from Script). This approach avoids the cumbersome “back and forth” an “impedance mismatch”, where data must be converted back of converting data between relational and signal-processing opera- and forth from its representation in the streaming database to an tions. The WaveScript compiler produces a low-level, asynchronous external language like Java or C++, or even to a separate system data-flow graph similar to query plans in traditional streaming sys- tems. The runtime engine efficiently executes the query plan over multiprocessor PCs or across networked nodes, using both compiler optimizations and domain-specific rule-based optimizations. This article is published under a Creative Commons License Agreement This position paper is primarily concerned with the main fea- ( tures of the WaveScript language and outlines some elements of the You may copy, distribute, display, and perform the work, make derivative execution engines and the optimization framework. It attempts to works and make commercial use of the work, but you must attribute the make the case for a signal-oriented streaming system, but does not work to the author and CIDR 2007. 3rd Biennial Conference on Innovative Data Systems Research (CIDR) Jan- describe design details, report on an implementation, or provide any uary 7-10, 2007, Asilomar, California, USA. experimental results.

2. Fast 1-ch Detection Interestingly, the processing needed to answer this sort of query ProfileDetector Temporal selection turns out to be similar across domains ranging from industrial mon- <t1,t2,emit> itoring to network traffic analysis. For example, in industrial mon- Audio0 <audio> itoring, vibration signals are matched against known signatures to Audio1 <audio> sync determine if a piece of equipment is about to fail. We have distilled Audio2 <audio> the issues raised by a range of applications into five key features that Audio3 <audio> <w1,w2,w3,w4> must be provided by a signal-oriented stream processing system: Enhance and classify full dataset (expensive) A data model with explicit support for signals. Most signal processing operations need to run over windows of hundreds or Beamformer <DOA, enhanced> Classifier <type, id> zip thousands of data samples. To process these windows, existing stream processing systems would either treat groups of samples as Output <DOA, combined, type, id> uninterpreted blocks of data, or would represent each sample as a separate tuple and impose sliding windows on these tuples as they Figure 1: Marmot call detection, direction-of-arrival estimation, and stream into operators. In contrast, our approach is to define a new classification workflow. fundamental data type, the signal segment (SigSeg). A SigSeg is a 2. KEY FEATURES first-class object that represents an isochronous window of fixed To understand the challenges involved in building a signal ori- bitwidth samples in a data-stream. Query processing operators ented stream processor, it is helpful to first consider a specific exam- process these SigSeg objects, operating directly on the SigSeg’s ple application. Here we present an example drawn from an acous- implied time index without incurring the overhead of explicit tic sensor network deployed by scientists to study the behavior of per-sample timestamps. It is important to note that our data model the yellow-bellied marmot1 (Marmota flaviventris) in the wild [24]. is not completely isochronous; although readings within a SigSeg The idea is to first use one audio channel to detect the possibility of are assumed to be isochronous, individual tuples (each containing a nearby marmot. A detection triggers processing on multiple au- SigSegs) may arrive asynchronously. dio channels to detect, orient, localize, and classify the most likely This approach should yield a substantial performance improve- behavior of one or more of these rodents. ment over traditional approaches. Keeping samples within a SigSeg Each sensor produces samples of acoustic data at a certain con- isochronous is one key to providing this performance boost, be- figurable frequency—typical data rates are 44 KHz per sensor, and cause there is no space overhead for explicitly storing timestamps, a typical setup [14] might include eight or ten microphone arrays and efficient time-range lookups are possible. By providing a com- each consisting of four microphones (for a combined data rate of pact data representation that can be passed by reference, we antic- about 1 MHz). The following questions are of interest: ipate that using SigSegs will also improve cache performance, a critical factor in processing-intensive applications. 1. Is there current activity (energy) in the frequency band corre- Integrated language for expressing queries and UDFs. Wave- sponding to the marmot alarm call? Script combines relational and signal processing operations in one language, for the reasons mentioned in Section 1. 2. If so, which direction is the call coming from? Use that di- rection to enhance the signal using beamforming on a four- Efficient runtime execution. The runtime system must minimize channel array. in-memory copying and avoid scheduler overhead. First, it is essen- tial to use reference counting on SigSegs to eliminate copies as data 3. Is the call that of a male or female? Is it a juvenile? When flows between operators. Second, the scheduling discipline must be possible, identify and distinguish individual marmots. designed to allow inter-operator control flow to bypass the sched- uler where possible, while still supporting query plans with branch- 4. Where is each individual marmot located over time? ing and enabling intra-query parallelism via multi-threading. Extensible optimization framework. We aim to provide an opti- 5. Are marmots more responsive to alarm calls from juveniles? mization framework that supports database-like commutativity and Are males and females commonly found in mixed or separate merging optimizations, rule-based optimizations similar to extensi- groups? Are juveniles predominantly located near males or ble query optimizers [16,26] to express signal processing optimiza- females? Do subsets of individuals tend to stay together? tions over query plans, and various compiler optimizations. Figure 1 shows a block diagram representing the processing steps Distributed execution. WaveScope targets many applications that required by the first two queries shown above. The first query uses must be implemented using distributed sensors. To address this, continuous spectrum analysis to estimate the energy content in the the query plan must be divided into segments that run on differ- frequency range of interest, followed by a smoothed noise estima- ent nodes, and each segment must be compiled and optimized to tor and threshold detector with hysteresis to capture complete calls; run on the appropriate target platform. Finally, inter-node links in this is labeled “Fast 1-ch Detection” in Figure 1. The second query the query plan must efficiently support both wired and wireless net- is implemented by first extracting these time windows of interest works. from the historical data recorded at a multi-channel acoustic array; this is labeled “Temporal Selection”. Next, the query estimates the 3. DATA AND PROGRAMMING MODEL direction of arrival (DOA) of the marmot call using an Approx- In this section, we summarize the WaveScope data model, and imate Maximum Likelihood (AML) beamforming algorithm, en- discuss how queries and operators are programmed. hances the signal by phase-shifting and combining the four separate channels, and finally passes the enhanced signal to a classification 3.1 Data Model algorithm; this is the “Enhance and Classify” box. WaveScope models data as a stream of tuples. Each tuple within a 1 A medium-sized rodent common in the western US. stream is drawn from the same schema, and each field in the schema

3.has a type. Field types are either primitive types (e.g., integer, float, A single, integrated language for queries, reusable subquery- character, string), arrays or sets, tagged unions (variant records), or constructors, and POD functions yields a number of advantages. signal segments (SigSegs).2 A SigSeg represents a window into a First, by using a single data model, WaveScript avoids the com- signal (time series) of fixed bitwidth values that are regularly spaced plexities of mediating between the query itself and user-defined in time (isochronous). Hence, a typical signal in WaveScope will functions residing in an external language (such as C). Further, it be represented by a stream of tuples, where each tuple contains a enables type-safe construction of queries. In contrast, a language SigSeg object that represents a fixed sized window on that signal. like SQL is frequently embedded into other languages, but the pro- A SigSeg object is conceptually similar to an array, in that it pro- grammer that embeds the query is given no compile-time guarantee vides methods to get values of elements in the portion of the signal about the well-formedness of that SQL query. it contains and determine its overall length. However, SigSegs also We illustrate WaveScript with a code snippet from the marmot contain a timebase that specifies the measurement times of values in detection query. ProfileDetect is a subquery-constructor that the SigSeg and provides a set of methods for comparing and map- connects together a number of more basic stream-processing oper- ping between signals sampled from sensors at different rates. Al- ators in a reusable way. It instantiates a series of stream-processing though values within a SigSeg are isochronous, a data stream itself operators which search a data stream for windows matching a given may be asynchronous, in the sense that the arrival times of tuples frequency profile (see Figure 1). will not be spaced completely regularly in time (this is particularly fun profileDetect(S, scorefun, <winsize,step>, threshsettings) { likely to be true of streams in the middle of a WaveScope plan, after // Window input stream, ensuring that we will hit each event filtration and transformation operations have been applied.) wins = rewindow(S, winsize, step); Streams of tuples in WaveScope follow pass-by-value (copying) // Take a hanning window and convert to frequency domain. semantics between operators, including tuples containing SigSegs. scores : Stream< float > scores = iterate(w in hanning(wins)) { Pass-by-value can be implemented in several ways. For example, // Compute frequency decomposition in the case of SigSegs, the implementation will likely include per- freq = fft(w); formance optimizations to reduce the cost imposed by copy seman- // Score each frequency-domain window emit (scorefun(freq)); tics, e.g., using reference-counting and copy-on-write, which are }; discussed in more detail in Section 4. The implementation of the // Associate each original window with its score. data model has important performance implications, but should not withscores : Stream<float, SigSeg<int16>> withscores = zip2(scores, wins); affect application semantics. // Find time-ranges where scores are above threshold. Regardless of the particular implementation, the WaveScope data // threshFilter returns <bool, starttime, endtime> tuples. model treats SigSegs as first-class entities that are transmitted in return threshFilter(withscores, threshsettings); } streams, and may be stored, processed, and forwarded at will. This is unlike other streaming systems that impose windows on individ- Note that WaveScript is statically typed, but types are in- ual tuples as a part of the execution of individual operators, but ferred from variable usages using standard techniques [22].3 do not allow the windows themselves to be manipulated. By mak- In the above program, wins, scores, and withscores are all ing SigSegs first-class entities, windowing can be done once for a streams of tuples. The type of withscores, for example, is whole chain of operators, and logical windows can be stored and Stream<float,SigSeg<int16>>. Notice that we allow a tuple passed around dynamically, rather than being defined by the query to contain SigSegs, and set-valued types. plan. Packing blocks of readings together in SigSegs is natural for ProfileDetect first creates a new windowed view of the many signal processing operations that operate on fixed sized win- samples in the input stream using the rewindow operator. Here, dows, and is much more efficient for high data rate operations as the input stream S is divided into windows of size winsize that operators are invoked on blocks of hundreds or thousands of sam- advance by step. These values are application-specific: winsize ples at once. determines the resolution of the frequency decomposition, while In this paper we focus on one dimensional signals (e.g., streams step determines how sparsely the stream is sampled. For example, of audio and vibration data), but in general the WaveScope data for events shorter than one window, step must be chosen such that model also supports SigSegs which refer to multidimensional sig- adjacent windows overlap, whereas longer events can be detected nals (e.g., streams of images). with sparser sampling of the channel. The detector then computes the frequency decomposition of each individual window, and passes this frequency map to a custom scoring function to generate 3.2 Programming Model a match score. It accomplishes this using the iterate construct A compiled WaveScript program is a data-flow graph of stream (discussed below) to execute code against each individual window operators. The WaveScript source, however, is a script that gen- in the stream. Finally, the detector zips together the original data erates this graph. Within this script, the user may define and in- with the computed match score. voke reusable stream-graph constructor functions. We call these Zip2 is a simple operator that merges two streams pair-wise, syn- subquery constructors. These, like macros, evaluate at compile time chronously. Since data streams in general are asynchronous, zip- to produce clusters of connected stream operators. In addition, the ping only works properly when it is known that there will be a one- user writes POD functions (plain-old-data functions), which in our to-one correspondence between the input streams being zipped. Of- context refer to functions that neither produce nor consume streams. ten a more sophisticated strategy is needed to merge streams. Zip2 These functions may be used from within the bodies of stream oper- turns out to be good enough in this case because all its inputs are ators as they process data, and may be either inlined (like subquery derived from the same source stream. constructor functions) or remain separately compiled. Note that operations like hanning, fft and rewindow are library functions—common signal processing operators included 2 The WaveScript source language also allows user defined functions to 3 process nested tuples, as well as polymorphic tuples (where not all field Type annotations may optionally be included for clarity. For example, types are determined). These features, however, disappear during the the marmot-detection code in the appendix contains type declarations compilation process. for each top-level function.

4. Class Examples fun sync2 (ctrl_strm, S1, S2) { POD Functions arithmetic, SigSeg operations, S3 = union3(S1, S2, ctrl_strm); (built-in, linked from C, or user-defined) timebase operations, FFT/IFFT, S4 = iterate (tagged in S3) { Subquery-Constructors profileDetect, classify state { acc1 = NullSeg; beamForm, sync,zip acc2 = NullSeg; } Fundamental Stream Operators iterate, union switch(tagged) { case Input1 v : acc1 := append(acc1, v); Table 1: Classes of Programming Constructs in WaveScript. case Input2 v : acc2 := append(acc2, v); case Input3 <flag,t1,t2> : with WaveScope. We are developing an extensive library of such if (flag) functions, but do not detail them here. then emit <subseg(acc1,t1,t2), subseg(acc2,t1,t2)>; Table 1 lists the different classes of programming constructs acc1 := subseg(acc1, t2, acc1.end); that are available in WaveScript. Subquery-constructors may be acc2 := subseg(acc2, t2, acc2.end); application-defined (like profileDetect) or may be defined } } within the WaveScript library. In either case they may be aug- return S4; mented with optimization rules as discussed in Section 5. Built-in } POD functions are low-level primitive operations (or externally linked C functions) that cannot or should not be implemented in Figure 2: A 2-input temporal synchronizer with error-handling omitted. WaveScript. The basic stream operators (iterate, union) are enhanced versions of the raw input data. Finally, this enhanced sig- special programming constructs for manipulating streams and are nal is fed into an algorithm that classifies the calls by type (male, the fundamental operators recognized by the runtime engine (as female or juvenile), and, when possible, identifies individuals. discussed in Section 4.1). Defining custom operators: The queries we have see thus far pri- Main query body: Next, we will take a look at the body of a Wave- marily wire together existing operators, rather than writing custom Script program for detecting and classifying marmots based on their ones. Although we seek to provide a broad array of useful opera- audio calls (shown graphically in Figure 1). The first thing to do is tors in the WaveScope libraries, it is likely that most applications configure the data sources. This can occur in the query file, or sep- will require custom operators that in turn may invoke user-defined arately in an included catalog file: POD functions. Ch0 = AudioSource(0, 48000, 1024); In WaveScope, the user employs the iterate construct to con- Ch1 = AudioSource(1, 48000, 1024); struct a one-input, one-output operator that invokes a user-provided Ch2 = AudioSource(2, 48000, 1024); Ch3 = AudioSource(3, 48000, 1024); block of code on its input stream. For example, the following snip- pet constructs a generic aggregation operator parameterized by the These statements declare that C0-C3 are audio streams sampled init, aggr, and out function arguments. at 48 KHz from audio channels 0 through 3 and are to be win- fun build_aggr(S, init, aggr, out) { dowed into windows of 1024 samples each. With the variables C0- S2 = iterate (x in S) { C3 bound, we can now write a basic marmot detector using pro- state { acc = init(); } fileDetect and a few other subquery constructors defined in Wave- acc := aggr(acc, x); emit out(acc); Script. } control = profileDetect(Ch0, marmotScore, <64,192>, return S2; <16.0, 0.999, 40, 2400, 48000>); } // Use the control stream to extract actual data windows. Within the iterate construct, state{} is used to declare persistent datawindows = sync4(control, Ch0, Ch1, Ch2, Ch4); beam<doa,enhanced> = beamform(datawindows, arrayGeometry); (static) state, and an emit produces a value on the output stream marmots = classify(beam.enhanced, marmotClassifier); (S2). In any given iteration, emit may be called zero or more times. The code inside a iterate construct is restricted to enable efficient return zip2(beam, marmots); compilation to a low-level imperative language (such as C). But The above query first uses profileDetect as a real-time WaveScript is expressive enough to enable us to write nearly all of prefilter on one of the four audio channels. The result is a our library operators directly in the language. It allows condition- <bool,time,time> control stream, which is used to “snap- als, for-loops, arrays, and handles SigSegs and timebases. In fact, shot” certain time ranges and discard others. Sync4 accomplishes the sync4 primitive shown in the marmot-query above is written this by aligning the four streams in time, and extracting the data directly in WaveScript. In figure 3.2 is a definition for sync2 (the contained within the time-ranges specified by the the control two-input version of sync4) with error-handling omitted. sync2 stream—in this case representing the time-ranges suspected by the takes three inputs: one control stream of the same type as sync4, profile-detector as containing marmot calls. The inputs to Sync4 and two streams of SigSegs, are of type Stream<SigSeg<int16>> for the data streams and The union family of operators are the only primitive operators Stream<bool,time,time> for the control stream. Note that allowed to take multiple input streams. Union3 takes three input int16 could be replaced by any type: the audio channels happen streams and produces a value on its output stream whenever a to carry int16 samples, but sync4 is generic. value arrives on any input. The output value must be tagged for Next, the synchronized windows of data from all four audio chan- downstream operators to distinguish which input stream created the nels are processed by a beamforming algorithm. This is the second value. To accomplish this tagging in a type-safe way, WaveScope processing phase which makes the query multi-pass. The algorithm allows tagged union types (also called “variant records”, similar computes a direction-of-arrival (DOA) probability distribution, and to type-safe enums) [22]. In particular, the variable tagged has enhances the input signal by combining phase-shifted versions of three variants: Input1, Input2, and Input3 corresponding to which the four channels according to the most likely direction of arrival. channel the data arrived on. The switch/case construct uses The beam function returns a stream of two-tuples. We use a special pattern matching to dissect variants and their fields. The Input1 binding syntax, “beam<doa,enhanced> = . . . ”, to give temporary and Input2 cases carry SigSeg values from S1 and S2 respectively. names to the fields of these tuples. That is, beam.doa projects a These SigSegs are appended to their respective accumulators.4 The 4 stream of direction-of-arrivals, and beam.enhanced contains the In the full version of sync2, exception handling would be required to

5.state of the iterate operator at any point in time is simply two Query plan Thread pool & scheduler SigSegs (acc1 and acc2). Sync2 accumulates both signals until I it receives a message from the control stream. The control stream I provides a time-range and a boolean flag indicating whether to S I U discard or snapshot that range. The subseg function is used to crop the accumulators; it produces a new SigSeg that represents a I I sub-range of the input SigSeg. Result By using a union operator together with a state-carrying SigSegs iterate construct, it is possible to implement arbitrary synchro- Audio ADC 48KHz timebase nization policies (e.g., different buffering policies and temporal sensitivities). We have designed WaveScope in this way because CPU timebase there are a plethora of viable application specific synchronization policies. The WaveScript library, however, will include a suite of TimeSeries generally useful synchronization operators, such as sync2. 4. SYSTEM ARCHITECTURE Memory manager Timebase conversion graph Queries in WaveScope are WaveScript programs that make use Figure 3: An example of a WaveScope query running in an execution of a library of built-in signal processing functions. A WaveScope engine of a node. Node types: S: source, I: iterate, U: union. A seg- query, initially of the form we discussed in Section 3.2, must go ment of one of the timeseries has been garbage-collected. Two of the through a number of stages to reach deployment and execution. timeseries share the same timebase. input stream. The code inside the function may call an emit • Preprocessor: Eliminates syntactic sugar and type-checks construct to output tuples, which will be routed to all of the the WaveScript source. operator’s successors in the query plan. • Expander: Inlines all subquery-constructors and many • union: Union is the only operator in compiled query plans POD functions, erasing abstraction boundaries and leaving a that takes multiple inputs and is the primitive upon which dataflow graph of basic operators—the query plan. stream synchronization operations, such as sync and zip are • Optimizer: Applies a number of inter- and intra-operator built. Our previous example code for sync2 illustrates its use. optimizations described in Section 5. (Mixed with expander phase.) • source: A source operator interfaces with I/O devices to produce signal data that feed to operators in the query • Compiler: Generates the query plan, which will be in a low plan. For example, the AudioSource in the marmot detec- level imperative language (e.g., C) for efficiency. The query tion application (Section 3.2) continuously samples audio plan wires together functions compiled from each iterate data from a sound card, and invokes the memory manager operator in the original WaveScript query, and links against (Section 4.2.2) to window contiguous chunks of audio into the WaveScope runtime library. SigSegs. These SigSegs are wrapped within tuples and emitted to operators that perform the actual processing. In • Runtime: The runtime library for single-node execution con- the implementation, we envision that source operators will sists principally of three modules: a scheduler, a memory usually live in their own threads to avoid blocking on I/O. manager, and a timebase manager (Section 4.2). Although the runtime engine only needs to handle the small col- The final step in compilation entails compiling the query plan to lection of primitive operators described above (simplifying imple- machine code and executing the query. Since WaveScope will also mentation), the WaveScript compiler and optimizer will recognize support distributed query execution and network deployment (e.g., the special properties of many signal processing functions (e.g., on a sensor network or PC cluster), there is an additional phase FFT, IFFT, Convolve). This enables several high-level query op- where query plans are disseminated to nodes in the network. (Sec- timizations, as discussed in Section 5. tion 4.3). 4.1 Compiled Query Plans 4.2 Single-Node Runtime After the expander has produced an initial query plan, the opti- Once deployed on a physical node, the WaveScope program runs mizer performs multiple passes over the plan (Section 5) and gener- within the execution engine. We provide an example of an audio ates an optimized query plan. The final query plan is an imperative processing query running within an engine of a single node in Fig- program, corresponding to an Aurora-style directed graph that rep- ure 3. The figure demonstrates the three main modules of the en- resents the flow of streaming data through the program. The only gine: the scheduler, the memory manager, and the timebase man- operators that survive to this point are iterate, union, and spe- ager. Below, we describe the functions of each of these subsystems cial source operators for each data source in the query. in more detail. • iterate: An iterate operator is the basic workhorse of a 4.2.1 Scheduler WaveScope query. Each iterate construct in a WaveScript The WaveScope scheduler chooses which operators in the query program is compiled to an imperative procedure in the query to run next, and provides the tuple passing mechanisms by which plan, optionally with a piece of persistent state. This function operators communicate. The scheduler maintains a thread pool, and is repeatedly invoked on each of the tuples in the operator’s assigns idle threads to available operators in the query graph. An handle the case where the SigSegs cannot be appended because they are operator is available if its input queues are non-empty and it is not not adjacent, or are not in the same timebase. already being executed by another thread.

6. A good scheduler should possess the following desirable proper- create sigsegs, batches chunks of isochronous samples into ties: SigSegs, which are returned to the application and in turn passed to Compact memory footprint to prevent thrashing, especially on subsequent operators in the query plan. embedded platforms with little RAM. Compactness includes reduc- Operators in the query plan pass SigSegs between each other. ing the overhead of allocating and deallocating memory, which can Semantically, copying a SigSeg is equivalent to making a copy of place a heavy burden on the operating system when WaveScope the underlying signal data. However, to scale to high data rates and operates on high-rate streams. reduce in-memory copying overhead, we plan to use an implemen- tation where SigSegs are passed by reference with copy-on-write Cache locality, which affects performance and leads to several im- semantics. In this implementation, the signal store automatically portant design tradeoffs. For example, executing an operator until garbage collects signal data using standard reference counting. its input queues are drained offers good instruction cache locality SigSegs could overlap in range, so maintaining the correct refer- (operator code stays in cache) and data cache locality (operator’s ence counts requires some care. In addition, to ensure that a SigSeg internal state stays in cache). However, processing many input tu- always refers to valid data, WaveScript restricts how they can be ples in one sweep may lead to producing many output tuples, which created to three interfaces: create sigsegs, which appends new could dramatically grow the size of the output queues. On the other samples to an existing timeseries; append, which creates a SigSeg hand, executing each operator one tuple at a time may incur more by joining two existing adjacent SigSegs; and subseg, which cre- CPU cache misses, but could help lower the memory footprint. ates a subrange of an existing SigSeg. The latter is useful in sev- Fairness helps avoid starving parts of the query plan. Devoting dis- eral applications (including the acoustic monitoring application de- proportionate time to the execution of any particular operator may scribed earlier) that identify and lookup an interesting time range of result in a buildup in its output queues. Fairness is also important data for further processing. for multi-input operators (such as Sync and Join), where the skew in arrival rates of input streams may cause accumulation of data 4.2.3 Timebase Manager in internal buffers, and lead to delays in materializing the output Managing timing information corresponding to signal data is a tuples. common problem in signal processing applications. Signal process- Because of possible differences in the sampling rates of sources ing operators typically process vectors of samples with sequence and the disparity in the costs of computing different input streams numbers, leaving the application developer to determine how to to such operators, achieving fairness is an interesting scheduling interpret those samples temporally. For example, a decimation fil- problem which may require run-time profiling of each operator in ter that halves the input rate takes in 2N samples and outputs N the query plan. Since profiling of operators may be relatively expen- samples—but the fact that the rates of the two vectors are different sive, our design may consider profiling the system and adjusting the must be tracked separately by the application. schedule occasionally when rate skew is detected. To address this problem, WaveScope introduces the concept of a timebase, a dynamic data structure that represents and maintains a Scalability with the number of processors or CPU cores. A good mapping between sample sequence numbers and time units. As part scheduler should minimize the amount of thread synchronization of its metadata, a SigSeg specifies a timebase that defines what its and ensure affinity between operators and CPU cores to maintain sequence numbers mean. Based on input from signal source drivers cache residency of data and code. and other WaveScope components, the timebase manager maintains One possible design we intend to investigate divides a query plan a conversion graph (shown in Figure 3) that denotes which conver- into “slices,” constraining the operators in each slice to only exe- sions are possible. In this graph, every node is a timebase, and an cute on a particular CPU core. The scheduler may run one thread edge indicates the capability to convert from one timebase to an- per core, and schedule the order of execution within each slice sepa- other. The graph may contain cycles as well as redundant paths. rately, which avoids synchronization on centralized data structures. Conversions may be composed along any path through the graph; Determining a partitioning of the query plan that maximizes steady- when redundant paths exist, a weighted average of the results from state throughput is an interesting problem that we plan to investi- each path may result in higher accuracy [18]. gate. WaveScope will support several types of timebases: High-throughput tuple passing. The choice of the scheduler can also help to entirely eliminate queuing in the system. If the sched- • Analytic timebases represent abstract units, e.g., Seconds uler design gives up some flexibility in choosing the next operator (since the epoch), Hertz (for frequency domain signals), to execute, and instead traverses the query plan along the opera- Meters, and Sequence Numbers—essentially, anything that tor connections (for example, in depth-first order), it can use direct could represent the independent variable (“x axis”) of a operator-to-operator function calls, passing tuples by reference. Of signal or time series. For example, a SigSeg of data sampled course, assuring fairness in such schemes may be more difficult be- at one sample per second would have a timebase of Seconds, cause of extra constraints on the order of execution. since each sequence number would correspond directly to that number of seconds. 4.2.2 Memory Management • Derived timebases represent a linear relationship to an ex- The task of the memory manager is to provide a simple and ef- isting “parent” timebase. For example, a SigSeg sampled at 2 ficient way for operators to create, access and garbage collect all samples per second would derive its timebase as 2 × Seconds. signal data flowing through the query plan. All signal data in a WaveScope query originates either from • Empirical timebases represent a real free-running clock, source operators in the query plan (e.g., sensors or network e.g., a CPU clock or a sample clock. Conversions associ- sockets), or from intermediate operators that create new signals as ated with empirical timebases are empirically derived by output (e.g., FFT). Conceptually, these operators invoke a memory collecting measurements that associate clock values in dif- manager API call to continuously append signal samples to a time ferent timebases. Since clocks are locally linear, conversions series in the in-memory signal store. This API call, which we term to empirical timebases can generally be approximated by

7.Each Sensor Node over the network, the signal data they contain must be marshaled ToRegion and transmitted, rather than only transmitting the metadata. This ProfileDetector <t1,t2,emit> ToRegion operation can be expensive, but there will be instances in which it can be optimized. For example, if a SigSeg is forwarded through Uniquify Union ToRegion, and all of the nodes in the region will want the data, a Audio0 <w> multicast distribution tree might be the most efficient way to dis- Audio1 <w> sync seminate the signal data. Alternatively, if only a few of the target Audio2 <w> Audio3 <w> <w1,w2,w3,w4> nodes will want to process the data, a lazy transmission scheme could be used, in which the metadata is disseminated, and the sig- nal data is “pulled” only by those nodes that need it. Beamformer Classifier <species> <DOA, combined> zip Although these simple primitives appear to be sufficient for this application, we expect that other primitives may arise as we explore ToCollector <species, DOA, combined> new applications. Another common example of a distributed query is the case where each node maintains a local persistent store of data Backend (e.g., data that might be needed in the future but is not currently im- <location, event, node, portant enough to send back over the network). If the sync operator species, DOA, combined> Localization ToCollector spills the raw signal data to disk, queries could be installed post- Global Beamforming Classfication Log facto to perform further analysis of that data. Such a query could be installed by the system operator when something particularly inter- esting was observed, and the results sent back via ToCollector. Figure 4: A distributed query for marmot localization. In Figure 4, the query defines the desired physical network mapping by explicitly specifying network links and where different piece-wise linear functions. For example, the CPU clock and parts of the query are executed. In principle, WaveScope could a sound card’s sample clock are both empirical timebases. optimize the selection of these “cuts” based on a variety of metrics A relation between them can be derived by periodically including processing and network capacity. We do not envision a observing the CPU time at which a particular sample was completely autonomous cut selection algorithm in the near future; taken. rather, we plan to provide a variety of profiling tools that can inform the user about where best to make the cut. Using the timebase infrastructure, application developers can pass around SigSegs without worrying about keeping track of 4.4 Querying Stored Data the temporal meaning of the samples, because this is captured automatically in the timebase graph. For example, a SigSeg that In addition to handling streaming data, many WaveScope appli- was transformed by a decimation filter can be related to the original cations will need to query a pre-existing stored database, or histori- signal by simply converting any sequence number in the decimated cal data archived on secondary storage (e.g., disk or flash memory). version to the corresponding sequence number in the original. Since For instance, consider the query mentioned in Section 2 that tracks conversion can be composed along any path through the timebase the positions of individual marmots as a function of time. In order graph, this is even true across multiple composed decimations. to identify an individual marmot from an audio segment, the ap- Empirical timebases can also be used to represent node-to-node plication needs to compare the segment with signatures of audio time conversion, by correlating events observed in terms of both segments from previously seen marmots and determine the closest CPU clocks. By adding these node-to-node conversions to the time- match, if any. This requires the ability to archive past events (like base graph, sensor data measured on separate nodes can be related historical marmot detections) and query this archive at some point just as data measured on the same node. in the future. We plan to provide two special WaveScope library functions that 4.3 Distributed Query Execution will support archiving and querying stored data declaratively: Support for distributed queries is a key component of many • DiskArchive, which consumes tuples from its input stream WaveScope applications. For example, consider the marmot de- and writes them to a named relational table on disk. The table tection example from Figure 1. Suppose we now want to run that name is a global, system-wide identifier. detection algorithm on embedded sensor nodes in a deployment and combine the results to localize the detected marmots via • DiskSource, which reads tuples from a named relational ta- triangulation. ble on disk and feeds them upstream. This operator is similar A workflow diagram for the localization step is shown in Fig- to the other source operators that were discussed earlier, but ure 4. The components in the upper dotted box run in parallel on in addition may support indexing signal data, and pushing each sensor node in the system, detecting and classifying marmot down predicates to allow efficient access to relevant regions calls. The components in the lower box run only on a centralized of history. server, identifying events that are common across multiple nodes, triangulating the marmots, and re-classifying based on complete Storing and retrieving large segments of signal data (both at a information. These computational domains are connected together single node, and across a network of nodes) will pose several inter- through two simple network primitives, ToRegion and ToCollector. esting research questions, including: ToRegion accepts tuples and multicasts them to all nodes in the specified region, where they are received back into the query plan. • Assuming that the goal is to support both efficient archiving The target region might be specified as a particular set of nodes, and retrieval, what is the best way to store and represent sig- a region in physical space, or by some other predicate. ToCollec- nal data on disk? For instance, compressing signal data might tor accepts tuples and forwards them along a sink tree to a node be one strategy to save space and enable faster lookup by re- identified as the collector. Note that when SigSegs are transmitted ducing disk bandwidth usage.

8. • For how long in the past should signal data be retained, and S2=autocorr(S1); S3=FFT(S2); autocorr(X) ≡ convolve(X,X) which data should be discarded first? Discard policies are convolve(X,Y) ≡ IFFT(mult(FFT(X),FFT(Y)) likely to be application specific, but the library could include S2=IFFT(Mult(FFT(S1),FFT(S1))); S3=FFT(S2); several policies for applications to choose from. For instance, Common Sub-expression some viable policies might include allowing applications to T1=FFT(S1); prioritize important data, or using progressively lossier en- S2=IFFT(Mult(T1,T1)); S3=FFT(S2); coding for older data (e.g., using wavelets). FFT(IFFT(X)) ≡ X T1=FFT(S1); • What are the best ways to index signal data stored on disk so S3=Mult(T1,T1); that lookup (e.g., matching audio segments against signatures of previously seen marmots) is efficient? Traditional indexing Figure 5: Diagram illustrating rewrite optimizations in WaveScope. techniques are unlikely to work for signal data [7, 19], and terms of named operators, so that when a new operator is added to indexing strategies may also need to be application specific the WaveScope library, the programmer can also add rewrite rules (e.g., audio and images have different requirements). There- for that operator (e.g., a typical rule might express that IFFT is the fore, the above mentioned operators will provide an interface inverse for FFT). to specify and make use of custom indexing policies. Some signal processing operators permit complex rewrite op- We leave these, and related questions to future work. timizations. For example, the denial-of-service detection scheme described in [17] analyzes the power spectrum of network packet 5. OPTIMIZATIONS counts to classify attacks. This technique involves an autocorrela- A restricted query language and intermediate representation, and tion operation, followed by power spectrum analysis to determine support for isochronous signal segments as first-class objects enable if activity at a particular frequency is unusual. The standard way to a wide range of optimizations in WaveScope. We illustrate some compute the autocorrelation is to convolve a signal with itself, and classes of optimizations below. the standard way to compute a power spectrum is to compute the in- tegral of the FFT of a signal. To optimize this query, the optimizer Query Plan Transformations: Optimizations such as predi- takes advantage of several signal processing identities, which can cate reordering (the compiler can determine that certain stateless be specified as rules to the optimizer as follows: iterates are merely predicates) and query merging are possible in WaveScope, similar to optimizations in other streaming database 1. autocorrelate(S ) = convolve(S , S ) systems. For example, in the marmot application, queries for both marmot classification and localization involve a common prepro- 2. convolve(X, Y) = IFFT(FFT(X)*FFT(Y)) cessing and signature extraction step. The query optimizer can statically analyze and merge portions of the two queries. 3. FFT(IFFT(X)) = X Another plan-level optimization is merging adjacent iterate operators, which has two benefits. First, fusing adjacent iterate Applying the rules reduces the above sequence of operations to operators enables optimizations across multiple user functions. finding the FFT of the packet count sequence and squaring it, as The resulting fused code can expose optimizations such as loop shown in Figure 5. This requires only Θ(n log n) operations—faster fusion or common subexpression elimination. Second, because than the original implementation which performs Θ(n2 ) operations, each iterate corresponds to a different operator in the wiring where n is the length of the packet-count time-series. diagram, reducing the number of operators can reduce scheduling overhead (at the cost of decrease in potential parallelism). 6. RELATED WORK Conversely, the optimizer can factor iterate operators to sep- WaveScope is related to streaming data management systems [1– arate out processing on the input (or output) channel that does not 3, 8, 9, 23]. The key differentiating feature of WaveScope is that depend on the operator’s state. This, in turn, offers further opti- it provides a single language and a unified framework that inte- mization possibilities. For example, consider multiple iterate grates both stream and signal processing operations. This also dis- operators that first apply an FFT to their input. The compiler can tinguishes WaveScope from existing math, scientific and engineer- factor these FFTs out into their own, stateless, iterate operators. ing tools like MATLAB, Simulink and LabVIEW [4–6] that have Then, if the original iterates are applied to the same stream, excellent support for signal processing functions but unsatisfactory query merging can eliminate the redundant FFT computation. This support for event-stream operations. scenario is common in signal processing. For example, a common There is a large body of related work on temporal and sequence way to implement speaker identification involves computing the databases (databases containing time-varying data) that provide a aggregate power (area under the FFT) of overlapping windows context for our research, including SEQ and Gigascope [10, 27]. in an audio signal in both preprocessing and feature extraction These systems do view time-series data as first class objects, but are stages. WaveScope eliminates one redundant FFT computation per targeted at simpler trend analysis queries, as opposed to our effort, window, yielding significant savings. These optimizations become which is focused on supporting more complex signal processing particularly important when the user is relying heavily upon high- operations. level subquery constructors contained in libraries. They may not The compiler technology in WaveScope is related to the authors’ be aware of redundant computations within the abstractions they previous work on the Regiment programming language [25]. In invoke. fact, many programming languages have been proposed for stream- processing, and many general purpose languages include streaming Domain-specific Rewrite Optimizations: WaveScope will sup- features [29]. For example, StreamIt system [30] is a programming port a rule-based framework for rewrite optimizations that rely on language and compiler that targets high performance streaming ap- domain-specific properties of particular signal processing library plications. StreamIt’s compiler backend targets architectures rang- functions. This framework is similar to previous work on extensi- ing from highly-parallel research platforms to commodity unipro- ble relational optimizers [15, 26]. Optimization rules are written in cessors. StreamIt, however, is based on a synchronous data-flow

9.model where the data production and consumption patterns of op- Sailesh Krishnamurthy, Samuel R. Madden, Vijayshankar erators are static and known at compile time. WaveScope, in con- Raman, Fred Reiss, and Mehul A. Shah. Telegraphcq: trast, is targeted at the asynchronous data-flow world where opera- Continuous dataflow processing for an uncertain world. In tors are less predictable and produce data at varying rates. Instead, CIDR, 2003. our proposal exploits isochrony to leverage some of the benefits of [10] C. Cranor, T. Johnson, O. Spataschek, and Vladislav synchronous data-flow. Shkapenyuk. Gigascope: a stream database for network Ptolemy II [21] is widely-used data-flow system for modeling applications. In SIGMOD, 2003. and simulation of signal processing applications. Unlike Wave- [11] Jeremy Elson and Deborah Estrin. Time synchronization for Scope, it is not focused on efficiency or on providing a high-level, wireless sensor networks. In Proceedings of the 2001 optimizable programming language. International Parallel and Distributed Processing Symposium There has been work on individual signal processing applica- (IPDPS), Workshop on Parallel and Distributed Computing tions in the sensor network community [14, 28], but these systems Issues in Wireless and Mobile Computing, April 2001. are typically built from scratch, requiring many months of effort [12] Jeremy Elson, Lewis Girod, and Deborah Estrin. to implement. The Acoustic ENSBox [14] is a development plat- Fine-grained network time synchronization using reference form specifically designed to support distributed signal process- broadcasts. In OSDI, 2002. ing. While this has significantly reduced application development [13] S. Ganeriwal, R. Kumar, and M. B. Srivastava. Timing-sync time, it requires error-prone C programming and lacks an interac- protocol for sensor networks. In Proceedings of the First tive, query-like interface. ACM Conference on Embedded Networked Sensor Systems Time synchronization is a critical aspect of distributed sensing (SenSys), November 2003. applications, and a fertile topic in the sensor network commu- [14] Lewis Girod, Martin Lukac, Vlad Trifa, and Deborah Estrin. nity [11–13, 18, 20]. The WaveScope timebase construct is similar The design and implementation of a self-calibrating acoustic to those used in the implementation of Reference Broadcast Syn- sensing system. In SenSys, 2006. chronization [12] used in the Acoustic ENSBox system [14]. [15] Goetz Graefe. Rule-based query optimization in extensible However, in WaveScope these principles are more deeply inte- database systems. PhD thesis, 1987. grated, providing a more natural interface. [16] Goetz Graefe. Query evaluation techniques for large databases. ACM Computing Surveys, 25(2):73–170, 1993. 7. CONCLUSION [17] Alefiya Hussian, John Heidemann, and Christos Today, developers of many streaming applications that require Papadopoulos. A Framework for Classifying Denial of even the simplest forms of signal processing face a devil’s choice: Service Attacks. In Proceedings of ACM SIGCOMM, August either write significant amounts of custom code in a language like 2003. Java or C++, or use a stream processing engine together with an ex- [18] Richard Karp, Jeremy Elson, Deborah Estrin, and Scott ternal system such as Simulink (MATLAB) or LabVIEW and move Schenker. Optimal and global time synchronization in data back and forth between the two. Both approaches have a num- sensornets. Technical Report CENS-0012, Center for ber of obvious flaws. In this position paper, we have sought to re- Embedded Networked Sensing, 2003. dress these problems with WaveScope, a data stream management [19] Flip Korn, H. V. Jagadish, and Christos Faloutsos. Efficiently system that combines even stream and signal processing operations. supporting ad hoc queries in large datasets of time sequences. We believe—and hope to demonstrate soon—that WaveScope will In SIGMOD, 1997. be significantly faster and more usable than current approaches. [20] B. Kusy, P. Dutta, P. Levis, M. Mar, A. Ledeczi, and D. Culler. Elapsed time on arrival: A simple and versatile 8. ACKNOWLEDGMENTS primitive for canonical time synchronization services, in press, 2006. We thank Kyle Jamieson for his contributions to the ideas de- [21] Edward A. Leet. Overview of the ptolemy project. Technical scribed in this paper. This work was supported by the National Sci- Report Technical Memorandum No. UCB/ERL M03/25, UC ence Foundation under Award Numbers CNS-0520032 and CNS- Berkeley, 2003. 0205445 and by a National Science Foundation Fellowship. [22] R. Milner. A theory of type polymorphism in programming. 9. REFERENCES Journal of Computer and System Sciences, 1978. [23] R. Motwani, J. Window, A. Arasu, B. Babcock, S.Babu, [1] M. Data, C. Olston, J. Rosenstein, and R. Varma. Query [2] processing, approximation and resource management in a [3] data stream management system. In CIDR, 2003. [4] [24] D.D. Nanayakkara and D.T. Blumstein. Defining [5] yellow-bellied marmot social groups using association [6] indices. Oecologia Montana. In Press. [7] Rakesh Agrawal, Christos Faloutsos, and Arun N. Swami. [25] Ryan Newton and Matt Welsh. Region streams: Functional Efficient Similarity Search In Sequence Databases. In macroprogramming for sensor networks. In Proceedings of Proceedings of the 4th International Conference of the First International Workshop on Data Management for Foundations of Data Organization and Algorithms (FODO), Sensor Networks (DMSN), August 2004. pages 69–84, Chicago, Illinois, 1993. Springer Verlag. [26] Hamid Pirahesh, Joseph M. Hellerstein, and Waqar Hasan. [8] D. Carney, U. Cetintemel, M. Cherniak, C. Convey, S. Lee, Extensible/rule based query rewrite optimization in Starburst. G. Seidman, M. Stonebraker, N. Tatbul, and S. Zdonik. In SIGMOD, 1992. Monitoring streams—a new class of data management [27] Praveen Seshadri, Miron Livny, and Raghu Ramakrishnan. applications. In VLDB, 2002. The design and implementation of a sequence database [9] Sirish Chandrasekaran, Owen Cooper, Amol Deshpande, systems. In VLDB, 1996. Michael J. Franklin, Joseph M. Hellerstein, Wei Hong,

10.[28] G Simon, M Mar´oti, A ` L´edeczi, G Balogh, B Kusy, A N´adas, withscores = zip2(scores, wins); G Pap, J Sallai, and K Frampton. Sensor network-based // Find time-ranges where scores are above threshold. // threshFilter returns <bool, starttime, endtime> tuples. countersniper system. In SenSys, 2004. return threshFilter(withscores, threshsettings); [29] Robert Stephens. A survey of stream processing. Acta } Informatica, 34(7):491–541, 1997. // Aggregate a stream of scored windows into labeled, contiguous [30] William Thies, Michal Karczmarek, and Saman // time ranges that are either above or below threshold. Amarasinghe. Streamit: A language for streaming threshFilter : Stream<float, SigSeg<int16>>, <float, float, int, int, int> applications. In ICCC, April 2002. -> Stream<bool,int,int> fun threshFilter(scorestrm, <hi_thresh, alpha, refract_interval, APPENDIX: MARMOT PHASE 1 FILTER padding, max_run_length>) { // Constants Below is the full source for the first pass of marmot-dection ap- startup_init = log(0.75)/log(alpha); plication. This pass performs the fast, real-time prefiltering of iterate((score,win) in scorestrm) { the audio-stream, looking for time windows containing potential state { marmot calls. Note that before each top-level function, or variable thresh_value = 0.0; we have included a type annotation of the form “name : type”. trigger = false; trigger_value = 0.0; smoothed_mean = 0.0; smoothed_var = 0.0; (Function types are written using arrows (->). These are optional, startind = 0; refract = 0; but provide useful documentation. Further, they may be used to startup = startup_init; assert more restrictive types than those that would be infered by the } type-inferencer. For example, we assert that marmotScore expects if trigger then { if win.end - startind > max_run_length then { complex numbers rather than any numeric type. print("Detection length exceeded maximum of " ++ show(max_run_length) ++ // We use rewindow, sync4, cnorm, hanning, and fft ", re-estimating noise\n"); // from the WaveScript standard library. // Reset all state variables: include ""; thresh_value := 0.0; trigger := false; //======================================== smoothed_mean := 0.0; // Main query: smoothed_var := 0.0; startind := 0; Ch0, Ch1, Ch2, Ch3 : Stream< SigSeg<int16> > trigger_value := 0.0; Ch0 = AudioSource(0, 48000, 1024); startup := startup_init; Ch1 = AudioSource(1, 48000, 1024); refract := 0; Ch2 = AudioSource(2, 48000, 1024); } Ch3 = AudioSource(3, 48000, 1024); // Over threshold; set refractory. if score > thresh_value then { // Invoke profileDetector with appropriate scoring refract := refract_interval; // function and settings. } else if refract > 0 then { control : Stream<bool, int, int> // refractory counting down control = profileDetect(Ch0, marmotScore, refract := refract - 1; <64,192>, <16.0, 0.999, 40, 2400, 48000>); } else { // Untriggering! // Use the control stream to extract actual data windows. trigger := false; datawindows : Stream<SigSeg<int16>, SigSeg<int16>, emit <true, // yes, snapshot this SigSeg<int16>, SigSeg<int16>> startind - padding, // start sample datawindows = sync4(control, Ch0, Ch1, Ch2, Ch4); win.end + padding>; // end sample startind := 0; // Final query result: } return datawindows; } else { // If we are not triggering... // Compute the new threshold. //==================================================== thresh = int_to_float(hi_thresh) * // Application POD functions and subquery-constructors sqrt(smoothed_var) + smoothed_mean; // POD function: Compute a score for a time-window of audio data. if startup == 0 && score > thresh then { marmotScore : SigSeg<complex> -> Stream<float> // We’re over threshold and not in startup fun marmotScore(freqs) { // period (noise estimation period). // Return the complex magnitude for energy trigger := true; // in particular hardcoded frequency bins. refract := refract_interval; return cnorm(freqs[6] + freqs[7] + freqs[8] + freqs[9]); thresh_value := thresh; } startind := win.start; trigger_value := score; // The main detection function. } else { profileDetect : Stream<SigSeg<int16>>, (SigSeg<complex> -> float), // Otherwise, update the smoothing filters. <int,int>, <float,float,int,int,int> smoothed_mean := score * (1.0 - alpha) + -> Stream<bool, int, int> smoothed_mean * alpha; fun profileDetect(S, scorefun, <winsize,step>, threshsettings) { delt = score - smoothed_mean; // Window input stream, ensuring that we will hit each event smoothed_var := (delt * delt) * (1.0 - alpha) + wins = rewindow(S, winsize, step); smoothed_var * alpha; // Take a hanning window and convert to frequency domain. } scores : Stream< float > // Count down the startup phase. scores = iterate(w in hanning(wins)) { if startup > 0 then startup := startup - 1; // Compute frequency decomposition // Note: our fft produces complex numbers. // We know there are no events here, freq = fft(w); // so we free from time 0 to the present. // Score each frequency-domain window emit <false, 0, max(0, win.end - samples_padding)>; emit scorefun(freq); } }; } // Associate each original window with its score. } withscores : Stream<float, SigSeg<int16>>