gorilla

Large-scale internet services aim to remain highly availableand responsive in the presence of unexpected failures. Pro-viding this service often requires monitoring and analyzingtens of millions of measurements per second across a largenumber of systems, and one particularly effective solutionis to store and query such measurements in a time seriesdatabase (TSDB).A key challenge in the design of TSDBs is how to strikethe right balance between efficiency, scalability, and relia-bility. In this paper we introduce Gorilla, Facebook’s in-memory TSDB. Our insight is that users of monitoring sys-tems do not place much emphasis on individual data pointsbut rather on aggregate analysis, and recent data points areof much higher value than older points to quickly detect anddiagnose the root cause of an
展开查看详情

1.Gorilla: A Fast, Scalable, In-Memory Time Series Database Tuomas Pelkonen Scott Franklin Justin Teller Paul Cavallaro Qi Huang Justin Meza Kaushik Veeraraghavan Facebook, Inc. Menlo Park, CA ABSTRACT FB Servers Large-scale internet services aim to remain highly available and responsive in the presence of unexpected failures. Pro- Web Tier viding this service often requires monitoring and analyzing tens of millions of measurements per second across a large number of systems, and one particularly effective solution Long term is to store and query such measurements in a time series Gorilla storage database (TSDB). (HBase) A key challenge in the design of TSDBs is how to strike Back-end the right balance between efficiency, scalability, and relia- Services bility. In this paper we introduce Gorilla, Facebook’s in- memory TSDB. Our insight is that users of monitoring sys- tems do not place much emphasis on individual data points but rather on aggregate analysis, and recent data points are Ad-hoc visualizations and of much higher value than older points to quickly detect and dashboards Alarms and diagnose the root cause of an ongoing problem. Gorilla op- automatic timizes for remaining highly available for writes and reads, remediation even in the face of failures, at the expense of possibly drop- ping small amounts of data on the write path. To improve Time Series query efficiency, we aggressively leverage compression tech- Correlation niques such as delta-of-delta timestamps and XOR’d floating point values to reduce Gorilla’s storage footprint by 10x. This allows us to store Gorilla’s data in memory, reduc- Figure 1: High level overview of the ODS monitor- ing query latency by 73x and improving query throughput ing and alerting system, showing Gorilla as a write- by 14x when compared to a traditional database (HBase)- through cache of the most recent 26 hours of time backed time series data. This performance improvement has series data. unlocked new monitoring and debugging tools, such as time series correlation search and more dense visualization tools. Gorilla also gracefully handles failures from a single-node to ual systems running on many thousands of machines, often entire regions with little to no operational overhead. across multiple geo-replicated datacenters. An important requirement to operating these large scale services is to accurately monitor the health and performance 1. INTRODUCTION of the underlying system and quickly identify and diagnose Large-scale internet services aim to remain highly-available problems as they arise. Facebook uses a time series database and responsive for their users even in the presence of unex- (TSDB) to store system measuring data points and provides pected failures. As these services have grown to support quick query functionalities on top. We next specify some of a global audience, they have scaled beyond a few systems the constraints that we need to satisy for monitoring and running on hundreds of machines to thousands of individ- operating Facebook and then describe Gorilla, our new in- memory TSDB that can store tens of millions of datapoints (e.g., CPU load, error rate, latency etc.) every second and This work is licensed under the Creative Commons Attribution- NonCommercial-NoDerivs 3.0 Unported License. To view a copy of this li- respond queries over this data within milliseconds. cense, visit http://creativecommons.org/licenses/by-nc-nd/3.0/. Obtain per- Writes dominate. Our primary requirement for a TSDB mission prior to any use beyond those covered by the license. Contact is that it should always be available to take writes. As copyright holder by emailing info@vldb.org. Articles from this volume we have hundreds of systems exposing multiple data items, were invited to present their results at the 41st International Conference on the write rate might easily exceed tens of millions of data Very Large Data Bases, August 31st - September 4th 2015, Kohala Coast, points each second. In constrast, the read rate is usually Hawaii. Proceedings of the VLDB Endowment, Vol. 8, No. 12 a couple orders of magnitude lower as it is primarily from Copyright 2015 VLDB Endowment 2150-8097/15/08. automated systems watching ’important’ time series, data 1816

