
Paper Analysis -- Monarch: Google’s Planet-Scale In-Memory Time Series Database

A paper on a time series database that Google published in 2020.



Monarch’s design is determined by its primary usage for monitoring and alerting.
First, Monarch readily trades consistency for high availability and partition tolerance [21, 8, 9].
Writing to or reading from a strongly consistent database like Spanner [13] may block for a long time;
that is unacceptable for Monarch because it would increase mean-time-to-detection and mean-time-to-mitigation for potential outages.
To promptly deliver alerts, Monarch must serve the most recent data in a timely fashion;
for that, Monarch drops delayed writes and returns partial data for queries if necessary.
In the face of network partitions, Monarch continues to support its users’ monitoring and alerting needs, with mechanisms to indicate the underlying data may be incomplete or inconsistent.


Second, Monarch must be low dependency on the alerting critical path.
To minimize dependencies, Monarch stores monitoring data in memory despite the high cost.
Most of Google’s storage systems, including Bigtable [10], Colossus ([36], the successor to GFS [20]), Spanner [13], Blobstore [18], and F1 [40], rely on Monarch for reliable monitoring;
thus, Monarch cannot use them on the alerting path to avoid a potentially dangerous circular dependency.
As a result, non-monitoring applications (e.g., quota services) using Monarch as a global time series database are forced to accept reduced consistency. 





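- Leaves: the in-memory time series store

- Recovery logs: similar to a WAL? Logs that can be replayed to rebuild the data, and that can also be used for long-term persistent storage

- Configuration server: uses Spanner to store configuration data


- Ingestion routers and leaf routers: two levels of routing

- Range assigners: manage how ranges are distributed and balance load across leaves


- Mixers: reducers / query brokers that merge the results of all sub-queries

- Index servers: index which data each leaf holds, used to skip irrelevant leaves

- Evaluators: maintain materialized views by periodically running specific queries and writing the results back to the leaves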



First, Monarch uses schematized tables.

The schema is split into two parts, the target schema and the metric schema, similar to Timon.



Monarch uses targets to associate each time series with its source entity (or monitored entity), which is, for example, the process or the VM that generates the time series.
Each target represents a monitored entity, and conforms to a target schema that defines an ordered set of target field names and associated field types.
Figure 2 shows a popular target schema named ComputeTask;
each ComputeTask target identifies a running task in a Borg [46] cluster with four fields: user, job, cluster, and task_num.


For locality, Monarch stores data close to where the data is generated.
Each target schema has one field annotated as location;
the value of this location field determines the specific Monarch zone to which a time series is routed and stored.
For example, the location field of ComputeTask is cluster; each Borg cluster is mapped to one (usually the closest) Monarch zone.
As described in Section 5.3, location fields are also used to optimize query execution.

Within each zone, Monarch stores time series of the same target together in the same leaf because they originate from the same entity and are more likely to be queried together in a join.
Monarch also groups targets into disjoint target ranges in the form of [S_start, S_end) where S_start and S_end are the start and end target strings.
A target string represents a target by concatenating the target schema name and field values in order.
For example, in Figure 2, the target string ComputeTask::sql-dba::db.server::aa::0876 represents the Borg task of a database server.
Target ranges are used for lexicographic sharding and load balancing among leaves (see Section 4.2);
this allows more efficient aggregation across adjacent targets in queries (see Section 5.3). 
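
The target string and range lookup described above can be sketched as follows. This is only an illustration, not Monarch's implementation: the `RangeTable` class, the leaf names, and the range boundaries are hypothetical, and the `::` separator is taken from the example string in the quoted text.

```python
from bisect import bisect_right


def target_string(schema, field_values):
    """Concatenate the schema name and the field values in order."""
    return "::".join([schema, *field_values])


class RangeTable:
    """Maps disjoint, sorted target ranges [start, end) to leaves (hypothetical)."""

    def __init__(self, starts, leaves):
        # starts[i] is the inclusive start of range i; range i ends where
        # range i + 1 starts, and the last range is unbounded above.
        assert len(starts) == len(leaves)
        assert starts == sorted(starts) and starts[0] == ""
        self.starts = starts
        self.leaves = leaves

    def leaf_for(self, target):
        # Find the last range whose start is <= target (lexicographic order).
        i = bisect_right(self.starts, target) - 1
        return self.leaves[i]


table = RangeTable(
    ["", "ComputeTask::m", "ComputeTask::t"],
    ["leaf-a", "leaf-b", "leaf-c"],
)
target = target_string("ComputeTask", ["sql-dba", "db.server", "aa", "0876"])
leaf = table.leaf_for(target)
```

Because the ranges are ordered lexicographically, targets that share a prefix (for example the same user and job) tend to land on the same leaf, which is what makes aggregation across adjacent targets cheaper.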


A metric measures one aspect of a monitored target, such as the number of RPCs a task has served, the memory utilization of a VM, etc.
Similar to a target, a metric conforms to a metric schema, which defines the time series value type and a set of metric fields.
Metrics are named like files.
Figure 2 shows an example metric called /rpc/server/latency that measures the latency of RPCs to a server; it has two metric fields that distinguish RPCs by service and command.

The value type can be boolean, int64, double, string, distribution, or tuple of other types.
All of them are standard types except distribution, which is a compact type that represents a large number of double values.
A distribution includes a histogram that partitions a set of double values into subsets called buckets and summarizes values in each bucket using overall statistics such as mean, count, and standard deviation [28]. Bucket boundaries are configurable for trade-off between data granularity (i.e., accuracy) and storage costs: users may specify finer buckets for more popular value ranges.
Figure 3 shows an example distribution-typed time series of /rpc/server/latency which measures servers’ latency in handling RPCs;
and it has a fixed bucket size of 10ms.
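
A minimal sketch of a distribution value with a fixed bucket size, in the spirit of the 10ms example above. The class names, the choice of `floor(value / bucket_size)` as the bucket index, and the use of Welford's online update for the per-bucket mean and standard deviation are assumptions made for illustration; the paper does not specify these details.

```python
import math
from dataclasses import dataclass


@dataclass
class Bucket:
    count: int = 0
    mean: float = 0.0
    m2: float = 0.0  # running sum of squared deviations from the mean

    def add(self, x):
        # Welford's online update of count, mean, and m2.
        self.count += 1
        d = x - self.mean
        self.mean += d / self.count
        self.m2 += d * (x - self.mean)

    @property
    def stddev(self):
        # Population standard deviation of the values in this bucket.
        return math.sqrt(self.m2 / self.count) if self.count else 0.0


class Distribution:
    """Histogram with equal-width buckets (hypothetical helper)."""

    def __init__(self, bucket_size_ms=10.0):
        self.bucket_size_ms = bucket_size_ms
        self.buckets = {}  # bucket index -> Bucket

    def add(self, value_ms):
        idx = int(value_ms // self.bucket_size_ms)
        self.buckets.setdefault(idx, Bucket()).add(value_ms)


latency = Distribution(bucket_size_ms=10.0)
for v in [3.0, 7.0, 12.0, 18.0, 25.0]:
    latency.add(v)
```

Only the per-bucket summaries are kept, so the storage cost depends on the number of buckets rather than on the number of recorded values.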



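First, there are two levels of partitioning: one by location, and one by the range assigner.

The write path is nothing special. The in-memory store has a few optimizations, mainly timestamp sharing and delta encoding.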
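
To make the delta-encoding idea concrete, here is a small sketch that stores a timestamp sequence as its first value plus run-length-encoded deltas. The function names and the exact layout are assumptions for illustration, not Monarch's actual in-memory format. Timestamp sharing would then presumably mean that several series reference one such encoded sequence; that part is not shown.

```python
def delta_rle_encode(values):
    """Encode integers as (first value, [[delta, repeat_count], ...])."""
    if not values:
        return None, []
    runs = []
    for prev, cur in zip(values, values[1:]):
        delta = cur - prev
        if runs and runs[-1][0] == delta:
            runs[-1][1] += 1  # extend the current run of equal deltas
        else:
            runs.append([delta, 1])
    return values[0], runs


def delta_rle_decode(first, runs):
    """Inverse of delta_rle_encode."""
    if first is None:
        return []
    out = [first]
    for delta, count in runs:
        for _ in range(count):
            out.append(out[-1] + delta)
    return out


timestamps = [1000, 1060, 1120, 1180, 1245]
first, runs = delta_rle_encode(timestamps)
```

Regularly sampled series collapse into very few runs, which is why this kind of encoding suits periodic monitoring data.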

The right side of Figure 1 gives an overview of Monarch’s collection path.
The two levels of routers perform two levels of divide-and-conquer:
ingestion routers regionalize time series data into zones according to location fields,
and leaf routers distribute data across leaves according to the range assigner.
Recall that each time series is associated with a target and one of the target fields is a location field. 
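
The two routing levels can be sketched as two lookups: location field to zone, then target string to leaf within that zone. The zone names, leaf names, range boundaries, and the `route` function are all hypothetical; the sketch only assumes the ComputeTask schema, whose location field is cluster.

```python
from bisect import bisect_right

# Level 1 (ingestion router): location field value -> zone. Hypothetical mapping.
ZONE_BY_CLUSTER = {"aa": "zone-1", "bb": "zone-2"}

# Level 2 (leaf router): per-zone sorted range starts and the leaf for each range.
# In Monarch this mapping would come from the range assigner; here it is hard-coded.
RANGES_BY_ZONE = {
    "zone-1": (["", "ComputeTask::n"], ["leaf-0", "leaf-1"]),
    "zone-2": ([""], ["leaf-2"]),
}


def route(schema, fields, location_field):
    """Return (zone, leaf) for a target given as an ordered dict of fields."""
    zone = ZONE_BY_CLUSTER[fields[location_field]]
    target = "::".join([schema, *fields.values()])
    starts, leaves = RANGES_BY_ZONE[zone]
    leaf = leaves[bisect_right(starts, target) - 1]
    return zone, leaf


fields = {"user": "sql-dba", "job": "db.server", "cluster": "aa", "task_num": "0876"}
zone, leaf = route("ComputeTask", fields, location_field="cluster")
```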



The range assigner handles load balancing: within a zone, it splits, merges, and moves ranges based on CPU and memory.


In addition, we allow heterogeneous replication policies (1 to 3 replicas) for users to trade off between availability and storage cost.
Replicas of each target range have the same boundaries, but their data size and induced CPU load may differ because,
for example, one user may retain only the first replica at a fine time granularity while another user retains all three replicas at a coarse granularity.
Therefore, the range assigner assigns each target range replica individually.
Of course, a leaf is never assigned multiple replicas of a single range.
Usually, a Monarch zone contains leaves in multiple failure domains (clusters); the assigner assigns the replicas for a range to different failure domains. 
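
The placement constraints above (each replica assigned individually, replicas spread over different failure domains, and therefore never two replicas of one range on the same leaf) can be illustrated with a greedy sketch. The least-loaded-leaf policy, the single scalar load value, and the function name are assumptions; the paper does not describe the assigner's algorithm at this level of detail.

```python
def place_replicas(leaves, replica_loads):
    """Pick one leaf per replica of a single target range.

    leaves: dict of leaf name -> (failure_domain, current_load)
    replica_loads: load induced by each replica (replicas may differ in size)
    """
    load = {name: current for name, (_, current) in leaves.items()}
    used_domains = set()
    placement = []
    for replica_load in replica_loads:
        # Only leaves in failure domains not yet holding this range qualify.
        candidates = [n for n, (d, _) in leaves.items() if d not in used_domains]
        if not candidates:
            raise ValueError("not enough failure domains for all replicas")
        best = min(candidates, key=lambda n: (load[n], n))
        placement.append(best)
        load[best] += replica_load
        used_domains.add(leaves[best][0])
    return placement


leaves = {
    "l1": ("cluster-1", 5.0),
    "l2": ("cluster-1", 1.0),
    "l3": ("cluster-2", 3.0),
    "l4": ("cluster-3", 4.0),
}
placement = place_replicas(leaves, [2.0, 0.5, 0.5])
```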

Range assigners balance load in ways similar to Slicer [1].
Within each zone, the range assigner splits, merges, and moves ranges between leaves to cope with changes in the CPU load and memory usage imposed by the range on the leaf that stores it.
While range assignment is changing, data collection works seamlessly by taking advantage of recovery logs. 




For some monitoring scenarios, it is prohibitively expensive to store time series data exactly as written by clients.
One example is monitoring disk I/O, served by millions of disk servers, where each I/O operation (IOP) is accounted to one of tens of thousands of users in Google.
This generates tens of billions of time series, which is very expensive to store naively.
However, one may only care about the aggregate IOPs per user across all disk servers in a cluster.
Collection aggregation solves this problem by aggregating data during ingestion. 
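
For the disk I/O example, aggregating during ingestion amounts to dropping the per-disk-server dimension and keeping one running sum per (cluster, user). A minimal sketch, with a made-up tuple layout for the incoming deltas:

```python
from collections import defaultdict


def aggregate_on_ingest(deltas):
    """Sum IOP deltas per (cluster, user), discarding the disk server.

    deltas: iterable of (cluster, disk_server, user, iops_delta) tuples
    (a hypothetical layout chosen for this example).
    """
    totals = defaultdict(int)
    for cluster, _disk_server, user, iops_delta in deltas:
        totals[(cluster, user)] += iops_delta
    return dict(totals)


incoming = [
    ("aa", "disk-001", "alice", 10),
    ("aa", "disk-002", "alice", 5),
    ("aa", "disk-001", "bob", 7),
    ("bb", "disk-900", "alice", 1),
]
per_user = aggregate_on_ingest(incoming)
```

The number of stored series then scales with clusters times users instead of disk servers times users.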



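A sliding window is maintained in memory to handle out-of-order data. Once the window has passed, the bucket becomes immutable and late data is simply dropped; the aggregated data is written to the in-memory store and the recovery logs.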


In addition, each leaf also maintains a sliding admission window and rejects deltas older than the window length T_W.
Therefore, older buckets become immutable and generate finalized points that can be efficiently stored with delta and run-length encoding.
The admission window also enables Monarch to recover quickly from network congestion;
otherwise, leaves may be flooded by delayed traffic and never catch up to recent data, which is more important for critical alerting.
In practice, rejected writes comprise only a negligible fraction of traffic.
Once a bucket’s end time moves out of the admission window, the bucket is finalized: the aggregated point is written to the in-memory store and the recovery logs. 
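
The admission window and bucket finalization can be sketched as below. The class and method names are hypothetical, time is a plain integer, and the aggregate is a simple sum; the only behavior taken from the text is that deltas older than the window length are rejected and that a bucket is finalized once its end time leaves the window.

```python
class BucketAggregator:
    def __init__(self, bucket_len, window_len):
        self.bucket_len = bucket_len
        self.window_len = window_len  # the admission window length T_W
        self.open = {}        # bucket start time -> running sum
        self.finalized = []   # (bucket start time, sum), now immutable
        self.rejected = 0

    def add(self, now, ts, delta):
        """Admit a delta with timestamp ts, or reject it if it is too old."""
        if ts < now - self.window_len:
            self.rejected += 1
            return False
        start = ts - ts % self.bucket_len
        self.open[start] = self.open.get(start, 0) + delta
        return True

    def advance(self, now):
        """Finalize buckets whose end time has moved out of the window."""
        for start in sorted(self.open):
            if start + self.bucket_len <= now - self.window_len:
                # A real leaf would write this point to the in-memory
                # store and the recovery logs at this step.
                self.finalized.append((start, self.open.pop(start)))


agg = BucketAggregator(bucket_len=10, window_len=30)
agg.add(now=100, ts=95, delta=1)
agg.add(now=100, ts=72, delta=2)
agg.add(now=100, ts=60, delta=5)   # older than now - T_W, rejected
agg.add(now=112, ts=91, delta=3)
agg.advance(now=112)
```

Since any delta that falls into a finalized bucket is necessarily older than the window, finalized points never need to be reopened.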








Standing queries are periodic materialized-view queries whose results are stored back into Monarch; teams use them:
(1) to condense data for faster subsequent querying and/or cost-saving; and (2) to generate alerts.
Standing queries can be evaluated by either regional zone evaluators or global root evaluators.
The decision is based on static analysis of the query and the table schemas of the inputs to the query (details in Section 5.3).
The majority of standing queries are evaluated by zone evaluators which send identical copies of the query to the corresponding zone mixers and write the output to their zone.
Such queries are efficient and resilient to network partition.
The zone and root evaluators are sharded by hashes of standing queries they process, allowing us to scale to millions of standing queries. 
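
Sharding evaluators by a hash of the standing query can be sketched as follows. The use of SHA-256 over the query text and the modulo mapping are assumptions for illustration; the paper only states that evaluators are sharded by hashes of the standing queries they process.

```python
import hashlib


def evaluator_shard(query_text, num_shards):
    """Map a standing query to an evaluator shard with a stable hash."""
    digest = hashlib.sha256(query_text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Hypothetical query text; the real query language is not shown in this post.
query = "aggregate /rpc/server/latency by user, every 1m"
shard = evaluator_shard(query, num_shards=16)
```

A stable hash (rather than Python's built-in `hash`, which is randomized per process for strings) keeps the same query on the same shard across restarts.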

