Design Considerations for High Fan-in Systems: The HiFi Approach

Michael J. Franklin*, Shawn R. Jeffery*, Sailesh Krishnamurthy*, Frederick Reiss*, Shariq Rizvi*, Eugene Wu*, Owen Cooper*, Anil Edakkunni*, and Wei Hong+

*EECS Dept., UC Berkeley
+Intel Research Berkeley

*This work was funded in part by NSF under ITR grants IIS-0086057 and SI-0122599, by the IBM Faculty Partnership Award program, and by research funds from Intel, Microsoft, and the UC MICRO program.

Permission to copy without fee all or part of this material is granted provided that the copies are not made or distributed for direct commercial advantage, the VLDB copyright notice and the title of the publication and its date appear, and notice is given that copying is by permission of the Very Large Data Base Endowment. To copy otherwise, or to republish, requires a fee and/or special permission from the Endowment.

Proceedings of the 2nd CIDR Conference, Asilomar, California, 2005

Abstract

Advances in data acquisition and sensor technologies are leading towards the development of “high fan-in” architectures: widely distributed systems whose edges consist of numerous receptors such as sensor networks, RFID readers, or probes, and whose interior nodes are traditional host computers organized using the principles of cascading streams and successive aggregation. Examples include RFID-enabled supply chain management, large-scale environmental monitoring, and various types of network and computing infrastructure monitoring. In this paper, we identify the key characteristics and data management challenges presented by high fan-in systems, and argue for a uniform, query-based approach towards addressing them. We then present our initial design concepts behind HiFi, the system we are building to embody these ideas, and describe a proof-of-concept prototype.

1. Introduction

Organizations across a large spectrum of endeavors are becoming increasingly dependent on the availability of accurate, targeted, and up-to-the-minute information about the status of their operations. This information provides real-time visibility into disparate phenomena, which can be used to monitor operations, detect problems, and support both short and long-term planning. Such visibility is enabled by continuing improvements in computing (e.g., wireless smart sensors) and communications (e.g., increasingly ubiquitous network connectivity).

1.1. Applications

In many cases, the phenomena being monitored exist in the physical world. For example, environmental monitoring using sensors is emerging as an area of great interest, where the phenomena being monitored include wildlife behavior, air and weather conditions, or seismic readings. Other physical monitoring applications are more closely tied to human organizations, such as the monitoring and control of supply chains, logistics, traffic, factories, pipelines, or energy usage.

In other cases, the phenomena being measured are virtual, such as network and computing infrastructure or application monitoring. Many emerging applications combine data from both worlds to provide increasingly detailed real-time models of complex, widely-distributed organizations and environments.

These applications span many different domains but they all share a dependence on a sophisticated computing and communications infrastructure to deliver data that will provide an accurate and actionable view of their domain. Such applications also vary widely in requirements, but in general, they all depend on the accuracy, timeliness, completeness, and relevance of data in order to support more effective decision making.

1.2. High Fan-in Systems – The “Bowtie”

We envision systems in which a large number (many thousands or more) of receptors exist at the edges of the network to collect raw data readings. For example, in a supply chain management deployment, collections of sensors and RFID readers on individual store shelves (in a retail scenario) or dock doors (in a warehouse/manufacturing scenario) continuously collect readings. These readings include “beeps” from low-function passive RFID tags indicating the presence of particular tagged objects (such as cases of goods), as well as more content-rich information from smart sensors and higher-function tags such as temperature readings and shipping histories.
These “edge” devices produce data that will be aggregated locally with data from other nearby devices. That data will be further aggregated within a larger area, and so on. This arrangement results in a distinctive bowtie topology we refer to as a high fan-in system (see Figure 1). A sophisticated system such as one supporting a nation-wide supply chain application may consist of many widely dispersed receptor devices and many levels of successively wider-scoped aggregation and storage. Such systems will comprise a disparate collection of heterogeneous resources, including inexpensive tags, wired and wireless sensing devices, low-power compute nodes and PDAs, and computers ranging from laptops to the largest mainframes and clusters.

Figure 1 - The high fan-in bowtie

This hierarchical bowtie shape arises for two main reasons. First, the sheer volume of raw data produced at the edges of a large system could easily overwhelm a flatter architecture, in terms of both bandwidth and processing costs. Data cleaning, filtering, and aggregation must be done as close to the edges as possible to minimize data handling requirements of the system as a whole. The bowtie shape lends itself naturally to an approach where computation is pushed out to the lowest common ancestor (LCA) of the edge nodes that are producing data used in any particular computation. Second, organizational concerns stemming from the geographic-oriented nature of many of our target applications and from the structure of the organizations that deploy these systems lend themselves naturally to a hierarchical structure.¹

¹ See the supply chain scenario described in Section 2 for an example of this.

In many situations, of course, the topology will be much more complicated than that implied by Figure 1 above. For example, there will be cases where computations or data flows skip levels of the system, or there may be connections at various points in the network to external systems as would arise when multiple organizations choose to federate parts of their information infrastructure.² Nonetheless, our position is that the general notion of hierarchical structure and the ideas of successive aggregation and cascading streams (as presented in Section 3) that go with it are powerful concepts for organizing these complex systems, and where appropriate, can provide many advantages in programmability, ease of deployment, and efficiency.

² Such federation, for example, is envisioned in the standards for supply chain information sharing being proposed by the EPCGlobal organization.

1.3. Towards a Unified High Fan-in Framework

Today, the state-of-the-art in building high fan-in information systems is a piecemeal approach: a device-specific programming environment is used to task the edge receptors, a separate transport or information bus is used to route the acquired readings, and a database system or other data manager is used to collect and process them. As a result, high fan-in deployments have tended to be costly, difficult, and inflexible. In contrast, our work is based on the notion that stream query processing and streaming views can serve as a unified declarative framework for data access across an entire high fan-in environment. As we discuss in Section 3, stream-oriented queries can be used to accomplish many of the data manipulation tasks required in a high fan-in system, including data cleaning, event monitoring, data stream correlation, outlier detection, and of course, aggregation.

Our work builds on the growing body of work in the areas of data stream processing, sensor network databases, and data integration, but it also addresses a number of challenges that arise from the unique properties of high fan-in architectures and the applications they support. From a data management perspective, the most challenging new aspect that high fan-in systems bring to the table is the wide range they span in terms of three key characteristics: time, space, and resources.

Time – Timescales of interest in a high fan-in system can range from seconds or less at the edges, to years in the interior of the system. At the edges of a high fan-in system are receptor devices that repeatedly measure some aspect of the world. These devices are typically concerned with fairly short time scales, perhaps on the order of seconds or less.

As one moves away from the edges, the timescales of interest increase. For example, in a retail RFID scenario, individual readers on shelves may read several times a second, while the manager of a store may be concerned with how sales of particular items are going over the course of a morning, and planners at regional and corporate centers may be more concerned with longer-term sales trends over a season or several seasons.
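The widening timescales can be illustrated with a small sketch. The code below is not part of HiFi: the names `Reading`, `rollup`, and `coarsen`, the schema, and the sample timestamps are all assumptions made for illustration. It re-aggregates the same shelf readings into successively coarser time buckets, roughly as an edge node, a store-level node, and a regional node might.

```python
from collections import Counter
from typing import Iterable, NamedTuple


class Reading(NamedTuple):
    """One raw tag reading at an edge reader (hypothetical schema)."""
    timestamp: float  # seconds since some arbitrary epoch
    tag_id: str


def rollup(readings: Iterable[Reading], bucket_seconds: float) -> Counter:
    """Count readings per (bucket index, tag_id) for a given bucket width."""
    counts: Counter = Counter()
    for r in readings:
        bucket = int(r.timestamp // bucket_seconds)
        counts[(bucket, r.tag_id)] += 1
    return counts


def coarsen(counts: Counter, factor: int) -> Counter:
    """Merge `factor` adjacent buckets into one, summing their counts."""
    coarse: Counter = Counter()
    for (bucket, tag_id), n in counts.items():
        coarse[(bucket // factor, tag_id)] += n
    return coarse


# Invented sample data: tag "A" is read several times a second, tag "B" once.
raw = [Reading(0.1, "A"), Reading(0.4, "A"), Reading(0.9, "A"),
       Reading(61.0, "A"), Reading(3700.0, "B")]

per_second = rollup(raw, 1.0)          # edge timescale
per_minute = coarsen(per_second, 60)   # one level in: minutes
per_hour = coarsen(per_minute, 60)     # further in: hours
```

Each interior level here works only from the counts produced by the level below it, never from the raw readings. That shortcut is valid because a count is decomposable; aggregates such as medians or distinct counts would need richer summaries to be passed upward.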
Space – As with time, the area of geographic interest grows significantly as one moves from the edges of a high fan-in system to the interior. Again using the retail RFID scenario, individual readers are concerned with a space of a few square meters, aggregation points within the store would be concerned with entire departments or perhaps the store as a whole, and regional and national centers are concerned with those much larger geographical areas.

Resources – Finally, the range of computing resources available at various levels of a high fan-in system also varies considerably, from small, cheap sensor motes on the edges, up to mainframes or clusters in the interior of the system. Communication resources also can range from low-power, lossy radios at the edges, to dedicated high-speed fiber in the interior.

In addition to the fundamental issues that arise from scale along these three dimensions, there are many other technical challenges to be addressed. These include fundamental questions about how best to optimize and run queries in the network; how to process queries using views over streaming data that involve aggregation, hierarchies, and time windows; how to archive detail data at various points in the system; and how to build an infrastructure that is easy to deploy, manage, and adapt.

1.4. Paper Overview

In the remainder of this paper, we present a current snapshot of our work on high fan-in systems. Specifically, we describe the design considerations for the HiFi system, which is currently under development at UC Berkeley. We present the overall philosophy behind HiFi, detail our initial system’s architecture and query processing approach, discuss some of the open issues we are beginning to address, and give an overview of our initial proof-of-concept prototype built using the TelegraphCQ adaptive data stream processor and the TinyDB sensor network database system. First, however, we describe an example high fan-in environment in more detail.

2. A Motivating Scenario: Supply Chain Management

As stated in the introduction, there are numerous application scenarios that exhibit a high fan-in topology. In this section, we briefly describe one such application, Supply Chain Management (SCM). SCM presents a particularly compelling case for high fan-in systems for several reasons. First, emerging SCM systems are large-scale systems that can span national or even global distribution networks. These systems have natural aggregation points resulting in the characteristic hierarchical, high fan-in structure. Second, there is now widespread recognition in industry of the cost savings and efficiency gains to be had by exploiting accurate, up-to-the-minute information throughout the supply chain. Third, the impending availability of RFID and related technology, along with mandates from large organizations including Wal-Mart, the DoD, and the FDA, is driving tremendous interest in real-time SCM solutions.

The promise of RFID-enabled supply chains is that organizations will have complete, accurate, and timely visibility into all aspects of their supply chain, from suppliers through manufacturing, distribution, and sales, to even the return of goods. Such visibility can enable a wide range of applications, including “track and trace” of individual sales units (e.g., cases or cans of soft drinks, vials of pills, etc.), accurate replenishment scheduling, and more efficient shipping and receiving. Accurate information can also greatly enhance planning and monitoring at all levels of an organization and across organizations.

At the base of an RFID-enabled supply chain are passive EPC (Electronic Product Code) tags attached to pallets, cases, or individual units. These tags are able to transmit a 96-bit EPC code that identifies the manufacturer, type of item, and a unique serial number. More sophisticated tags can provide additional information such as sensor readings or transportation history. These tags are remotely sensed by RFID readers. Readers are placed at numerous locations in the supply chain, such as manufacturing lines, loading dock doors, forklifts and trucks, store shelves and store checkouts.

In a typical supply chain, a single tag is likely to be scanned 10 to 15 times. These readings (called “beeps”) represent, in aggregate, a potentially huge data stream. Furthermore, current RFID technology is inherently error-prone, meaning that these streams may contain a significant amount of dirty data. The solution to both of these problems is to process the raw beeps to remove erroneous data and to reduce the volume through aggregation and event detection.

For the purposes of this paper, we focus on the “back-end” of an SCM system, from suppliers to the back rooms of stores. As an example, consider the following levels (see the left half of Figure 2):

• RFID Readers – the edges of the system consist of the readers that interrogate the tags and collect their values. Future SCM deployments also envision the use of sensors and sensor networks to provide additional monitoring capabilities.

• Dock Doors – due to limitations on the range and orientation at which tags can be read, multiple readers (or antennae) are arranged around a single door to reduce the probability of missed tags.

• Warehouse – the information from all the readers in the warehouse is aggregated in a computer (possibly a cluster) located in the warehouse.
• Regional Center – a location where logistics can be collected and managed for an entire geographical region, for example, the Southwestern United States.

• Headquarters – the main location or data center for the corporation. For example, Wal-Mart does its corporate data processing in Bentonville, AR.

Figure 2 – High fan-in system levels with associated CSAVA processing stages in an SCM Environment

Such a supply chain is a high fan-in system, where data collected at the edges is successively cleaned, refined, and aggregated. As can be seen, this scenario exhibits the wide range of scales in terms of time, geography, and resources typical of high fan-in systems. We use this example throughout the rest of the paper.

3. Cascading Streams in HiFi

The HiFi system provides data management infrastructure for high fan-in environments. HiFi implements a uniform declarative framework for specifying data requests and driving system functionality. A stream-oriented query language is used to specify data requests and to describe logical views of the data available in various parts of the system. The use of successive stream-oriented query processing at each level of the system results in a flow of data from the edges inwards. We refer to this data flow as cascading streams. Note that this uniform approach stands in contrast to the current state-of-the-art, where each level of the system presents its own distinct API. HiFi uses stream-oriented query languages across all levels, which simplifies programming and enables a wide range of optimizations.

3.1. Uses of Queries

Although stream-based queries may not be suitable for all data processing tasks in HiFi, we believe that they can accomplish a wide array of tasks in a high fan-in environment. We list these tasks roughly in order from the edges to the interior of the system. At the edges, transformations are chiefly involved with making sense of
what the receptors produce; they typically become more complex as data moves towards the interior.

Data Cleaning – Sensors and RFID readers are notoriously noisy devices, and dealing with the poor quality of data they produce is one of the main challenges in a high fan-in (or any receptor-based) system. We use declarative queries to specify cleaning functionality for single devices as well as across groups of devices.

Detecting Faulty Receptors – In addition to being noisy, receptors sometimes just fail. Depending on the actual failure mode (fail stop or fail dirty) and past history, it might be possible to detect a faulty sensor.

Conversion and Calibration – In order to produce meaningful measurements, physical sensors typically require calibration. Such calibration can be quite sophisticated and may need to be done on a continuous basis. Furthermore, the raw data produced by physical sensors must often be converted into units that are meaningful to a given application. Queries may be used to do simple conversion and calibration.

Outlier Detection – In many monitoring systems, expected events are of less immediate interest than anomalies. Queries can be used to detect and propagate various types of outliers in a streaming environment.

Data Aggregation – While the raw data from a single receptor may not be a high volume stream, processing raw data from thousands of receptors is unsustainable. Queries that have a wider scope of interest must necessarily summarize raw data in the form of coarse-grained aggregate histories.

Stream Correlation – Queries are also useful for comparing and correlating data from multiple streams. Such streams may be homogeneous, as in the case of temperature readings from a group of identical sensors, or heterogeneous, as in the case of combining temperature readings with RFID “beeps”.

Complex Event Monitoring – One of the main functions of HiFi is to continuously monitor the environment for interesting events. Such events are not limited to simple events at the edge. Rather, they also include composite events described in terms of widely varying timescales and geographic areas. A streaming query language, suitably extended with event processing constructs, can be used to describe these events.

3.2. Multi-level Query Processing

In this section, we illustrate the power of successive processing of cascading streams through an example that we term “CSAVA” (pronounced “Cassava”). CSAVA consists of five core stages of processing (clean, smooth, arbitrate, validate, and analyze) designed to translate raw receptor readings into useful data for driving business processes. Figure 2 depicts CSAVA for our SCM example. At the bottom of the figure, RFID readers feed into processing nodes on warehouse doors which, in turn, feed into the main node at the warehouse. These local nodes aggregate and send their data to regional centers and so on. At each level, the nodes export views that perform successively more sophisticated functionality.

Receptor level: Cleaning

Each receptor itself performs the first step in data processing and cleans the stream by filtering out anomalous readings whose signal strength falls below some threshold strength_T:

    CREATE VIEW cleaned_rfid_stream AS
    (SELECT receptor_id, tag_id
     FROM rfid_stream rs
     WHERE read_strength >= strength_T)

Dock door level: Smoothing

Smoothing is the process of interpolating to compensate for lost readings and discarding anomalous readings by running a windowed aggregate (in this case, a count) over the cleaned stream. In this example, readings that have been seen at least count_T times in a window³ are considered legitimate; others are dropped:

    CREATE VIEW smoothed_rfid_stream AS
    (SELECT receptor_id, tag_id
     FROM cleaned_rfid_stream
          [range by 't1', slide by 't2']
     GROUP BY receptor_id, tag_id
     HAVING count(*) >= count_T)

³ Note that we use “range by” to specify the width of the window and “slide by” to specify its movement.

Warehouse level: Arbitration

The tags reported after smoothing are those that a reader is reasonably sure to have seen. Multiple nearby readers, however, may have seen the same tag. To avoid over-counting or other inaccuracies, data from multiple readers must be arbitrated to determine where the product corresponding to the tag actually is located. In this example, the system also aggregates what each node has seen before passing the stream to the higher levels:
    CREATE VIEW product_counts AS
    (SELECT receptor_id,
            count(distinct tag_id)
     FROM smoothed_rfid_stream rs
          [range by 't3', slide by 't4']
     GROUP BY receptor_id, tag_id
     HAVING count(*) >= ALL
        (SELECT count(*)
         FROM smoothed_rfid_stream
              [range by 't3', slide by 't4']
         WHERE tag_id = rs.tag_id
         GROUP BY receptor_id))

Regional center level: Validation

At this point in the hierarchy, the product_counts stream contains an aggregated view of products seen at each warehouse or store. With this data, the regional center can use known business rules such as “I know that the warehouse in Springfield should have 10,000 widgets” to validate that the supply chain is behaving as expected.

Headquarters level: Analysis

Once the high-level business behavior is determined, headquarters can analyze this data through data mining-type query operations to understand how the supply chain is behaving. Note that this is done in real time to drive organizational decision-making.

3.3. CSAVA Discussion

CSAVA processing can be generalized to handle data from other types of receptors in other applications. In general, cleaning involves operations over a single data item, smoothing occurs over a window of data items from a single receptor, and arbitration occurs over streams from multiple receptors. For example, cleaning for a sensor network application involves filtering individual readings that do not make sense (e.g., a negative sound reading) or are not interesting to the application, while arbitration entails comparing values from multiple sensors in the same area for calibration and outlier detection.

In addition to the steps outlined above, there are a variety of auxiliary tasks that may occur throughout the CSAVA process⁴. Data values retrieved from RFID readers may need to be converted from their raw RFID code to some organizationally meaningful handle such as a product ID. Additionally, there may be organizational information, such as tracking history, which can augment the bare product ID. Finally, aggregation, both in time and space, occurs throughout this process, whenever the raw data is not needed or bandwidth is scarce.

⁴ Note that these additional steps all begin with a “C” or “A”, thus preserving our CSAVA acronym.

In this example, each task in CSAVA was placed at a reasonable location in the hierarchy; in practice, however, the placement of each stage in CSAVA is flexible, provided that the appropriate data is available. For example, validation may occur lower in the hierarchy if the validating data is also pushed down (e.g., the system could push a static relation containing expected RFID tags to the edge of the hierarchy). We address the issue of operator placement in more depth in Section 4.3.

4. HiFi Design Concepts

In the previous sections we laid out the motivation, applications, and the cascading stream processing model for high fan-in environments. Given this background, we now describe the key aspects of our emerging design for the HiFi system.

4.1. Hierarchical Windowed Views

One consequence of our decision to use stream-oriented queries as the common API throughout HiFi is that the nodes at various levels of the system can expose the data they provide using views. Using views to structure distributed systems has long been studied in the data integration and federated database literature, and many of the techniques developed there can be used in HiFi. There are, however, several aspects of high fan-in architectures that push the envelope of this technology.

First, the views in HiFi are typically over streaming data and the queries are continuous. In such systems, windows play a crucial role in both query processing and semantics. Window specifications divide unbounded streams into finite collections of data items over which queries can be executed. Windows are specified by range and slide parameters (typically expressed in time or tuples). The first parameter specifies the width of the window; the second specifies how the window moves as time progresses or as new tuples arrive.

The definition and use of streaming views with windows is still an open problem. When the slide/range of a query does not precisely match that of a view, it is not immediately obvious how or even if the view can be exploited. For instance, exploitation is generally possible if the range of the query matches (or in some cases, is subsumed by) that of the view and the slide of the query is a multiple of that of the view, or possibly if the slides are relatively prime. This has the flavor of periodic data processing.

Second, because aggregation is such a fundamental concept in HiFi, the views at each level of the system will often contain aggregates. When a query’s range exceeds its slide, the windows are overlapping. Evaluating aggregate queries over overlapping windows is challenging because input tuples must participate in multiple separate aggregate computations. We are
working on techniques to efficiently share the execution of periodic overlapping windowed aggregates.

Third, the hierarchical nature of the applications to be supported by HiFi emphasizes the issues of granularity and scope. As stated above, we expect that in general, the granularity of requests will become coarser and the scope larger as one moves from the edges towards the interior of a high fan-in topology. Aggregation and union operators can be used to achieve this; however, cases that do not follow this anticipated pattern are more difficult to handle. For example, privacy and security constraints may restrict the detail and scope of information allowed to be passed to some other node in the system.

4.2. Topological Fluidity

Another important issue in the design of HiFi is the rigidity of the connections between nodes in the system. In some levels of a high fan-in system, a hardwired topology may be natural. For instance, it makes sense for a node keeping track of items on a shelf to be hardwired to talk to the store's node, which in turn, is likely to talk to the regional node, and so forth. In such an arrangement, each node has a static parent (or small set of parents) and a relatively small static set of children with which it communicates. With static connections at all levels, nodes require only a small amount of state to keep track of the other nodes with which they communicate. Furthermore, both query and data flow are greatly simplified in a static system, as there are only a small number of paths through the system.

It is desirable in many cases to have more fluid connections between interior nodes. In such a topology, nodes would still be grouped into levels, but connections to parents (for data flow) and children (for query flow) would occur on an ad-hoc basis. Thus, the system can respond to runtime conditions by adding, removing, or changing links. Through fluid interior links, the system can route around overloaded or failed nodes and links, thus providing load balancing and fault tolerance.

Furthermore, some components in a high fan-in system, such as mobile nodes, do not fit into a static topology or may be disconnected for periods. For instance, a node mounted on a supply truck driving between distribution centers must have the ability to dynamically switch parents en route as well as be able to support disconnected operation. Finally, fluidity enables more fine-grained privacy and security provisioning. An organization can specify exactly which queries and data flows can go where on a flow-by-flow basis.

Of course, dynamism presents many challenges, including metadata management and query planning. For HiFi, we are developing a hybrid approach, where there are preferred wirings between nodes, but where alternate routes through the system can be used in response to runtime conditions.

4.3. Query Planning and Data Placement

Once a query is submitted to HiFi, it must be planned and disseminated before it can be run. Query planning in a high fan-in system involves a wide range of tasks. First of all, the system must identify the relevant data streams and determine the responsible receptors. If data is not already flowing from these receptors, the system must initiate data collection with appropriate settings (sample period, for instance). The system must determine the general flow of the data from the leaves and decide upon the operators needed to process, split, merge, and transform streams. Finally, the system must employ participating nodes to run these operators as data flows up the tree.

A key efficiency consideration in HiFi is the placement of queries and data across the nodes of the system. Given a query with a set of operators, the query planner must determine where in the hierarchy to place each operator. This decision attempts to reduce overall system bandwidth usage by pushing operators down the hierarchy. Some data streams (or static relations) may not be visible at lower levels of the hierarchy. Thus, the query planner should tend to push each operator to the lowest level at which the streams and relations it operates over are visible.

Furthermore, the query planner must consider existing queries and data flows in order to exploit shared processing. For instance, if multiple operators from different queries process the same underlying data stream, then it may be advantageous to pull the operators up. Alternatively, it may be possible to improve the visibility of some queries by pushing streams or static relations down the hierarchy. This incurs initial bandwidth costs and complexity due to replication, but may improve parallelism and utilization of resources, and could provide overall bandwidth savings. Caching can also improve performance, but query and data placement in a cache-based system are inherently inter-dependent.

In a high fan-in system, this query planning can be done in a variety of ways, ranging from completely centralized to fully distributed. The simplest approach is to fully centralize the planning decisions. A new query would be sent to a single query planning node that has global knowledge and is able to fully plan and then disseminate the query. This approach is suitable for fully trusting organizations with relatively static data, query, and network characteristics.

Alternatively, a recursive approach, where the planning and dissemination phases are combined, may be more applicable. In this case, a query is introduced at some location, which becomes the root of the hierarchy for that query. The query then propagates from this point to the data sources one level at a time. At each step of the process, the current node plans its portion of the query using only knowledge of its immediate children.
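The recursive approach can be sketched in a few lines. This is a minimal illustration under stated assumptions, not HiFi's design: the `Node` class, its `advertised` and `plan` methods, and the stream names are all hypothetical, and a query is modeled simply as the set of stream names it needs.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Node:
    """A node in the hierarchy (hypothetical model, not HiFi's classes)."""
    name: str
    children: List["Node"] = field(default_factory=list)
    local_streams: Set[str] = field(default_factory=set)  # produced here

    def advertised(self) -> Set[str]:
        """Streams reachable through this node, as advertised to its parent.

        A real system would presumably cache this metadata; here it is
        recomputed on demand to keep the sketch short.
        """
        streams = set(self.local_streams)
        for child in self.children:
            streams |= child.advertised()
        return streams

    def plan(self, wanted: Set[str], plan_out: Dict[str, Set[str]]) -> None:
        """Record this node's share of the query, then recurse one level down.

        Only the advertisements of immediate children are consulted.
        """
        plan_out[self.name] = set(wanted)
        for child in self.children:
            sub = wanted & child.advertised()
            if sub:  # forward only the part this child can serve
                child.plan(sub, plan_out)


# Invented example hierarchy.
reader1 = Node("reader1", local_streams={"dock_rfid"})
reader2 = Node("reader2", local_streams={"shelf_rfid"})
sensor = Node("sensor", local_streams={"temperature"})
warehouse = Node("warehouse", children=[reader1, sensor])
store = Node("store", children=[reader2])
region = Node("region", children=[warehouse, store])

# The query is introduced at `region`, which becomes its root.
plan: Dict[str, Set[str]] = {}
region.plan({"dock_rfid", "temperature"}, plan)
```

In this example the query never reaches `store` or `reader2`, because neither advertises a stream the query needs. Operator placement, sharing with existing queries, and fluid links are deliberately left out of the sketch.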
As we discuss in Section 5.4, we are implementing a flexible planning and optimization approach that follows this recursive query planning paradigm.

4.4. Event Processing

An important use case for HiFi involves the real-time monitoring and management of large distributed organizations such as supply chains. For such applications, it is necessary that the system enable the delivery of important status information and events in a timely fashion.

An event is defined as any significant occurrence in the system. A user may be interested in a variety of simple events over streaming data:
• A taken-out-of-store event may be defined simply as seeing a tuple on a particular data stream originating from an RFID receptor located at the exit of a store.
• A fire-in-room event may be defined as a simple “filter event” which is detected when a tuple with a temperature value more than 100°C is seen on a data stream.

Complex events may also be of interest to a user. Unlike simple events, these require the joining and aggregation of multiple streams under intricate notions of time, ordering, and negation. Examples include:
• A shoplifting event may be triggered when the taken-out-of-store event is seen for an item WITHOUT the occurrence of the purchased-at-counter event for that item.
• A person-in-danger event may be triggered when the simple fire-in-room and person-in-room events are seen for the same room within a 10 second window.

In addition to real-time scenarios, event specifications may span large scales of time and space (e.g., a CEO who wants an alert when the nation-wide sales of a product in the previous week go below a threshold).

In a system like HiFi, event and (SQL-based) data processing need to be done in a unified manner. However, SQL does not provide natural ways of expressing queries over ordered data (like time-ordered data streams). Hence, specification of complex events over streams needs to be done in a language that provides user-friendly ways of expressing ordering and negation, in addition to other constructs. Unified support for data and event processing is achieved in HiFi by extending the query language available to the user in a manner suitable to express complex event queries on data streams.

HiFi handles event processing using state machine-based operators in the core data stream processing engine along with traditional relational operators. These new operators maintain and update state for event queries as they see tuples on different data streams (possibly coming from other operators). They trigger an event when a transition to an accepting state is made. The output of these operators can be further processed by other data operators, giving a unified framework for event and data processing.

4.5. Archiving and Prioritization

In many situations, in addition to real-time information, there is also a need (or at least a desire) to have access to the underlying detail information, perhaps in a delayed or archival fashion. Examples include data mining and long-term planning applications as well as regulation-driven requirements, such as those arising from Sarbanes-Oxley compliance.

We are designing HiFi to support a spectrum of delivery requirements, spanning the range from real-time delivery of status and event notifications to background delivery and archiving of detail information. The basic approach is: “send summaries, anomalies, and alerts first; the details can follow later”.

HiFi meets these varied delivery requirements through a dynamic prioritization architecture. The first priority of HiFi's scheduling subsystem is ensuring that archival data is not permanently lost. Each node must ensure that its archival data is eventually delivered to a node with permanent storage. Nodes without local storage keep a buffer of recent data and send the contents of this buffer to remote nodes as needed.

Once HiFi has ensured the integrity of archival data, it devotes time to the other types of data. For these data, HiFi employs Data Triage to provide the highest quality of timely results possible given the resources remaining. If there is time to process all data relevant to a monitoring query, HiFi will do so; otherwise, the system will shed load by summarizing data that it does not have time to process and sending these synopses in place of the original data. The system reconstructs complete query results by combining computations on complete data with computations on synopses.

4.6. Real World Data

Perhaps the most distinctive challenge presented by high fan-in environments is the need to seamlessly integrate the physical world with the digital world. However, the characteristics of each realm differ greatly. Real world data can be seen as an infinite collection of unbounded continuous streams with loose semantics, whereas the digital world is inherently discrete (for our purposes, it is tuple-oriented) with strict semantics and guarantees. Furthermore, data collection techniques are imperfect at best and provide only a flawed glimpse of the real world. Physical receptor devices can introduce significant complexity due to their wide variance in terms of interface, behavior, and reliability. Thus, a challenge facing any receptor-based system is to bridge these
disparate worlds in a manner that enables users of the system to both trust and make sense of the data the system provides.

Towards this end, HiFi uses virtual devices to interact with the physical world. A virtual device interfaces with multiple raw receptors that are in close proximity, processing and fusing their streams to produce more useful, higher-quality data. It does this by incorporating CSAVA-like processing, conversion and calibration, virtualization, lineage tracking, and quality assessment. Thus, a virtual device may combine declarative query processing with non-declarative processing, such as with soft sensors.

One of the more important services a virtual device provides is the support for the notions of answer quality and lineage. To provide for answer quality, the virtual device augments receptor-based data with error estimates and confidence intervals. For example, a virtual device for a sensor network can use known techniques for determining answer quality based on probability distributions. Similar methods apply to other receptors, provided that the error characteristics are known. Lineage is also tracked for each data item (or set of items) as it is processed within the virtual device.

A virtual device provides a rich interface to HiFi for interacting with the receptor (or set of receptors). It exposes an interface that consists of a suite of virtual streams, including multiple levels of processed data streams (ranging from raw to fully cleaned data), quality streams, and lineage streams. HiFi interacts with a virtual device by querying and correlating these streams to produce useful information. For instance, to determine the quality of a certain data value, HiFi would correlate a data stream with the corresponding quality stream. Thus, the virtual device exports an interface that is richer and more useful than a cleaning view over the raw data.

A virtual device provides other services as well, such as archiving, prioritization, actuation, and receptor management. All of these services are exposed via this same stream-based interface. For instance, a query over the archive “stream” allows access to past data.

We are currently exploring the extent of a virtual device’s functionality and defining its behavior in various environments.

4.7. Privacy and Access Control

Privacy of data is a prime concern in environments where the flow of information crosses organizational boundaries. This is another case where the use of views to express exported data plays a role. Each HiFi node exports a particular set of views to the higher level nodes based on the access control policies specified by the node’s organization. The query planner ensures that only those queries that can be written on top of these views are executed on that node. This restriction ensures that no information available at the node is leaked to the higher levels in an unauthorized manner.

The use of SQL views for specifying authorization policies and enforcing access control by query rewriting using views has been discussed for the centralized case in prior work. For HiFi, we can extend the approach to a distributed scenario in which authorization views are exported by distributed data sources.

4.8. System Management

Finally, a major requirement and challenge for the deployment of a large, integrated, distributed system such as HiFi is the ability to continuously monitor the state of the system itself and adaptively adjust its behavior. Furthermore, the system must be easy to modify in terms of the addition and removal of new components and types of components. While we are only beginning our investigation into the system management issue, we intend to exploit the fact that HiFi is itself a hierarchical system for monitoring and managing phenomena in hierarchical environments. Thus, we expect to use the HiFi infrastructure itself to accomplish much of the system management task.

5. Initial Architecture and Service Design

Having outlined the major design issues for HiFi, we now present a description of the initial system. We detail the functionality and services provided by HiFi by outlining its major components: the Metadata Repository (MDR), the Data Stream Processor (DSP), and the HiFi Glue.

5.1. Metadata Repository

The Metadata Repository (MDR) serves as a globally accessible catalog for system-wide information. This metadata is of three types: schema, views, and system information.

The schema contained in the MDR is the mediated schema of the system over which all application queries and views are written. It is assumed that this changes very infrequently. For instance, in our SCM example, the mediated schema consists of sensor and RFID data.

The views stored in the MDR are those exported by each node in the system. The MDR also maintains a mapping between the views exported by a node and its physical location, which is vital for supporting a fluid, loosely-coupled topology.

The system information contained in the MDR includes node capabilities, authorization and privacy controls, and information relating to organizational boundaries and administrative domains. Additionally, the MDR maintains runtime information, such as the current set of queries running on each node, current network usage, and unavailable/unreachable nodes, to help guide and optimize system behavior.
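To make the role of the view-to-location mapping concrete, the following is a minimal in-memory sketch of an MDR lookup. It is an illustration under stated assumptions, not the HiFi implementation: the class and method names (`MetadataRepository`, `export_view`, `revoke_view`, `locate`) are hypothetical, and the selection policy (pick the reachable node with the fewest running queries) is only one possible reading of "runtime information".

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set


@dataclass
class MetadataRepository:
    """Toy MDR (hypothetical): tracks which nodes export which views."""
    view_locations: Dict[str, Set[str]] = field(default_factory=dict)
    unreachable: Set[str] = field(default_factory=set)
    running_queries: Dict[str, int] = field(default_factory=dict)

    def export_view(self, node: str, view: str) -> None:
        # Record that `node` currently exports `view`.
        self.view_locations.setdefault(view, set()).add(node)

    def revoke_view(self, node: str, view: str) -> None:
        # Remove the export; a no-op if it was never registered.
        self.view_locations.get(view, set()).discard(node)

    def locate(self, view: str) -> Optional[str]:
        # Consider only reachable nodes that export the view.
        candidates = self.view_locations.get(view, set()) - self.unreachable
        if not candidates:
            return None
        # Assumed policy: least-loaded node, ties broken by node name.
        return min(sorted(candidates),
                   key=lambda n: self.running_queries.get(n, 0))


mdr = MetadataRepository()
mdr.export_view("stargate-1", "shelf_counts")
mdr.export_view("stargate-2", "shelf_counts")
mdr.running_queries["stargate-1"] = 3
print(mdr.locate("shelf_counts"))
```

Because two nodes export the same view here, a planner consulting this catalog can switch to the other node when one becomes unreachable, which is the kind of fluidity the mapping is meant to support.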
The MDR can be implemented in a variety of ways, from fully centralized to fully decentralized, and this is a topic we are currently investigating.

5.2. Data Stream Processor

The Data Stream Processor (DSP) lives entirely within a HiFi node and is responsible for all single site data stream processing. Only the following simple functionality is expected of a DSP:

1. The ability to process continuous queries
2. The ability to add continuous queries on-the-fly
3. The ability to add sources on-the-fly
4. The ability to cancel queries

Additionally, when present, a HiFi node can profitably exploit:

1. The ability to modify a currently running query
2. The ability to suspend a currently running query
3. The archiving of streams
4. The querying of archived data

The DSP is oblivious of HiFi and could (in principle) be any stream processor such as TelegraphCQ, Aurora or STREAM (see footnote 5).

Footnote 5: Our current implementation uses a combination of two versions of TelegraphCQ and the TinyDB system, as these have been previously developed by our group. The ease of incorporating other stream processors remains to be seen.

5.3. HiFi Glue

The HiFi Glue, which runs on each HiFi node, is the fabric that seamlessly binds together the system. It coordinates its local DSP, communicates with other HiFi nodes, and manages incoming and outgoing streams. The HiFi Glue itself consists of local and global sets of services. The glue and its relationship to the MDR and DSP are shown in Figure 3.

Figure 3 - Internal architecture of a HiFi node

5.3.1. Local Services

The local HiFi Glue services perform actions that involve local decisions only.
Logical Query Planner: The Logical Query Planner converts queries into a local query plan (the DSP Plan) and a set of queries to be run on child nodes (the Remote Query Set or RQS). We describe this process in more detail in Section 5.4.

DSP Manager: This module is responsible for starting, stopping, suspending, and modifying locally running DSP queries and streams based on input from the Query Planner. It also handles syntax translation from HiFi’s internal query representation to the local DSP’s query language, if necessary.

Resource Manager: This module’s job is to adapt a node’s behavior to unpredictable run-time conditions and perform functions such as prioritization and load-shedding.

Local View Manager: The Local View Manager provides a way to describe and manage the views that represent the data exposed by the node. It interacts with the MDR to export and revoke the current set of views active on this node. Additionally, it allows authorization, privacy, and other constraints to be specified for each view.

Archive Manager: This component manages the archiving of streams for the purposes of historical querying. Note that some DSPs support this functionality internally. Furthermore, the Archive Manager may interact with other nodes to place data (both relations and streams) for efficient query processing.

Cache Manager: The Cache Manager snoops incoming data streams and determines what data to cache based on current workload. Additionally, it interacts with the Query Planner to enable query processing using cached data (i.e., materialized views).

Query Listener: The Query Listener listens for remote connections, parses incoming requests, and passes them on to the Query Planner.

Query Dispatcher: This component dispatches queries of the Remote Query Set to remote HiFi nodes.

Data Listener/Disseminator: These components handle incoming and outgoing data streams. Incoming streams from multiple sources may be merged into the same stream en route to the DSP. Outgoing streams may be split among multiple destinations. Additionally, these components handle pre- or post-processing of streams, such as encryption/decryption or format translation.

5.3.2. Global Services

Global HiFi services require non-local knowledge and interaction with other nodes in the system. In some cases, this portion of the glue need not physically reside on each node. Instead it can be viewed as a set of globally available services that perform actions on behalf of the requesting node.

Query Placement Service: This component determines the best node to start executing a query when it is received from a user. It does this by determining the scope and granularity of the query and consulting the MDR to determine the lowest common ancestor that can serve as the query root.

Physical Query Planner: Once the Logical Query Planner produces a set of plans involving child views, this module consults the MDR to determine the physical node location of these views. This is what enables fluid topologies in HiFi.

Control Manager: The Control Manager interacts with other HiFi nodes to perform overall system management. This includes system health monitoring and global and local startup/shutdown.

5.4. A Day in the Life of a Query

In this section, we illustrate the functionality of many of the architectural components by walking through the processing of a query.

The primary query planning mechanism on each node is called the shared view infrastructure (SVI), which is part of the Logical Query Planner. Each HiFi node is aware of a set of views (V1, V2,…, Vn) that describe all the data available to it (i.e., the views exported by all of its children). The SVI converts each view Vi to a succinct form composed of a set of sources and operators. The SVI then merges this view into its shared view representation in a manner identical to the way in which new queries are merged into a common shared plan in TelegraphCQ. Thus, the SVI contains an agglomerated form of the individual views.

When a query Q enters the HiFi system, the Query Placement Service is consulted to dispatch the query to its root. Once a query arrives at its root, the following steps, coordinated by the Query Planner on each node, take place recursively to both plan and disseminate the query.

1. SVI conversion: Q is transformed into the same representation used for views in the SVI.
2. Logical planning: This plan is folded into the SVI in the same manner in which views are added to produce the following:
• Remote Query Set (RQS): A set of queries (QV1, QV2,…, QVi), created by the Logical Query Planner, that represent the current query rewritten using the views of children. For each child node’s view Vi that must provide data to this query, a corresponding query QVi is created to be run on the corresponding child. Note that a query QVi can be different from the view Vi as more operations can generally be pushed down into the provider of Vi.
• DSP Plan: A local DSP query Qlocal that operates over the input streams produced by the RQS.
3. DSP setup: The DSP Manager creates a new stream definition corresponding to each QVi in the RQS and then adds Qlocal on-the-fly to the DSP.
4. Physical planning: For each QVi to be run on a child node, the Physical Query Planner produces the following:
• Where: Li - a location to run the query. If multiple child nodes export the same view, then the physical query planner chooses one of the nodes based on runtime conditions.
• What: Q'Vi - the actual query that has to be run. Note that Q'Vi is a textual representation of QVi in the syntax expected by the HiFi node Li.
5. Query dissemination: Each Q'Vi is sent to its appropriate location Li where this same process takes place recursively.
6. Data sourcing: As results from the query Q'Vi are received by the Data Listener, they are streamed into the DSP.
7. Returning results: As the local DSP produces results, the Data Disseminator directs them to the appropriate parent(s).

6. Prototype System

We have built an initial version of HiFi using the TelegraphCQ (TCQ) stream query processor and the TinyDB sensor database system. The goal of this prototype is to examine the feasibility of the uniform declarative framework and to derive a better understanding of the core components required for building high fan-in systems. Figure 4 depicts this initial deployment. It consists of a three-level hierarchy: receptors, initial processing, and core processing.

6.1. Receptors

The receptor level consists of sensor networks and RFID readers monitoring the physical world. For our sensornet system, we use TinyDB, which supports a SQL-esque interface for query processing. Our current prototype uses RFID readers to make up the other branch of the receptor level. Although the RFID reader does not export a SQL interface, we have built a simple adapter to interact with the device. Both of these systems are capable of some form of processing, ranging from aggressive in-network aggregation in TinyDB to simple buffering (i.e., windowing) in the RFID reader. HiFi exploits this functionality to clean the data as it samples it.

6.2. Initial Processing

The receptors feed their streams of partially cleaned data to the second level in our prototype hierarchy for initial processing. This level serves as the aggregation point for the receptors and consists of small computing devices capable of field deployment. For our system, we use Intel Stargates, small, single-board Linux-based compute devices built with Intel XScale processors. As shown in Figure 4, MoteServer and RFIDServer processes interact with the receptors and inject their streams into HiFi. For data processing, the Stargates run a scaled-down version of TelegraphCQ, capable of running simple continuous queries. Here the system performs additional cleaning as well as basic aggregation and correlation, using queries similar to those shown in Section 3.2, before passing on the data. A Stargate along with an associated sensor network and RFID reader represents our field deployable unit (FDU), which monitors one area.

6.3. Core Processing

The Stargates feed their processed and aggregated streams to the root of our hierarchy, a full-fledged server running TelegraphCQ. This node runs queries that correlate streams across all devices, for example, “find the maximum (sound/number_of_tags) quotient across all areas.”

6.4. Experiences

We have deployed this prototype as described with several FDUs and a streaming visualization interface for demonstration at the 2004 VLDB conference. In many ways this experience influenced the design presented above. We briefly discuss some of these experiences and the lessons we learned from them.

There were many basic problems arising from the inherent complexity of a high fan-in system. Each new device incorporated into the system brought with it its own implementation challenges. We discovered many small bugs in different parts of the system as we moved to each new platform. These challenges provide a strong argument for a general-purpose data management platform such as HiFi to remove this source of complexity when deploying high fan-in systems.

Although our deployment had only three levels in its hierarchy, we discovered that there was severe data lag from the time when a receptor read a data value to the time when that value was reflected in the output. This stemmed from the fact that hierarchical, window-based query processing, naively implemented, has inherent delays. If a query’s windowing specifications (i.e., its range and slide parameters) are disseminated unchanged through the hierarchy, a lag on the order of the slide parameter is introduced for each level. Thus, the system must be careful in handling the window clause as a query is propagated down the hierarchy. We are currently developing techniques to address this issue.
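The per-level lag described above can be approximated with a back-of-the-envelope calculation. The sketch below assumes, as a simplification, that a reading may wait up to one full slide interval at every level it passes through, so the worst-case lag is roughly the sum of the per-level slides. The function name and the sample slide values are hypothetical, and the second configuration is only an illustration of why the window clause matters, not the technique under development for HiFi.

```python
from typing import Sequence


def worst_case_lag(slides_per_level: Sequence[float]) -> float:
    """Approximate end-to-end lag in seconds.

    Assumption: each level emits results once per slide interval, so a
    reading can wait up to one slide at every level of the hierarchy.
    """
    return sum(slides_per_level)


# Same 5-second slide disseminated unchanged to all three levels.
unchanged = worst_case_lag([5.0, 5.0, 5.0])

# Hypothetical alternative: lower levels use a shorter slide.
shortened = worst_case_lag([1.0, 1.0, 5.0])

print(unchanged, shortened)
```

Under this simplified model, the lag grows linearly with the depth of the hierarchy when the slide is propagated unchanged, which is consistent with the severe lag observed even in a three-level deployment.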
Figure 4 - The initial HiFi prototype

Our implementation of CSAVA provided many interesting lessons. As development and testing progressed, we discovered that the RFID data produced by the readers were highly unreliable. By applying CSAVA processing, we were able to clean up these data to a certain extent. Additionally, this effort validated our uniform declarative framework, as CSAVA deployment took relatively little development time.

Finally, deploying this system reinforced our belief that system management is very important. Our deployment was relatively small in scope, yet it had five different platforms running four different data processing systems across more than 20 devices. Without management tools for start up, shutdown, and status, the system would have been largely unusable. Additionally, we discovered that the compartmentalized design of each node provides a benefit in that faults were isolated to the component that failed. We were able to dramatically increase the uptime of the system through strict compartmentalization of each component in the system.

7. Related Work

As has been discussed in the previous sections, HiFi builds on a large body of related work. In addition to work already discussed about the individual components of HiFi (e.g., view-based query rewrite), there is previous work relating to both the architecture of high fan-in systems in general and to the design of HiFi in particular.

7.1. Hierarchical and Receptor-based Systems

There are a variety of projects aimed at managing and querying the data produced by receptors, both physical and virtual. These projects have assumed topologies similar to the high fan-in approach described here, although there are significant differences.

IrisNet uses a two level hierarchy consisting of receptors feeding into a core composed of a set of nodes running a distributed database. Queries are posed in XQuery over a hierarchical schema which represents both
the node organization and data organization. Thus, queries contain full information to enable the query to be routed to the lowest common ancestor necessary to answer that query. The focus is on ease of service deployment and scalability. Although IrisNet is organized in a hierarchy, it does not address hierarchical aggregation or successive processing of queries.

Astrolabe is designed for distributed system monitoring and data mining for system management. It organizes its nodes in a hierarchical manner (termed zones) with a primary focus on aggregation to enable system scalability. While not explicitly dealing with streams, it does handle rapid updates to the underlying data and re-computes aggregates on-the-fly. It does not address windowing semantics. Astrolabe is designed to run on a relatively homogeneous system and doesn’t take into account differing system capabilities.

The MIT Auto-ID center defines a set of specifications on how to interact with RFID data, including Savant. They address a similar hierarchical framework with multiple Savants talking to each other. They also define some of the same types of data processing stages we discuss in our CSAVA example. However, each stage in their processing involves a different data model and different protocol for interacting with the data.

The Hourglass project from Harvard is developing a data collection network (DCN) for accessing sensor-based data. Their infrastructure consists of an overlay network of wired nodes collecting data from various sensor networks. They generalize system components into producers, consumers, and services and focus on how best to establish and maintain circuits in the overlay network.

The D-Stampede project at Georgia Tech provides a programming system for managing what they term an “Octopus” hardware configuration, with a wide range of receptors feeding into a cluster for further processing. Their goal is to provide an API to support high performance application development for a heterogeneous (both hardware and software) environment. They focus on providing an application development environment and not on data management.

7.2. Data Stream Processing

As we have discussed previously, HiFi draws heavily from the large body of recent work on single site data stream processing. Projects in this area include TelegraphCQ, STREAM, Aurora, and NiagaraCQ. To date, there has been less work on distributed stream processing.

The Aurora Project has branched into two separate efforts to extend stream processing to a distributed environment. Aurora* is designed for a single administrative domain and addresses QoS and dynamic operator repartitioning and movement to achieve load-balancing and fault-tolerance. The Medusa System arranges single site Aurora data stream processors in a loosely federated network mediated by agoric principles to enable spanning of organizational boundaries and load balancing. This work differs from HiFi in that it has focused on distributing stream processing for load balancing and high availability. In contrast, HiFi is focused on identifying and addressing the problems that arise in systems that naturally assume a high fan-in topology.

More recently, Ahmad and Cetintemel have reported on an in-depth study of operator placement in a distributed stream processing system. This work analyzes multiple algorithms and exposes the trade-off between bandwidth usage and answer latency.

7.3. Distributed Data Management Systems

More traditional database research has focused on distributed data management in the form of both tightly and loosely coupled distributed databases as well as federated databases. Relevant efforts in this area include Mariposa, Information Manifold, and Tukwila.

8. Conclusions

In this paper, we have introduced the notion of high fan-in systems, an emerging information systems architecture that leverages advances in data acquisition and sensor technologies to enable disparate, widely distributed organizations to continuously monitor, manage, and optimize their operations. The technology required to support high fan-in systems builds on previous work in federated data management, data stream processing, sensor network query processing, and distributed data management; however, the unique architectural, application, and environmental considerations that arise in such systems raise a wealth of new and interesting research questions.

We described our initial design ideas and outlined currently open issues in the development of HiFi, a high fan-in infrastructure currently being implemented at UC Berkeley. We have built an initial prototype of HiFi using the TelegraphCQ and TinyDB code bases, and have successfully demonstrated the usefulness of stream-oriented query processing for correlating, aggregating, and visualizing readings from sensor motes and RFID readers. This paper represents a current snapshot of our development and identifies areas of future research. Of course, as the project develops, we anticipate that both our design and our research agenda will evolve as new issues and opportunities arise.
References

Abadi, D. et al., Aurora: A New Model and Architecture for Data Stream Management. In VLDB Journal (August 2003).
Abadi, D., Lindner, W., Madden, S., Schuler, J., An Integration Framework for Sensor Networks and Data Stream Management Systems. In VLDB (2004).
Adhikari, S., Paul, A., and Ramachandran, U., D-Stampede: distributed programming system for ubiquitous computing. In ICDCS (2002).
Afrati, F. and Chirkova, R., Selecting and Using Views to Compute Aggregate Queries (unpublished manuscript, 12/08/2003).
Ahmad, Y. and Cetintemel, U., Network-Aware Query Processing for Stream-based Applications. In VLDB (2004).
Acharya, S., Alonso, R., Franklin, M., and Zdonik, S., Broadcast Disks: Data Management for Asymmetric Communications Environments. In SIGMOD (1995).
Balazinska, M., Balakrishnan, H., and Stonebraker, M., Load Management and High Availability in the Medusa Distributed Stream Processing System. In SIGMOD (2004).
Bonnet, P., Gehrke, J., and Seshadri, P., Towards Sensor Databases. In MDM (2001).
Bonnet, P., and Seshadri, P., Device Database Systems. In ICDE (2000).
Carney, D. et al., Monitoring Streams - A New Class of Data Management Applications. In VLDB (2002).
Chen, J., DeWitt, D., Tian, F., and Wang, Y., NiagaraCQ: A Scalable Continuous Query System for Internet Databases. In SIGMOD (2000).
Chandrasekaran, S. et al., TelegraphCQ: Continuous Dataflow Processing for an Uncertain World. In CIDR (2003).
Chandrasekaran, S., and Franklin, M., Remembrance of Streams Past: Overload-Sensitive Management of Archived Streams. In VLDB (2004).
Cherniack, M. et al., Scalable Distributed Stream Processing. In CIDR (2003).
Cooper, O. et al., HiFi, A Unified Architecture for High Fan-In Systems (System Demonstration). In VLDB (2004).
Crossbow. The Stargate single board computer. http://www.xbow.com/Products/XScale.htm.
Deshpande, A., Nath, S., Gibbons, P. B., Seshan, S., IrisNet: Internet-scale resource-intensive sensor services. In ACM SIGMOD (2003).
Duschka, O. M., and Genesereth, M. R., Answering Recursive Queries Using Views. In PODS (1997).
EPCGlobal, EPCGlobal Homepage, http://www.epcglobalinc.org.
Faradjian, A., Gehrke, J. E., Bonnet, P., GADT: A probability space ADT for representing and querying the physical world. In Proceedings of ICDE (2002).
Hellerstein, J. et al., Adaptive Query Processing: Technology in Evolution. IEEE Data Engineering Bulletin, June 2000.
Harren, M. et al., Complex Queries in DHT-Based Peer-to-Peer Networks. In IPTPS (2002).
Ives, Z. G., Florescu, D., Friedman, M., Levy, A., Weld, D. S., An Adaptive Query Execution System for Data Integration. In SIGMOD (1999).
Kossmann, D., Franklin, M., and Drash, G., Cache Investment: Integrating Query Optimization and Distributed Data Placement. ACM TODS Vol. 25, No. 4, Dec. 2000.
Kirk, T., Levy, A., Sagiv, J., Srivastava, D., The information manifold. Technical report, AT&T Bell Laboratories, 1995.
Lerner, A. et al., AQuery: Query Language for Ordered Data, Optimization Techniques, and Experiments. In VLDB (2003).
Levy, A., Rajaraman, A., and Ordille, J. J., Querying Heterogeneous information sources using source descriptions. In VLDB (1996).
Madden, S., Franklin, M., Hellerstein, J., Hong, W., A Tiny Aggregation Service for ad hoc Sensor Networks. In OSDI (2002).
Madden, S., and Franklin, M., Fjording the Stream: An Architecture for Queries Over Streaming Sensor Data. In ICDE (2002).
Motwani, R. et al., Query Processing, Resource Management and Approximation in a Data Stream Management System. In CIDR (2003).
Oat Systems and MIT Auto-ID Center. The Savant. Technical Report MIT-AUTOID-TM-003, MIT Auto-ID Center, May 2002.
Pottinger, R., and Levy, A., A Scalable Algorithm for Answering Queries Using Views. In VLDB (2000).
Qian, X., Query Folding. In ICDE (1996).
Qin, S. J., "Neural networks for intelligent sensors and control --- Practical issues and some solutions," In: O. Omidvar and D.L. Elliott (Ed.), Neural Systems for Control, Academic Press, chapter 8 (1997).
Ramachandran, U. et al., Stampede: A Cluster Programming Middleware for Interactive Stream-oriented Applications. IEEE Trans. on Parallel and Distributed Systems, Nov. (2003).
Ramakrishnan, R. et al., SRQL: Sorted Relational Query Language. In Statistical and Scientific Database Management (1998).
Reiss, F., and Hellerstein, J., Data Triage: An Adaptive Architecture for Load Shedding in TelegraphCQ. ITB-TR-04-004, Intel Research, February (2004).
Rizvi, S., Mendelzon, A., Sudarshan, S., and Roy, P., Extending Query Rewriting Techniques for Fine-Grained Access Control. In SIGMOD (2004).
Sarbanes-Oxley Act. http://www.sarbanes-oxley.com
Shneidman, J. et al., Hourglass: An Infrastructure for Connecting Sensor Networks and Applications. Harvard Technical Report TR-21-04.
Srivastava, D., Dar, S., Jagadish, H. V., Levy, A. Y., Answering Queries with Aggregation Using Views. In VLDB (1996).
Stonebraker, M., Aoki, P. A., Litwin, W., Pfeffer, A., Sah, A., Sidell, J., Staelin, C., and Yu, A., Mariposa: A Wide-Area Distributed Database System. VLDB Journal, 1996.
van Renesse, R., Birman, K. P., and Vogels, W., Astrolabe: A robust and scalable technology for distributed system monitoring, management, and data mining. ACM TOCS, 2003.