2.visualization systems presenting dashboards for human con- We addressed the reliability requirements by running mul- sumption, or from human operators wishing to diagnose an tiple instances of Gorilla in different datacenter regions and observed problem. streaming data to each without attempting to guarantee State transitions. We wish to identify issues that emerge consistency. Read queries are directed at the closest avail- from a new software release, an unexpected side effect of a able Gorilla instance. Note that this design leverages our configuration change, a network cut and other issues that re- observation that individual data points can be lost without sult in a significant state transition. Thus, we wish for our compromising data aggregation unless there’s significant dis- TSDB to support fine-grained aggregations over short-time crepancy between the Gorilla instances. windows. The ability to display state transitions within tens Gorilla is currently running in production at Facebook of seconds is particularly prized as it allows automation to and is used daily by engineers for real-time firefighting and quickly remediate problems before they become wide spread. debugging in conjunction with other monitoring and analy- High availability. Even if a network partition or other sis systems like Hive [27] and Scuba [3] to detect and diag- failure leads to disconnection between different datacenters, nose problems. systems operating within any given datacenter ought to be able to write data to local TSDB machines and be able to retrieve this data on demand. 2. BACKGROUND & REQUIREMENTS Fault tolerance. We wish to replicate all writes to multi- ple regions so we can survive the loss of any given datacenter 2.1 Operational Data Store (ODS) or geographic region due to a disaster. Operating and managing Facebook’s large infrastructure Gorilla is Facebook’s new TSDB that satisfies these con- comprised of hundreds of systems distributed across mul- straints. Gorilla functions as a write-through cache of the tiple data centers would be very difficult without a moni- most recent data entering the monitoring system. We aim toring system that can track their health and performance. to ensure that most queries run within 10’s of milliseconds. The Operational Data Store (ODS) is an important portion The insight in Gorilla’s design is that users of monitor- of the monitoring system at Facebook. ODS comprises of ing systems do not place much emphasis on individual data a time series database (TSDB), a query service, and a de- points but rather on aggregate analysis. Additionally, these tection and alerting system. ODS’s TSDB is built atop the systems do not store any user data so traditional ACID guar- HBase storage system as described in [26]. Figure 1 repre- antees are not a core requirement for TSDBs. However, a sents a high-level view of how ODS is organized. Time series high percentage of writes must succeed at all times, even data from services running on Facebook hosts is collected by in the face of disasters that might render entire datacenters the ODS write service and written to HBase. unreachable. Additionally, recent data points are of higher There are two consumers of ODS time series data. The value than older points given the intuition that knowing if first consumers are engineers who rely on a charting system a particular system or service is broken right now is more that generates graphs and other visual representations of valuable to an operations engineer than knowing if it was time series data from ODS for interactive analysis. The broken an hour ago. Gorilla optimizes for remaining highly second consumer is our automated alerting system that read available for writes and reads, even in the face of failures, at counters off ODS, compares them to preset thresholds for the expense of possibly dropping small amounts of data on health, performance and diagnostic metrics and fires alarms the write path. to oncall engineers and automated remediation systems. The challenge then arises from high data insertion rate, total data quantity, real-time aggregation, and reliability re- 2.1.1 Monitoring system read performance issues quirements. We addressed each of these in turn. To address In early 2013, Facebook’s monitoring team realized that the first couple requirements, we analyzed the Operational its HBase time series storage system couldn’t scale handle Data Store (ODS) TSDB, an older monitoring system that future read loads. While the average read latency was ac- was widely used at Facebook. We noticed that at least 85% ceptable for interactive charts, the 90th percentile query of all queries to ODS was for data collected in the past 26 time had increased to multiple seconds blocking our au- hours. Further analysis allowed us to determine that we tomation. Additionally, users were self-censoring their us- might be able to serve our users best if we could replace a age as interactive analysis of even medium-sized queries of disk-based database with an in-memory database. Further, a few thousand time series took tens of seconds to execute. by treating this in-memory database as a cache of the persis- Larger queries executing over sparse datasets would time- tent disk-based store, we could achieve the insertion speed out as the HBase data store was tuned to prioritize writes. of an in-memory system with the persistence of a disk based While our HBase-based TSDB was inefficient, we quickly re- database. jected wholesale replacement of the storage system as ODS’s As of Spring 2015, Facebook’s monitoring systems gener- HBase store held about 2 PB of data [5]. Facebook’s data ate more than 2 billion unique time series of counters, with warehouse solution, Hive, was also unsuitable due to its al- about 12 million data points added per second. This repre- ready orders of magnitude higher query latency comparing sents over 1 trillion points per day. At 16 bytes per point, to ODS, and query latency and efficiency were our main the resulting 16TB of RAM would be too resource intensive concerns [27]. for practical deployment. We addressed this by repurposing We next turned our attention to in-memory caching. ODS an existing XOR based floating point compression scheme to already used a simple read-through cache but it was pri- work in a streaming manner that allows us to compress time marily targeted at charting systems where multiple dash- series to an average of 1.37 bytes per point, a 12x reduction boards shared the same time series. A particularly difficult in size. scenario was when dashboards queried for the most recent data point, missed in the cache, and then issued requests 1817

3.directly to the HBase data store. We also considered a sep- 3.1 OpenTSDB arate Memcache [20] based write-through cache but rejected OpenTSDB is based on HBase [28], and very closely re- it as appending new data to an existing time series would sembles the ODS HBase storage layer we use for long term require a read/write cycle, causing extremely high traffic to data. Both systems rely on similar table structures, and the memcache server. We needed a more efficient solution. have come to similar conclusions for optimization and hori- zontal scalability [26, 28]. However, we had found that sup- 2.2 Gorilla requirements porting the volume of queries necessary to build advanced With these considerations, we determined the following monitoring tools required faster queries than a disk based requirements for a new service: store can support. Unlike OpenTSDB, the ODS HBase layer does do time roll • 2 billion unique time series identified by a string key. up aggregation for older data to save space. This results in older, archived data having lower time granularity compared • 700 million data points (time stamp and value) added to more recent data in ODS, while OpenTSDB will keep the per minute. full resolution data forever. We have found that cheaper long time period queries and space savings are worth the • Store data for 26 hours. loss of precision. • More than 40,000 queries per second at peak. OpenTSDB also has a richer data model for identifying time series. Each time series is identified by a set of arbitrary • Reads succeed in under one millisecond. key-value pairs, called tags [28]. Gorilla identifies time series with a single string key and relies on higher level tools to • Support time series with 15 second granularity (4 points extract and identify time series meta data. per minute per time series). • Two in-memory, not co-located replicas (for disaster 3.2 Whisper (Graphite) recovery capacity). Graphite stores time series data on local disk in the Whis- per format, a Round Robin Database (RRD) style database • Always serve reads even when a single server crashes. [1]. This file format expects time series data to be times- tamped at regular intervals, and does not support jitter in • Ability to quickly scan over all in memory data. the time series. While Gorilla does work more efficiently • Support at least 2x growth per year. if data are timestamped at regular intervals, it can handle arbitrary and changing intervals. With Whisper, each time After a brief comparison with other TSDB systems in series is stored in a separate file, and new samples overwrite Section 3, we detail the implementation of Gorilla in Sec- old ones after a certain amount of time [1]. Gorilla works in tion 4, first discussing its new time stamp and data value a similar fashion, only holding the most recent day of data in compression schemes in Section 4.1. We then describe how memory. However, with its focus on on-disk storage, query Gorilla remains highly available despite single node failures latency using Graphite/Whisper is not fast enough to match and region-wide disasters in Section 4.4. We describe how the requirements for Gorilla. Gorilla has enabled new tools in Section 5. We close out by 3.3 InfluxDB describing our experience developing and deploying Gorilla in Section 6. InfluxDB is a new open-source time series database, with an even richer data model than OpenTSDB. Each event in a time series can have a full set of meta data. While this 3. COMPARISON WITH TSDB SYSTEMS flexibility does allow for rich data, it necessarily results in There are a number of publications detailing data mining larger disk usage than schemes that only store time series techniques to search, classify, and cluster enormous amounts within the database [2]. of time series data efficiently [8, 23, 24]. These systems InfluxDB also contains the code to build it as a distributed demonstrate many of the uses of examining time series data, storage cluster, allowing users to scale horizontally without from clustering and classifying [8, 23] to anomaly detection the overhead of managing an HBase/Hadoop cluster [2]. At [10, 16] to indexing the time series [9, 12, 24]. However, Facebook, we already have dedicated teams to support our there are fewer examples detailing systems able to gather HBase installations, so using it for ODS did not involve a and store massive amounts of time series data in real-time. large extra investment in resources. Like other systems, In- Gorilla’s design, focusing on reliable real-time monitoring fluxDB keeps data on-disk, leading to slower queries than if of production systems, makes stand out compared to other data are kept in memory. TSDBs. Gorilla occupies an interesting design space, where being available for reads and writes in the face of failures 4. GORILLA ARCHITECTURE prioritized over availability of any older data. Gorilla is an in-memory TSDB that functions as a write- Since Gorilla was designed from the beginning to store through cache for monitoring data written to an HBase data all data in memory, its in-memory structure is also different store. The monitoring data stored in Gorilla is a simple 3- from existing TSDBs. However, if one views Gorilla as an tuple of a string key, a 64 bit time stamp integer and a intermediate store for in-memory storage of time series data double precision floating point value. Gorilla incorporates in front of another on-disk TSDB, then Gorilla could be a new time series compression algorithm that allows us to used as a write through cache for any TSDB (with relatively compress each by series down from 16 bytes to an average simple modifications). Gorilla’s focus on speed of ingest and of 1.37 bytes, a 12x reduction in size. Further, we have horizontal scaling is similar to existing solutions. arranged Gorilla’s in-memory data structures to allow fast 1818

