
Paper Analysis -- Monarch: Google’s Planet-Scale In-Memory Time Series Database

This is the paper on Google's time series database, published in 2020.

Let's start with the system architecture.

At design time, the first consideration is the CAP trade-off: a strongly consistent system like Spanner is clearly a poor fit for a monitoring system, so Monarch sacrifices consistency to guarantee timeliness and availability.

Monarch’s design is determined by its primary usage for monitoring and alerting.
First, Monarch readily trades consistency for high availability and partition tolerance [21, 8, 9].
Writing to or reading from a strongly consistent database like Spanner [13] may block for a long time;
that is unacceptable for Monarch because it would increase mean-time-to-detection and mean-time-to-mitigation for potential outages.
To promptly deliver alerts, Monarch must serve the most recent data in a timely fashion;
for that, Monarch drops delayed writes and returns partial data for queries if necessary.
In the face of network partitions, Monarch continues to support its users’ monitoring and alerting needs, with mechanisms to indicate the underlying data may be incomplete or inconsistent.

Second, Monarch minimizes its dependencies on other Google systems, since those would create circular dependencies; that is also why it keeps data in memory.

Second, Monarch must be low dependency on the alerting critical path.
To minimize dependencies, Monarch stores monitoring data in memory despite the high cost.
Most of Google’s storage systems, including Bigtable [10], Colossus ([36], the successor to GFS [20]), Spanner [13], Blobstore [18], and F1 [40], rely on Monarch for reliable monitoring;
thus, Monarch cannot use them on the alerting path to avoid a potentially dangerous circular dependency.
As a result, non-monitoring applications (e.g., quota services) using Monarch as a global time series database are forced to accept reduced consistency. 

At a high level, the system is hierarchical, dividing from a global level down into individual zones, which is easy to understand.

Monarch's architecture, as shown in the figure, is divided into three colored groups of components, representing:

The persistence layer:

- Leaves, the in-memory time series stores;

- Recovery logs, essentially a WAL: the data can be replayed from them, and they can also serve as long-term persistent storage;

- The configuration service, which stores configuration data in Spanner.

The ingestion layer:

- Ingestion routers and leaf routers, two levels of routing;

- Range assigners, which manage how target ranges are distributed and balance load across leaves.

The query layer:

- Mixers, acting as reducers / query brokers, which merge the results of all sub-queries;

- Index servers, which index what data each leaf holds so that queries can skip irrelevant leaves;

- Evaluators, which build materialized views by periodically executing standing queries and writing the results back to leaves.
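
To make the division of responsibilities concrete, here is a minimal sketch of the components above as Python class stubs; all class names, method names, and signatures are illustrative guesses rather than Monarch's actual interfaces.

```python
# Illustrative stubs only -- responsibilities taken from the list above,
# interfaces invented for this sketch.

class Leaf:
    """Persistence: in-memory time series store, backed by recovery logs."""
    def write(self, target: str, metric: str, point) -> None: ...
    def query(self, predicate): ...

class RangeAssigner:
    """Ingestion: assigns target ranges to leaves and rebalances load."""
    def assign(self, target_range) -> None: ...

class IngestionRouter:
    """Ingestion: routes data to the right zone using the location field."""
    def route_to_zone(self, target): ...

class LeafRouter:
    """Ingestion: routes data within a zone to leaves by target range."""
    def route_to_leaf(self, target): ...

class Mixer:
    """Query: fans a query out and merges the sub-query results."""
    def execute(self, query): ...

class IndexServer:
    """Query: knows which data each leaf holds, so queries can skip leaves."""
    def relevant_leaves(self, query): ...

class Evaluator:
    """Query: periodically runs standing queries, writing results back to leaves."""
    def run_standing_queries(self) -> None: ...
```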


Next, let's look at the data model and storage format.

First of all, Monarch stores data in schematized tables.

Each schema is split into two parts, a target schema and a metric schema, similar to Timon.

The target, as the name suggests, records the source of each time series; the example in the figure is ComputeTask.

This is similar to Timon's tags; note that the target fields are recorded repeatedly here, which is somewhat inefficient.

Monarch uses targets to associate each time series with its source entity (or monitored entity), which is, for example, the process or the VM that generates the time series.
Each target represents a monitored entity, and conforms to a target schema that defines an ordered set of target field names and associated field types.
Figure 2 shows a popular target schema named ComputeTask;
each ComputeTask target identifies a running task in a Borg [46] cluster with four fields: user, job, cluster, and task num.
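
As a concrete illustration, the ComputeTask target schema described above could be written as a small Python dataclass; the four field names come from the paper, while the types and representation are assumptions of this sketch.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ComputeTask:
    """Target schema: identifies one running task in a Borg cluster."""
    user: str      # e.g. "sql-dba"
    job: str       # e.g. "db.server"
    cluster: str   # e.g. "aa"
    task_num: int  # e.g. 876

target = ComputeTask(user="sql-dba", job="db.server", cluster="aa", task_num=876)
```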

For locality, one of the target fields needs to be designated as the location field.

For locality, Monarch stores data close to where the data is generated.
Each target schema has one field annotated as location;
the value of this location field determines the specific Monarch zone to which a time series is routed and stored.
For example, the location field of ComputeTask is cluster; each Borg cluster is mapped to one (usually the closest) Monarch zone.
As described in Section 5.3, location fields are also used to optimize query execution.

Within each zone, Monarch stores time series of the same target together in the same leaf because they originate from the same entity and are more likely to be queried together in a join.
Monarch also groups targets into disjoint target ranges in the form of [Sstart,Send) where Sstart and Send are the start and end target strings.
A target string represents a target by concatenating the target schema name and field values in order.
For example, in Figure 2, the target string ComputeTask::sql-dba::db.server::aa::0876 represents the Borg task of a database server.
Target ranges are used for lexicographic sharding and load balancing among leaves (see Section 4.2);
this allows more efficient aggregation across adjacent targets in queries (see Section 5.3). 
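
The target-string construction and the lexicographic range check can be sketched as follows; the `::` separator mirrors the example string above, although the actual byte-level encoding Monarch uses may well differ.

```python
def target_string(schema_name: str, field_values) -> str:
    """Concatenate the schema name and the field values, in schema order."""
    return "::".join([schema_name] + [str(v) for v in field_values])

def in_range(target: str, start: str, end: str) -> bool:
    """Lexicographic sharding: a leaf owns targets with start <= target < end."""
    return start <= target < end

t = target_string("ComputeTask", ["sql-dba", "db.server", "aa", "0876"])
assert t == "ComputeTask::sql-dba::db.server::aa::0876"
assert in_range(t, "ComputeTask::sql-dba", "ComputeTask::sql-dbz")
```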

Metrics are easy to understand: they are the time series values. The example here is not the best choice, since it shows a special value type, distribution, which is essentially a histogram (so the value is a map); normally the value is just a single number.

A metric measures one aspect of a monitored target, such as the number of RPCs a task has served, the memory utilization of a VM, etc.
Similar to a target, a metric conforms to a metric schema, which defines the time series value type and a set of metric fields.
Metrics are named like files.
Figure 2 shows an example metric called /rpc/server/latency that measures the latency of RPCs to a server; it has two metric fields that distinguish RPCs by service and command.

The value type can be boolean, int64, double, string, distribution, or tuple of other types.
All of them are standard types except distribution, which is a compact type that represents a large number of double values.
A distribution includes a histogram that partitions a set of double values into subsets called buckets and summarizes values in each bucket using overall statistics such as mean, count, and standard deviation [28]. Bucket boundaries are configurable for trade-off between data granularity (i.e., accuracy) and storage costs: users may specify finer buckets for more popular value ranges.
Figure 3 shows an example distribution-typed time series of /rpc/server/latency which measures servers’ latency in handling RPCs;
and it has a fixed bucket size of 10ms.
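
A rough sketch of a distribution value with fixed 10 ms buckets, as in the Figure 3 example; a real Monarch distribution also keeps per-bucket statistics and allows configurable (non-uniform) bucket boundaries, which this toy version omits.

```python
class Distribution:
    """Toy distribution: fixed-width latency histogram plus overall statistics."""
    def __init__(self, bucket_width_ms: float = 10.0):
        self.bucket_width_ms = bucket_width_ms
        self.buckets = {}    # bucket index -> count of values in that bucket
        self.count = 0
        self.total = 0.0

    def add(self, latency_ms: float) -> None:
        idx = int(latency_ms // self.bucket_width_ms)
        self.buckets[idx] = self.buckets.get(idx, 0) + 1
        self.count += 1
        self.total += latency_ms

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0

d = Distribution()
for latency in (3.2, 12.5, 14.1, 27.8):
    d.add(latency)
# d.buckets == {0: 1, 1: 2, 2: 1}; d.mean == 14.4
```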


Next, the write path, i.e., ingestion.

There are two levels of partitioning: the first by the location field, the second by the range assigner.

The write process itself is nothing special; the in-memory store applies a few optimizations, mainly timestamp sharing and delta encoding.

The right side of Figure 1 gives an overview of Monarch’s collection path.
The two levels of routers perform two levels of divide-and-conquer:
ingestion routers regionalize time series data into zones according to location fields,
and leaf routers distribute data across leaves according to the range assigner.
Recall that each time series is associated with a target and one of the target fields is a location field. 
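
The two levels of divide-and-conquer can be sketched roughly as below; the zone names, the cluster-to-zone mapping, and the range assignments are all made up for illustration.

```python
CLUSTER_TO_ZONE = {"aa": "zone-aa", "bb": "zone-bb"}   # location field -> zone

# Per-zone range assignments: disjoint (start, end, leaf) entries.
ZONE_RANGES = {
    "zone-aa": [("ComputeTask::a", "ComputeTask::m", "leaf-1"),
                ("ComputeTask::m", "ComputeTask::~", "leaf-2")],
}

def route(target_str: str, location: str):
    zone = CLUSTER_TO_ZONE[location]              # step 1: ingestion router
    for start, end, leaf in ZONE_RANGES[zone]:    # step 2: leaf router
        if start <= target_str < end:
            return zone, leaf
    raise KeyError("no range assigned for this target")

print(route("ComputeTask::sql-dba::db.server::aa::0876", "aa"))  # ('zone-aa', 'leaf-2')
```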


The range assigner does load balancing: within a zone it splits, merges, and moves ranges between leaves based on CPU load and memory usage.

An interesting detail is the heterogeneous replication policy: the replica count and the time granularity of retained data can differ.

In addition, we allow heterogeneous replication policies (1 to 3 replicas) for users to trade off between availability and storage cost.
Replicas of each target range have the same boundaries, but their data size and induced CPU load may differ because,
for example, one user may retain only the first replica at a fine time granularity while another user retains all three replicas at a coarse granularity.
Therefore, the range assigner assigns each target range replica individually.
Of course, a leaf is never assigned multiple replicas of a single range.
Usually, a Monarch zone contains leaves in multiple failure domains (clusters); the assigner assigns the replicas for a range to different failure domains. 

Range assigners balance load in ways similar to Slicer [1].
Within each zone, the range assigner splits, merges, and moves ranges between leaves to cope with changes in the CPU load and memory usage imposed by the range on the leaf that stores it.
While range assignment is changing, data collection works seamlessly by taking advantage of recovery logs. 
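
A minimal sketch of the per-replica assignment constraints described above: replicas are placed one at a time, never two replicas of the same range on one leaf, and spread across failure domains. The load metric and data structures are assumptions of this sketch, not Slicer's or Monarch's actual algorithm.

```python
def assign_replicas(range_id: str, num_replicas: int, leaves: list):
    """leaves: dicts like {"name": ..., "domain": ..., "load": ...}."""
    chosen, used_domains = [], set()
    for leaf in sorted(leaves, key=lambda l: l["load"]):   # least-loaded first
        if leaf["domain"] in used_domains:
            continue                  # keep replicas in distinct failure domains
        chosen.append((range_id, leaf["name"]))
        used_domains.add(leaf["domain"])
        if len(chosen) == num_replicas:
            return chosen
    raise RuntimeError("not enough failure domains for the requested replicas")

leaves = [{"name": "leaf-1", "domain": "cluster-a", "load": 0.3},
          {"name": "leaf-2", "domain": "cluster-b", "load": 0.5},
          {"name": "leaf-3", "domain": "cluster-a", "load": 0.1}]
print(assign_replicas("range-42", 2, leaves))
# [('range-42', 'leaf-3'), ('range-42', 'leaf-2')]
```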


Aggregation

The core point: in monitoring scenarios the volume of time series data is simply too large, and there is no need to keep the raw values, so the data is aggregated directly at ingestion time, the same idea as in Timon.

For some monitoring scenarios, it is prohibitively expensive to store time series data exactly as written by clients.
One example is monitoring disk I/O, served by millions of disk servers, where each I/O operation (IOP) is accounted to one of tens of thousands of users in Google.
This generates tens of billions of time series, which is very expensive to store naively.
However, one may only care about the aggregate IOPs per user across all disk servers in a cluster.
Collection aggregation solves this problem by aggregating data during ingestion. 
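
A toy sketch of collection aggregation for the disk-I/O example: rather than keeping one time series per (disk server, user) pair, the deltas are summed per user as they arrive. Function and variable names are illustrative.

```python
from collections import defaultdict

per_user_iops = defaultdict(int)   # aggregated series, keyed by user only

def ingest_delta(disk_server: str, user: str, iops_delta: int) -> None:
    # The per-disk-server detail is dropped at ingestion; only the per-user
    # aggregate is stored.
    per_user_iops[user] += iops_delta

for server in ("ds-1", "ds-2", "ds-3"):
    ingest_delta(server, "user-a", 100)
ingest_delta("ds-1", "user-b", 50)

print(dict(per_user_iops))   # {'user-a': 300, 'user-b': 50}
```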


The aggregation approach is much the same as elsewhere; Dataflow is not mentioned here.

A sliding window is maintained in memory; the window absorbs out-of-order data, and once a bucket is finalized it becomes immutable. Late data is simply dropped, and the aggregated data is written to the in-memory store and the recovery logs.

One question here: data still sitting in the in-memory window can be lost, so is that covered by having multiple replicas?

In addition, each leaf also maintains a sliding admission window and rejects deltas older than the window length TW.
Therefore, older buckets become immutable and generate finalized points that can be efficiently stored with delta and run-length encoding.
The admission window also enables Monarch to recover quickly from network congestion;
otherwise, leaves may be flooded by delayed traffic and never catch up to recent data, which is more important for critical alerting.
In practice, rejected writes comprise only a negligible fraction of traffic.
Once a bucket’s end time moves out of the admission window, the bucket is finalized: the aggregated point is written to the in-memory store and the recovery logs. 
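
A simplified sketch of the admission-window logic: deltas older than the window length are rejected, and buckets whose end time slides out of the window are finalized and flushed. Integer timestamps, the window length, and the bucket length are all illustrative.

```python
T_W = 60           # admission window length in seconds (illustrative)
BUCKET_LEN = 10    # aggregation bucket length in seconds (illustrative)

open_buckets = {}  # bucket end time -> aggregated value (still mutable)
final_points = []  # finalized, immutable points

def admit(ts: int, value: float, now: int) -> bool:
    """Apply a delta with timestamp ts, or reject it if it is too old."""
    if ts <= now - T_W:
        return False                                   # outside the window: dropped
    bucket_end = (ts // BUCKET_LEN + 1) * BUCKET_LEN
    open_buckets[bucket_end] = open_buckets.get(bucket_end, 0.0) + value
    return True

def finalize(now: int) -> None:
    """Finalize buckets whose end time has slid out of the admission window."""
    for bucket_end in sorted(open_buckets):
        if bucket_end <= now - T_W:
            # the finalized point would be written to the in-memory store
            # and to the recovery logs
            final_points.append((bucket_end, open_buckets.pop(bucket_end)))
```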


Finally, let's look at the query layer.

Besides ordinary ad hoc queries, Monarch also supports standing queries, which are essentially queries issued periodically. They serve roughly two purposes:

- Generating materialized views; this is quite interesting, since building them this way is simple, although consistency cannot be guaranteed.

- Alerts, where timeliness can be an issue.

Standing queries are periodic materialized-view queries whose results are stored back into Monarch; teams use them:
(1) to condense data for faster subsequent querying and/or cost-saving; and (2) to generate alerts.
Standing queries can be evaluated by either regional zone evaluators or global root evaluators.
The decision is based on static analysis of the query and the table schemas of the inputs to the query (details in Section 5.3).
The majority of standing queries are evaluated by zone evaluators which send identical copies of the query to the corresponding zone mixers and write the output to their zone.
Such queries are efficient and resilient to network partition.
The zone and root evaluators are sharded by hashes of standing queries they process, allowing us to scale to millions of standing queries. 
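
Hash-based sharding of standing queries across evaluator shards could look roughly like this; the hash function, shard count, and query text are placeholders, not Monarch's actual scheme.

```python
import hashlib

NUM_EVALUATOR_SHARDS = 16   # illustrative shard count

def evaluator_shard(standing_query_text: str) -> int:
    """Map a standing query to the evaluator shard responsible for running it."""
    digest = hashlib.sha256(standing_query_text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_EVALUATOR_SHARDS

print(evaluator_shard("example standing query text"))   # deterministic shard id
```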


The paper goes on to cover hierarchical query execution, pushdown, indexing, and configuration management; I will not go through these one by one.