4. Data stream March 24, Value: a) 2015 02:01:02 12 02:02:02 12 02:03:02 24 Compressed data Header: 62 12 '10' : -2 '0' '0' '11' : 11 : 1 :'1' March 24, 2015 02:00:00 Bit length 64 14 64 9 1 1 2+5+6+1 b) c) N-2 timestamp 02:00:00 - Previous Value 12.0 0x4028000000000000 N-1 timestamp 02:01:02 Delta: 62 Value 24.0 0x4038000000000000 timestamp 02:02:02 Delta: 60 XOR - 0x0010000000000000 Delta of deltas: -2 11 leading zeros, # of meaningful bits is 1 Figure 2: Visualizing the entire compression algorithm. For this example, 48 bytes of values and time stamps are compressed to just under 21 bytes/167 bits. and efficient scans of all data while maintaining constant Gorilla is focused on keeping the full resolution representa- time lookup of individual time series. tion of data. The key specified in the monitoring data is used to uniquely Our work was inspired by a compression scheme for float- identify a time series. By sharding all monitoring data based ing point data derived in scientific computation. This scheme on these unique string keys, each time series dataset can be leveraged XOR comparison with previous values to generate mapped to a single Gorilla host. Thus, we can scale Go- a delta encoding [25, 17]. rilla by simply adding new hosts and tuning the sharding Gorilla compresses data points within a time series with function to map new time series data to the expanded set of no additional compression used across time series. Each data hosts. When Gorilla was launched to production 18 months point is a pair of 64 bit values representing the time stamp ago, our dataset of all time series data inserted in the past and value at that time. Timestamps and values are com- 26 hours fit into 1.3TB of RAM evenly distributed across 20 pressed separately using information about previous values. machines. Since then, we have had to double the size of the The overall compression scheme is visualized in Figure 2, clusters twice due to data growth, and are now running on showing how time stamps and values are interleaved in the 80 machines within each Gorilla cluster. This process was compressed block. simple due to the share-nothing architecture and focus on Figure 2.a illustrates the time series data as a stream con- horizontal scalability. sisting of pairs of measurements (values) and time stamps. Gorilla tolerates single node failures, network cuts, and Gorilla compresses this data stream into blocks, partitioned entire datacenter failures by writing each time series value by time. After a simple header with an aligned time stamp to two hosts in separate geographic regions. On detecting a (starting at 2 am, in this example) and storing the first value failure, all read queries are failed over to the alternate region in a less compressed format, Figure 2.b shows that times- ensuring that users do not experience any disruption. tamps are compressed using delta-of-delta compression, de- scribed in more detail in Section 4.1.1. As shown in Figure 4.1 Time series compression 2.b the time stamp delta of delta is −2. This is stored with a two bit header (‘10’), and the value is stored in seven bits, In evaluating the feasibility of building an in-memory time for a total size of just 9 bits. Figure 2.c shows floating-point series database, we considered several existing compression values are compressed using XOR compression, described in schemes to reduce the storage overhead. We identified tech- more detail in Section 4.1.2. By XORing the floating point niques that applied solely to integer data which didn’t meet value with the previous value, we find that there is only a our requirement of storing double precision floating point single meaningful bit in the XOR. This is then encoded with values. Other techniques operated on a complete dataset a two bit header (‘11’), encoding that there are eleven lead- but did not support compression over a stream of data as ing zeros, a single meaningful bit, and the actual value (‘1’). was stored in Gorilla [7, 13]. We also identified lossy time se- This is stored in fourteen total bits. ries approximation techniques used in data mining to make the problem set more easily fit within memory [15, 11], but 1819

5.����������������������������� ��������� ���� ���������������������� ��� ��������� �� �������� ���� ��������� ��������� ����� � ������ ����� � � �� ����� ��������� ��� �� �������������������������������������������� Figure 4: Visualizing how XOR with the previous value often has leading and trailing zeros, and for Figure 3: Distribution of time stamp compression many series, non-zero elements are clustered. across different ranged buckets. Taken from a sam- ple of 440, 000 real time stamps in Gorilla. and selecting the ones that gave the best compression ra- tio. A time series might have data points missing but the 4.1.1 Compressing time stamps existing points likely arrived at fixed intervals. For example We analyzed the time series data stored in ODS so we if there’s one missing data point the deltas could be 60, 60, could optimize the compression scheme implemented in Go- 121 and 59. The deltas of deltas would be 0, 61 and -62. rilla. We noticed that the vast majority of ODS data points Both 61 and -62 fit inside the smallest range and fewer bits arrived at a fixed interval. For instance, it is common for a can be used to encode these values. The next smallest range time series to log a single point every 60 seconds. Occasion- [-255, 256] is useful because a lot of the data points come ally, the point may have a time stamp that is 1 second early in every 4 minutes and a single data point missing still uses or late, but the window is usually constrained. that range. Rather than storing timestamps in their entirety, we store Figure 3 show the results of time stamp compression in an efficient delta of deltas. If the delta between time stamps Gorilla. We have found that about 96% of all time stamps for subsequent data points in a time series are 60, 60, 59 can be compressed to a single bit. and 61 respectively, the delta of deltas is computed by sub- tracting the current time stamp value from the previous one 4.1.2 Compressing values which gives us 0, -1 and 2. An example of how this works is In addition to the time stamp compression, Gorilla also shown in Figure 2. compresses data values. Gorilla restricts the value element We next encode the delta of deltas using variable length in its tuple to a double floating point type. We use a com- encoding with the following algorithm: pression scheme similar to existing floating point compres- sion algorithms, like the ones described in [17] and [25]. 1. The block header stores the starting time stamp, t−1 , From analyzing our ODS data, we discovered that the which is aligned to a two hour window; the first time value in most time series does not change significantly when stamp, t0 , in the block is stored as a delta from t−1 in compared to its neighboring data points. Further, many 14 bits. 1 data sources only store integers into ODS. This allowed us 2. For subsequent time stamps, tn : to tune the expensive prediction scheme in [25] to a simpler implementation that merely compares the current value to (a) Calculate the delta of delta: the previous value. If values are close together the sign, D = (tn − tn−1 ) − (tn−1 − tn−2 ) exponent, and first few bits of the mantissa will be identical. (b) If D is zero, then store a single ‘0’ bit We leverage this to compute a simple XOR of the current and previous values rather than employing a delta encoding (c) If D is between [-63, 64], store ‘10’ followed by scheme. the value (7 bits) We then encode these XOR’d values with the following (d) If D is between [-255, 256], store ‘110’ followed by variable length encoding scheme: the value (9 bits) 1. The first value is stored with no compression (e) if D is between [-2047, 2048], store ‘1110’ followed by the value (12 bits) 2. If XOR with the previous is zero (same value), store single ‘0’ bit (f) Otherwise store ‘1111’ followed by D using 32 bits 3. When XOR is non-zero, calculate the number of lead- The limits for the different ranges were selected by sam- ing and trailing zeros in the XOR, store bit ‘1’ followed pling a set of real time series from the production system by either a) or b): 1 The first time stamp delta is sized at 14 bits, because that (a) (Control bit ‘0’) If the block of meaningful bits size is enough to span a bit more than 4 hours (16,384 sec- falls within the block of previous meaningful bits, onds), If one chose a Gorilla block larger than 4 hours, this i.e., there are at least as many leading zeros and size would increase. as many trailing zeros as with the previous value, 1820

6. ������������������������������� ���� �� ����������������������������� �� ������������������������� ��� ����������� � ��� ��� ������������ � ��� ������������ ���� �� � ���������� ���������������� ���������������� � �� �� ��� ��� ���������������������������������� �������������������������������� Figure 5: Distribution of values compressed across Figure 6: Average bytes used for each ODS data different XOR buckets. Taken from a sample of 1.6 point as the compression bucket is varied from 0 million real values in Gorilla. (no compression) to 240 minutes. Bucket size larger than two hours do not give significant additional compression for our dataset. This is across the en- tire production Gorilla data set (approximately 2 use that information for the block position and billion time series). just store the meaningful XORed value. (b) (Control bit ‘1’) Store the length of the number of leading zeros in the next 5 bits, then store the 4.2 In-memory data structures length of the meaningful XORed value in the next The primary data structure in Gorilla’s implementation is 6 bits. Finally store the meaningful bits of the a Timeseries Map (TSmap). Figure 7 provides an overview XORed value. of this data structure. TSmap consists of a vector of C++ standard library shared-pointers to time series and a case- insensitive, case-preserving map from time series names to The overall compression scheme is visualized in Figure 2 the same. The vector allows for efficient paged scans through which depicts how our XOR encoding can store the values all the data, while the map enables constant time lookups in a time series efficiently. of particular time series. Constant time lookup is necessary Figure 5 shows the distribution of actual values in Gorilla. to achieve the design requirement for fast reads while still Roughly 51% of all values are compressed to a single bit since allowing for efficient data scans. the current and previous values are identical. About 30% of The use of C++ shared-pointers enables scans to copy the values are compressed with the control bits ‘10’ (case b), the vector (or pages thereof) in a few microseconds, avoid- with an average compressed size of 26.6 bits. The remaining ing lengthy critical sections that would impact the flow of 19% are compressed with control bits ‘11’, with an average incoming data. On deletion of a time series, a vector entry size of 36.9 bits, due to the extra 13 bits of overhead required is tombstoned, and the index is placed in a free pool which to encode the length of leading zero bits and meaningful bits. is re-used when new time series are created. Tombstoneing This compression algorithm uses both the previous float- a section of memory marks it as ’dead’, and ready to be ing point value and the previous XORed value. This results reused, without actually freeing it to the underlying system. in an additional compression factor because a sequence of Concurrency is attained with a single read-write spin lock XORed values often have a very similar number of leading protecting map and vector access and 1-byte spin lock on and trailing zeros, as visualized in Figure 4. Integer values each time series. Since each individual time series has a compress especially well because the location of the one bits relatively low write throughput, the spin lock has very low after the XOR operation is often the same for the whole contention between reads and writes. time series, meaning most values have the same number of As illustrated in Figure 7, the mapping of shard identi- trailing zeros. fier (shardId) to TSmap, named ShardMap, is maintained One trade-off that is inherent in our encoding scheme is with a vector of pointers to the TSmaps. Mapping of a the time span over which the compression algorithm oper- time series name to a shard is done using the same case- ates. Using the same encoding scheme over larger time peri- insensitive hash in the TSmap, mapping it to an ID between ods allows us to achieve better compression ratios. However, [0,NumberOfShards). Since the total number of shards in the queries that wish to read data over a short time range might system is constant and numbering in the low thousands, the need to expend additional computational resources on de- additional overhead of storing null pointers in the ShardMap coding data. Figure 6 shows the average compression ratio is negligible. Like the TSmaps, concurrent access to the for the time series stored in ODS as we change the block ShardMap is managed with a read-write spin lock. size. One can see that blocks that extend longer than two Since the data are already partitioned by shard, individual hours provide diminishing returns for compressed size. A maps remain sufficiently small (about 1 million entries), the two-hour block allows us to achieve a compression ratio of C++ standard library unordered-map has sufficient perfor- 1.37 bytes per data point. mance, and there have been no issues with lock contention. 1821

7. ShardMap vector<unique_ptr<TSmap>> TSmap TSmap TS a) TSmap spinlock TSmap RW c) TSmap TSmap lock TSmap d) Open data block vector<shared_ptr<TS>> Append only string b) Process Process unordered_map<string, Process shared_ptr<TS>> ✭ Closed Process Closed blocks blocks ✭ TSmap uses a case-preserving, case-insensitive hash Figure 7: Gorilla in-memory data structure. On a query, first a) the TSmap pointer is examined. If b) the pointer is null, it means this Gorilla host does not own the shard. If non-null, then c) the TSmap is read-locked, and the pointer to the time series structure (TS) is found in the unordered map and copied out. At this point, both RW locks can be unlocked. Next, d) the TS spinlock is locked, and data for the query time range can be directly copied out of the TS. A time series data structure is composed of a sequence of an integer identifier. This integer identifier is the index into closed blocks for data older than two hours and a single open the in-memory vector. New keys are append to the current data block that holds the most recent data. The open data key list, and Gorilla periodically scans all the keys for each block is an append-only string, to which new, compressed shard in order to re-write the file. time stamps and values are appended. Since each block As data points are streamed to Gorilla, they are stored in holds two hours of compressed data, the open block is closed a log file. The time stamps and values are compressed using once it is full. Once a block is closed, it is never changed the format described in Section 4.1. However, there is only until it is deleted out of memory. Upon closing, a block one append-only log per shard, so values within a shard are is copied to memory allocated from large slabs to reduce interleaved across time series. This difference from the in fragmentation. While the open block is often reallocated as memory encoding means that each compressed time stamp- it changes sizes, we find that the copy process reduces the value pair is also marked with it’s 32-bit integer ID, adding overall fragmentation within Gorilla. significant storage overhead to the per-shard log file. Data is read out by copying the data blocks that could Gorilla does not offer ACID guarantees and as such, the contain data for the queried time range directly into the log file is not a write-ahead-log. Data is buffered up to 64kB, output remote procedure call structure. The entire data usually comprising one or two seconds worth of data, before block is returned to the client, leaving the decompression being flushed. While the buffer is flushed on a clean shut- step to be done outside Gorilla. down, a crash might result in the loss of small amounts of data. We found this trade-off to be worth the data loss, 4.3 On disk structures as it allowed higher data rate to be pushed to disk and One of our goals for Gorilla is to survive single host fail- higher availability for writes compared with a traditional ures. Gorilla achieves persistence by storing data in Glus- write-ahead log. terFS, a POSIX-compliant, distributed file system [4] with Every two hours, Gorilla copies the compressed block data 3x replication. HDFS or other distributed file systems would to disk, as this format is much smaller than the log files. have sufficed just as easily. We also considered single host There is one complete block file for every two hours worth databases such as MySQL and RocksDB but decided against of data. It has two sections: a set of consecutive 64kB slabs these because our persistency use case did not require a of data blocks, copied directly as they appear in memory, database query language. and a list of <time series ID, data block pointer> pairs. A Gorilla host will own multiple shards of data, and it Once a block file is complete, Gorilla touches a checkpoint maintains a single directory per shard. Each directory con- file and deletes the corresponding logs. The checkpoint file tains four types of files: Key lists, append-only logs, com- is used to mark when a complete block file is flushed to disk. plete block files, and checkpoint files. If a block file was not successfully flushed to disk when it The key list is simply a map of the time series string key to on a process crash, when the new process starts up, the 1822

8.checkpoint file will not exist, so the new process knows it the system and the total data stored, each shard represents cannot trust the block file and will instead read from the about 16GB of on-disk storage. This can be read from Glus- log file only. terFS in just a few minutes, as the files are spread across several physical hosts. While the host is reading the data, 4.4 Handling failures it will accept new incoming data points and put them into a queue to be processed at the earliest possible time. When For fault tolerance, we chose to prioritize tolerating single shards are reassigned, clients immediately drain their buffers node, temporary failures with zero observable downtime and by writing to the new node. Going back to the Gorilla host large scale, and localized failures (such as a network cut to α in region A crashing example: when α crashes, the shards an entire region). We did this because single node failures are reassigned to host β in the same Gorilla instance. As happen quite often, and large scale, localized failures become soon as host β is assigned the shards, it begins accepting a concern at Facebook’s scale to allow the ability to operate streaming writes, so no data loss occurs for in-flight data. If under natural (or human-caused) disasters. There is the Gorilla host α is going down in a more controlled manner, added benefit that one can model rolling software upgrades it flushes all data to disk before exiting, so no data is lost as a set of controlled, single node failures, so optimizing for for software upgrades. this case means hassle-free and frequent code pushes. For all In our example, if host α crashes before successfully flush- other failures, we chose trade-offs that, when they do cause ing its buffers to disk, that data will be lost. In practice, data-loss, will prioritize the availability of more recent data this happens very rarely, and only a few seconds of data over older data. This is because we can rely on the existing is actually lost. We make this trade-off to accept a higher HBase TSDB system to handle historical data queries, and throughput of writes and to allow accepting more recent automated systems that detect level changes in time series writes sooner after an outage. Also, we monitor this situa- are still useful with partial data, as long as has the most tion, and are able to point reads at the more healthy region. recent data. Note that after a node failure, shards will be partially un- Gorilla ensures that it remains highly available to data available for reads until the new nodes hosting these shards center faults or network partitions by maintaining two com- read the data from disk. Queries will return partial data pletely independent instances in separate data center re- (blocks are read most recent to least recent) and will mark gions. On a write, data is streamed to each Gorilla instance, the results as partial. with no attempt to guarantee consistency. This makes large- When the read client library receives a partial result from scale failures easy to handle. When an entire region fails, its query to the Gorilla instance in region A, it will retry queries are directed at the other until the first has been back fetching the affected time series from region B and keep up for 26 hours. This is important to handle large scale dis- those results if they are not partial. If both region A and aster events, whether actual or simulated [21]. For example, region B return partial results, both partial results are re- when the Gorilla instance in region A completely fails, both turned to the caller with a flag set that some error caused reads and writes to that region will also fail. The read fail- incomplete data. The caller can then decide if it has enough ures will be transparently retried to the Gorilla instance in information to continue processing the request or if it should the healthy region B. If the event lasts long enough (more fail outright. We make this choice because Gorilla is most than one minute), data will be dropped from region A, and often used by automated systems to detect level changes requests will not be retried. When this happens, all reads in time series. These systems can function well with only can be turned off from region A until the cluster has been partial data, as long as it is the most recent data. healthy for at least 26 hours. This remediation can be per- Automatic forwarding of reads from an unhealthy host to formed either manually or automated. a healthy one means that users are protected from restarts Within each region, a Paxos-based [6, 14] system called and software upgrades. We find that upgrading the version ShardManager assigns shards to nodes. When a node fails, of software causes zero data drops, and all reads continue ShardManager distributes its shards among other nodes in to be served successfully with no manual intervention. This the cluster. During shard movement, write clients buffer also allows Gorilla to transparently serve reads across server their incoming data. The buffers are sized to hold 1 minute failures ranging from a single node to an entire region [21]. of data, and points older than a minute are discarded to Finally, we still use our HBase TSDB for long-term stor- make room for newer data. We have found that this time age of data. If all in-memory copies of data are lost, our period is sufficient to allow shard reassignment in most sit- engineers can still query the more durable storage system uations, but for extended outages, it prioritizes the most to do their analysis and drive ad-hoc queries, and Gorilla recent data, as more recent data is intuitively more useful can still drive real-time detection of level changes, once it is for driving automated detection systems. When a Gorilla restarted and accepting new writes. host α in region A crashes or goes down for any reason, writes are buffered for at least 1 minute as the Gorilla clus- ter tries to resurrect the host. If the rest of the cluster is healthy, shard movement happens in thirty seconds or less, 5. NEW TOOLS ON GORILLA resulting in no data loss. If the movement does not occur Gorilla’s low latency query processing has enabled the cre- fast enough reads can be pointed to the Gorilla instance in ation of new analysis tools. region B, either in a manual or automated process. When shards are added to a host, it will read all the data 5.1 Correlation engine from GlusterFS. These shards may have been owned by the The first is a time series correlation engine that runs same host before the restart or another. A host can read and within Gorilla. Correlation search allows users to perform process all the data it needs to be fully functional in about 5 interactive, brute-force search on many time series, currently minutes from GlusterFS. Because of the number of shards in limited to 1 million at a time. 1823

9. �� ������������������������������� ������� �������������� ��������������� ����� �� �������������������������� ��� ��� ���� �� �� ��� ���� �� ��� �� ��� �� ��� � ��� ��� ��� ������� ������� ������� ������� ������� ���������� ���� Figure 8: Total query latency breakdown with differ- Figure 9: Growth of the total query volume since ent TSDB solutions for ODS. Comparing to HBase, Gorilla’s introduction to ease data exploration and Gorilla has provided between 73x and 350x improve- develop new analysis tools. ment, depending on the query size. This plot also includes preliminary results of two other options: Gorilla using flash to store data older than 26 hours, all completed buckets every two hours, generating the new and HBase with ODS cache. values for the lower granularity table. Because scanning all data in Gorilla is very efficient, this move has reduced load on the HBase cluster, as we no longer need to write all the The correlation engine calculates the Pearson Product- high granularity data to disk and do expensive full table Moment Correlation Coefficient (PPMCC) which compares scans on HBase. a test time series to a large set of time series [22]. We find that PPMCC’s ability to find correlation between similarly shaped time series, regardless of scale, greatly helps auto- mate root-cause analysis and answer the question “What 6. EXPERIENCE happened around the time my service broke?”. We found that this approach gives satisfactory answers to our ques- 6.1 Fault tolerance tion and was simpler to implement than similarly focused We next describe several planned and unplanned events approaches described in the literature[10, 18, 16]. that occurred over the past 6 months that affected some To compute PPMCC, the test time series is distributed to portion of Facebook’s site availability. We restrict ourselves each Gorilla host along with all of the time series keys. Then, to discussing the impact of these events on Gorilla as other each host independently calculates the top N correlated time issues are beyond the scope of this paper. series, ordered by the absolute value of the PPMCC com- Network cuts. 3 unplanned events resembling network pared to the needle, and returning the time series values. cuts/outages to some portion of machines. The cuts were au- In the future, we hope that Gorilla enables more advanced tomatically detected and Gorilla automatically shifted reads data mining techniques on our monitoring time series data, to the unaffected coast without any service disruption. such as those described in the literature for clustering and Disaster readiness. 1 planned major fire drill simulat- anomaly detection [10, 11, 16]. ing total network cut to one storage back end. As above, Gorilla switched reads to the unaffected coast. Once the 5.2 Charting downed region was restored, Gorilla in the down region was Low latency queries have also enabled higher query vol- manually remediated to pull in logs from the firedrill time ume tools. As an example, engineers unrelated to the moni- frame so dashboards served out of the down region would toring team have created a new data visualization which will display the expected data to end users. show large sets of horizon charts, which themselves are re- Configuration changes and code pushes. There were ductions across many time series. This visualization allows 6 configuration changes and 6 code releases that required users to quickly visually scan across large sets of data to see restarting Gorilla in a given region. outliers and time-correlated anomalies. Bug. A release with a major bug was pushed to a single coast. Gorilla immediately shifted load to the other region 5.3 Aggregations and continued serving uses until the bug was fixed. There Recently, we moved the roll up background process from was minimal correctness issues in the served data. a set of map-reduce jobs to running directly against Gorilla. Single node failures. There were 5 single machine fail- Recall that ODS performs time-based aggregations (or roll ures (unassociated with the major bug), causing no lost data up) compression on old data, which is a lossy compression and no remediation needed that reduces data granularity [26], similar to the format used There were zero events in Gorilla in the last 6 months that by Whisper [1]. Before Gorilla, map-reduce jobs were run caused anomaly detection and alerting issues. Since Gorilla against the HBase cluster, which would read all data for launched, there has only been 1 event that disrupted real- the past hour and output values for a new, lower granu- time monitoring. In all cases, the long-term storage was able larity table. Now, a background process periodically scans to act as a backup for all monitoring-related queries. 1824

10. to 40,000 peak queries per second, as seen in Figure 9. Low latency reads have encouraged our users to build advanced data analysis tools on top of Gorilla as described in Section 5. High availability trumps resource efficiency. Fault tolerance was an important design goal for Gorilla. It needed to be able to withstand single host failures with no interrup- tion in data availability. Additionally, the service must be able to withstand disaster events that may impact an entire region. For this reason, we keep two redundant copies of data in memory despite the efficiency hit. We found that building a reliable, fault tolerant system was the most time consuming part of the project. While the team prototyped a high performance, compressed, in- Figure 10: When searching for the root cause for memory TSDB in a very short period of time, it took several a site-wide error rate increase, Gorilla’s time series more months of hard work to make it fault tolerant. How- correlation found anomalous events that were corre- ever, the advantages of fault tolerance were visible when the lated in time, namely a drop in memory used when system successfully survived both real and simulated fail- copying a newly released binary. ures [21]. We also benefited from a system that we can safely restart, upgrade, and add new nodes to whenever we need to. This has allowed us to scale Gorilla effectively with 6.2 Site wide error rate debugging low operational overhead while providing a highly reliable For an example of how Facebook uses time series data to service to our customers. drive our monitoring, one can look at a recent issue that was detected quickly and fixed due to monitoring data, first 7. FUTURE WORK described externally at SREcon15 [19]. We wish to extend Gorilla in several ways. One effort is to A mysterious problem resulted in a spike in the site wide add a second, larger data store between in-memory Gorilla error rate. This error rate was visible in Gorilla a few min- and HBase based on flash storage. This store has been built utes after the error rate spike and raised an alert which noti- to hold the compressed two hour chunks but for a longer fied the appropriate team a few minutes later [19]. Then, the period than 26 hours. We have found that flash storage hard work began. As one set of engineers mitigated the is- allows us to store about two weeks of full resolution, Gorilla sue, others began the hunt for a root cause. Using tools built compressed data. This will extend the amount of time full on Gorilla, including a new time series correlation search resolution data is available to engineers to debug problems. described in Section 5, they were able to find that the rou- Preliminary performance results are included in Figure 8. tine process of copying the release binary to Facebook’s web Before building Gorilla, ODS relied on the HBase backing servers caused an anomalous drop in memory used across store to be a real-time data store: very shortly after data the site, as illustrated in Figure 10. The detection of the was sent to ODS for storage, it needed to be available to read problem, various debugging efforts and root cause analysis, operations placing a significant burden on HBase’s disk I/O. depended on time series analysis tools enabled by Gorilla’s Now that Gorilla is acting as a write-through cache for the high performance query engine. most recent data, we have at least a 26 hour window after Since launching about 18 months ago, Gorilla has helped data is sent to ODS before they will be read from HBase. Facebook engineers identify and debug several such produc- We are exploiting this property by rewriting our write path tion issues. By reducing the 90th percentile Gorilla query to wait longer before writing to HBase. This optimization time to 10ms, Gorilla has also improved developer produc- should be much more efficient on HBase, but the effort is tivity. Further by serving 85% of all monitoring data from too new to report results. Gorilla, very few queries must hit the HBase TSDB [26], resulting in a lower load on the HBase cluster. 8. CONCLUSION 6.3 Lessons learned Gorilla is a new in-memory times series database that we Prioritize recent data over historical data. Go- have developed and deployed at Facebook. Gorilla functions rilla occupies an interesting optimization and design niche. as a write through cache for the past 26 hours of monitor- While it must be very reliable, it does not require ACID data ing data gathered across all of Facebook’s systems. In this guarantees. In fact, we have found that it is more important paper, we have described a new compression scheme that for the most recent data to be available than any previous allows us to efficiently store monitoring data comprising of data point. This led to interesting design trade-offs, such as over 700 million points per minute. Further, Gorilla has al- making a Gorilla host available for reads before older data lowed us to reduce our production query latency by over 70x is read off disk. when compared to our previous on-disk TSDB. Gorilla has Read latency matters. The efficient use of compression enabled new monitoring tools including alerts, automated and in-memory data structures has allowed for extremely remediation and an online anomaly checker. Gorilla has fast reads and resulted in a significant usage increase. While been in deployment for the past 18 months and has success- ODS served 450 queries per second when Gorilla launched, fully doubled in size twice in this period without much oper- Gorilla soon overtook it and currently handles more than ational effort demonstrating the scalability of our solution. 5,000 steady state queries per second, peaking at one point We have also verified Gorilla’s fault tolerance capabilities via 1825

11.several large scale simulated failures as well as actual disas- Linear Time and Space. In Proceedings of the eighth ter situations—Gorilla remained highly available for both ACM SIGKDD international conference on Knowledge writes and reads through these events aiding site recovery. discovery and data mining, pages 550–556. ACM, 2002. 9. ACKNOWLEDGEMENTS [11] E. Keogh, S. Lonardi, and C. A. Ratanamahatana. Towards Parameter-Free Data Mining. In Proceedings Lots of thanks to Janet Wiener, Vinod Venkataraman and of the tenth ACM SIGKDD international conference the others who reviewed early drafts of this paper to find on Knowledge discovery and data mining, pages typos and incorrect information. 206–215. ACM, 2004. Huge thanks to Sanjeev Kumar and Nathan Bronson, who [12] E. Keogh and C. A. Ratanamahatana. Exact Indexing had great insights into framing the paper to make it read of Dynamic Time Warping. Knowledge and better. information systems, 7(3):358–386, 2005. Thank you to Mike Nugent, who had the brilliant idea to use PPMCC to find root causes and effects caused by [13] I. Lazaridis and S. Mehrotra. Capturing interesting time series, and hacked a prototype together so Sensor-Generated Time Series with Quality quickly. Guarantees. In Data Engineering, 2003. Proceedings. Of course, thanks to the current ODS team (Alex Bakh- 19th International Conference on, pages 429–440. turin, Scott Franklin, Ostap Korkuna, Wojciech Lopata, Ja- IEEE, 2003. son Obenberger, and Oleksandr Voietsa), and ODS alum- [14] Leslie Lamport. Paxos Made Simple. SIGACT News, nus (Tuomas Pelkonen and Charles Thayer) who have made 32(4):51–58, December 2001. monitoring Facebook’s infrastructure so much fun over the [15] J. Lin, E. Keogh, S. Lonardi, and B. Chiu. A Symbolic last few years. You guys are awesome! Representation of Time Series, with Implications for Streaming Algorithms. In Proceedings of the 8th ACM SIGMOD workshop on Research issues in data mining 10. REFERENCES and knowledge discovery, pages 2–11. ACM, 2003. [1] Graphite - Scalable Realtime Graphing. [16] J. Lin, E. Keogh, S. Lonardi, J. P. Lankford, and http://graphite.wikidot.com/. Accessed March 20, D. M. Nystrom. Visually Mining and Monitoring 2015. Massive Time Series. In Proceedings of the tenth ACM [2] Influxdb.com: InfluxDB - Open Source Time Series, SIGKDD international conference on Knowledge Metrics, and Analytics Database. discovery and data mining, pages 460–469. ACM, http://influxdb.com/. Accessed March 20, 2015. 2004. [3] L. Abraham, J. Allen, O. Barykin, V. R. Borkar, [17] P. Lindstrom and M. Isenburg. Fast and Efficient B. Chopra, C. Gerea, D. Merl, J. Metzler, D. Reiss, Compression of Floating-Point Data. Visualization S. Subramanian, J. L. Wiener, and O. Zed. Scuba: and Computer Graphics, IEEE Transactions on, Diving into Data at Facebook. PVLDB, 12(5):1245–1250, 2006. 6(11):1057–1067, 2013. [18] A. Mueen, S. Nath, and J. Liu. Fast Approximate [4] E. B. Boyer, M. C. Broomfield, and T. A. Perrotti. Correlation for Massive Time-Series Data. In GlusterFS One Storage Server to Rule Them All. Proceedings of the 2010 ACM SIGMOD International Technical report, Los Alamos National Laboratory Conference on Management of data, pages 171–182. (LANL), 2012. ACM, 2010. [5] N. Bronson, T. Lento, and J. L. Wiener. Open Data [19] R. Nishtala. Learning from Mistakes and Outages. Challenges at Facebook. In Workshops Proceedings of Presented at SREcon, Santa Clara, CA, March 2015. the 31st International Conference on Data Engineering [20] R. Nishtala, H. Fugal, S. Grimm, M. Kwiatkowski, Workshops, ICDE Seoul, Korea. IEEE, 2015. H. Lee, H. C. Li, R. McElroy, M. Paleczny, D. Peek, [6] T. D. Chandra, R. Griesemer, and J. Redstone. Paxos P. Saab, et al. Scaling Memcache at Facebook. In nsdi, Made Live: An Engineering Perspective. In volume 13, pages 385–398, 2013. Proceedings of the twenty-sixth annual ACM [21] J. Parikh. Keynote speech. Presented at @Scale symposium on Principles of distributed computing, Conference, San Francisco, CA, September 2014. pages 398–407. ACM, 2007. [22] K. Pearson. Note on regression and inheritance in the [7] H. Chen, J. Li, and P. Mohapatra. RACE: Time Series case of two parents. Proceedings of the Royal Society Compression with Rate Adaptivity and Error Bound of London, 58(347-352):240–242, 1895. for Sensor Networks. In Mobile Ad-hoc and Sensor [23] F. Petitjean, G. Forestier, G. Webb, A. Nicholson, Systems, 2004 IEEE International Conference on, Y. Chen, and E. Keogh. Dynamic Time Warping pages 124–133. IEEE, 2004. Averaging of Time Series Allows Faster and More [8] B. Hu, Y. Chen, and E. J. Keogh. Time Series Accurate Classification. In IEEE International Classification under More Realistic Assumptions. In Conference on Data Mining, 2014. SDM, pages 578–586, 2013. [24] T. Rakthanmanon, B. Campana, A. Mueen, [9] E. Keogh, K. Chakrabarti, M. Pazzani, and G. Batista, B. Westover, Q. Zhu, J. Zakaria, and S. Mehrotra. Locally Adaptive Dimensionality E. Keogh. Searching and Mining Trillions of Time Reduction for Indexing Large Time Series Databases. Series Subsequences Under Dynamic Time Warping. ACM SIGMOD Record, 30(2):151–162, 2001. In Proceedings of the 18th ACM SIGKDD [10] E. Keogh, S. Lonardi, and B.-c. Chiu. Finding international conference on Knowledge discovery and Surprising Patterns in a Time Series Database in data mining, pages 262–270. ACM, 2012. 1826

12.[25] P. Ratanaworabhan, J. Ke, and M. Burtscher. Fast A Warehousing Solution Over a Map-Reduce Lossless Compression of Scientific Floating-Point Framework. PVLDB, 2(2):1626–1629, 2009. Data. In DCC, pages 133–142. IEEE Computer [28] T. W. Wlodarczyk. Overview of Time Series Storage Society, 2006. and Processing in a Cloud Environment. In [26] L. Tang, V. Venkataraman, and C. Thayer. Facebook’s Proceedings of the 2012 IEEE 4th International Large Scale Monitoring System Built on HBase. Conference on Cloud Computing Technology and Presented at Strata Conference, New York, 2012. Science (CloudCom), pages 625–628. IEEE Computer [27] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, Society, 2012. S. Anthony, H. Liu, P. Wyckoff, and R. Murthy. Hive: 1827