1. 程式人生 > 其它 >時序資料庫永遠的難關 — 時間線膨脹(高基數 Cardinality)問題的解決方案

時序資料庫永遠的難關 — 時間線膨脹(高基數 Cardinality)問題的解決方案

簡介:本文主要討論 influxdb 在遇到寫入的資料出現高基數 Cardinality 問題時,一些可行的解決方案。

作者 | 徐建偉 (竹影)

前序

隨著移動端發展走向飽和,現在整個 IT 行業都期待著“萬物互聯”的物聯網時代。在物聯網場景中,往往有許多各類不同的終端裝置,佈署在不同的位置,去採集各種資料,比如某一區域有 10萬個 loT 裝置,每個 loT 裝置每 5 秒傳送一次資料。那麼每年會產生 6307億 個數據點。而這些資料都是順序產生的,並且 loT 裝置產生資料的格式全部是一致的,並且沒有刪除和修改的需求。針對這樣按時海量寫入無更新場景,時序資料庫應運而生。

時序資料庫在假定沒有資料插入和更新需求,資料結構穩定的前提下,極限追求快速寫入,高壓縮,快速檢索資料。時序資料的 Label(tag)會建立索引,以提高查詢效能,以便你可以快速找到與所有指定標籤匹配的值。如果 Label(tag)值的數量過多時(高基數 Cardinality 問題),索引會出現各種各樣的問題, 本文主要討論 influxdb 在遇到寫入的資料出現高基數 Cardinality 問題時,一些可行的解決方案。

高基數Cardinality問題(時間線膨脹)

時序資料庫主要儲存的是 metric 資料,每一條資料稱為一個樣本(sample),樣本由以下三部分組成:

  • 指標(時間線 time-series):metric name 和描述當前樣本特徵的 labelsets;
  • 時間戳(timestamp):一個精確到毫秒的時間戳;
  • 樣本值(value):表示當前樣本的值。

<-------------- time-series="" --------=""><-timestamp -----=""> <-value->

node_cpu{cpu=“cpu0”,mode=“idle”} @1627339366586 70

node_cpu{cpu=“cpu0”,mode=“sys”} @1627339366586 5

node_cpu{cpu=“cpu0”,mode=“user”} @1627339366586 25

通常情況下, time-series 中的 lablelsets 是有限的,可列舉的,比如上面的例子 model 可選值為 idle,sys,user。

prometheus 官方文件中對於 Label 的建議:

CAUTION: Remember that every unique combination of key-value label pairs represents a new time series, which can dramatically increase the amount of data stored. Do not use labels to store dimensions with high cardinality (many different label values), such as user IDs, email addresses, or other unbounded sets of values.

時序資料庫的設計時,也是假定在時間線低基數的前提下。但是隨著 metric 的廣泛使用,在很多場景下無法避免出現時間線膨脹。

比如,在雲原生場景下 tag 出現 pod/container ID之類,也有些 tag 出現 userId,甚至有些 tag 是 url,而這些 tag 組合時,時間線膨脹得非常厲害。

這個矛盾出現是必然的,怎麼解決呢?是寫入資料方調整寫入資料時,控制寫入的 time-series的數量,還是時序資料庫去更改設計來適用這種場景?這個問題沒有完美的解決方案,我們需要做出平衡。

從實際情況出發,如果時間線膨脹後,時序資料庫不會出現不可用,效能也不會出現指數級別下降。也就是說時間線不膨脹時,效能優秀。時間線膨脹後,效能能達到良好或者及格就好。

那怎麼讓時序資料庫在時間線膨脹的情況下效能還能良好呢?接下來我們通過influxdb的原始碼來討論這個問題。

時間線的處理邏輯

influxdb 的 tsm 結構,主要的邏輯處理過程類似 lsm。資料上報後,會新增到 cache 和日誌檔案(wal)。為了加快檢索速度或者壓縮比例,會對上報的資料進行 compaction(資料檔案合併,重新構建索引)。

索引涉及到三個方面:

  • TSI(Time Series Index)檢索Measurement,tag,tagval,time
  • TSM(Time-Structured Merge Tree)用來檢索time-series -> value
  • Series Segment Index 用來檢索 time-series key <–> time-series Id

具體influxdb的索引實現可以參照官方文章。

https://github.com/influxdata/influxdb/blob/master/tsdb/index/tsi1/doc.go

當時間線膨脹後,TSI 和 TSM 的檢索效能下降並不嚴重,問題主要是出現在 Series Segment Index 裡。

這節我們會討論influxdb的時間線檔案的正排索引(time-series key ->id, id->time-series key):

  • SeriesFile 是 Database(bucket)級別的。
  • SeriesIndex 主要處理 key->Id, key->id 的索引對映。
  • SeriesSegment 主要存放的是 Series 的 Id 和 key。
  • SeriesIndex 裡面是存放 Series 的 Id 和 key 等索引。(可以理解是兩個 hashmap)
  • keyIDMap 通過 key 來查詢對應的 Id。
  • idOffsetMap 通過 Id 查到到 offset,通過這個 offset(對應 SeriesSegment 的位置)來查詢 SeriesSegment 檔案獲取 key。

具體的程式碼(influxdb 2.0.7)如下:

tsdb/series_partition.go:30 // SeriesPartition represents a subset of series file data. type SeriesPartition struct { ... segments []*SeriesSegment index *SeriesIndex seq uint64 // series id sequence .... } tsdb/series_index.go:36 // SeriesIndex represents an index of key-to-id & id-to-offset mappings. type SeriesIndex struct { path string ... data []byte // mmap data keyIDData []byte // key/id mmap data idOffsetData []byte // id/offset mmap data // In-memory data since rebuild. keyIDMap *rhh.HashMap idOffsetMap map[uint64]int64 tombstones map[uint64]struct{} }

對 series key 進行檢索時,會先在記憶體 map 中查詢,然後在磁碟的 map 上查詢,具體的實現程式碼如下:

tsdb/series_index.go:185 func (idx *SeriesIndex) FindIDBySeriesKey(segments []*SeriesSegment, key []byte) uint64 { // 記憶體map查詢 if v := idx.keyIDMap.Get(key); v != nil { if id, _ := v.(uint64); id != 0 && !idx.IsDeleted(id) { return id } } if len(idx.data) == 0 { return 0 } hash := rhh.HashKey(key) for d, pos := int64(0), hash&idx.mask; ; d, pos = d+1, (pos+1)&idx.mask { // 磁碟map查詢offset elem := idx.keyIDData[(pos * SeriesIndexElemSize):] elemOffset := int64(binary.BigEndian.Uint64(elem[:8])) if elemOffset == 0 { return 0 } // 通過offset獲取對於的id elemKey := ReadSeriesKeyFromSegments(segments, elemOffset+SeriesEntryHeaderSize) elemHash := rhh.HashKey(elemKey) if d > rhh.Dist(elemHash, pos, idx.capacity) { return 0 } else if elemHash == hash && bytes.Equal(elemKey, key) { id := binary.BigEndian.Uint64(elem[8:]) if idx.IsDeleted(id) { return 0 } return id } } }

這裡補充一個知識點,將記憶體 hashmap 轉成磁碟 hashmap 的實現。我們都知道 hashmap 的儲存是陣列,influfxdb 中的實現是通過 mmap 方式對映磁碟空間(見 SeriesIndex 的 keyIDData),然後通過 hash 訪問陣列地址,採用的 Robin Hood Hashing,符合記憶體區域性性原理(查詢邏輯的程式碼如上 series_index.go 中)。將 Robin Hood Hashtable 純手動移植磁碟 hashtable, 開發人員還是花了不少心思。

那記憶體 map 和磁碟 map 是如何生成的,為什麼需要兩個 map?

influxdb 的做法是將新增的 series key 先放到記憶體 hashmap 裡面,當記憶體 hashmap 增長大於閾值時,將記憶體 hashmap 和磁碟 hashmap 進行 merge(遍歷所有 SeriesSegment,過濾已經刪除的 series key)生成一個新的磁碟 hashmap,這個過程叫做 compaction。compation 結束後記憶體 hashmap 被清空,然後繼續存放新增的 series key。

tsdb/series_partition.go:200 // Check if we've crossed the compaction threshold. if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) && p.compactionLimiter.TryTake() { p.compacting = true log, logEnd := logger.NewOperation(context.TODO(), p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path)) p.wg.Add(1) go func() { defer p.wg.Done() defer p.compactionLimiter.Release() compactor := NewSeriesPartitionCompactor() compactor.cancel = p.closing if err := compactor.Compact(p); err != nil { log.Error("series partition compaction failed", zap.Error(err)) } logEnd() // Clear compaction flag. p.mu.Lock() p.compacting = false p.mu.Unlock() }() }

tsdb/series_partition.go:569 func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error { hdr := NewSeriesIndexHeader() hdr.Count = seriesN hdr.Capacity = pow2((int64(hdr.Count) * 100) / SeriesIndexLoadFactor) // Allocate space for maps. keyIDMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) idOffsetMap := make([]byte, (hdr.Capacity * SeriesIndexElemSize)) // Reindex all partitions. var entryN int for _, segment := range segments { errDone := errors.New("done") if err := segment.ForEachEntry(func(flag uint8, id uint64, offset int64, key []byte) error { ... // Save max series identifier processed. hdr.MaxSeriesID, hdr.MaxOffset = id, offset // Ignore entry if tombstoned. if index.IsDeleted(id) { return nil } // Insert into maps. c.insertIDOffsetMap(idOffsetMap, hdr.Capacity, id, offset) return c.insertKeyIDMap(keyIDMap, hdr.Capacity, segments, key, offset, id) }); err == errDone { break } else if err != nil { return err } }

這樣設計有兩個缺陷:

  1. 做 compaction 時,當 io 訪問 SeriesSegments 檔案, 記憶體載入所有的 series key,會構建一個新的 hashtable,然後將這個 hashtable mmap 儲存到磁碟,當 series key 超過幾千萬或者更多時,會出現記憶體不夠,oom 問題。
  2. 做 compaction 時, 對於已經刪除的 series key(tombstone 標記)做了過濾,不生成 series index,但是 SeriesSegment 中已經刪除 series key 只有做了 tombstone 標記,不會做物理刪除,這樣會導致 SeriesSegment 一直膨脹,在實際生產環境一個 partition 下的所有 segmeng 檔案超過幾十 G,做 compaction 時,會產生大量 io 訪問。

可行的解決方案

1、增加partition或者database

influxdb 的正排索引是 database 級別的,有兩個方式可以減少 compaction 時的記憶體,一個是增加 partition 數量或者將多個 Measurement 劃到不同的 database 裡面。

但這樣做的問題是,已經存在資料的 influxdb 不好調整兩個資料。

2、修改時間線儲存策略

我們知道 hash 索引是 O1 的查詢,效率非常高,但是對於增長性的資料,存在擴容問題。那我們做個折中的選擇。當 partition 大於某個閾值時,將 hash 索引變成 b+tree 索引。b+tree 對於資料膨脹效能下降有限,更適合高基數問題,而且不再需要全域性的 compaction。

3、將series key的正排索引下沉到shard級別

influxdb 裡面每個 shard 都是有時間區間的,某個時間區間內的時間線資料並不大。比如 database 裡面儲存的是 180天 的 series key,而 shard 一般只有一天甚至 1 個小時的跨度,兩者存放的 series key 存在 1~ 2 個數量級的差距。另外將 series key 正排索引下沉到 shard 級別對刪除操作更友好,當 shard 過期刪除時,會將當前 shard 的所有 series key 和其他 shard 做 diff,當 series key 不存在時再去刪除 series key。

4、根據Measurement修改時間線儲存策略

在實際生產環境中,時間線膨脹和 Measurement 有很大關係,一般是少數的 Measurement 存在時間線膨脹問題,但是絕大部分的 Measurement 不存在時間線爆炸的問題。

我們可以對做 series key 的正排索引的 compaction 時,可以新增 Measurement 時間線統計,如果某個 Measurement 的時間線膨脹時,可以將這個 Measurement 的所有 series key 切換到 B+ tree。而不膨脹的 series key 繼續保留走 hash 索引。這樣方案效能比第二個方案更好,開發成本會更高一些。

目前高基數問題主要體現在 series key 正排索引。個人覺得短期先做第二個方案過度到第四個方案的方式。這樣可以比較好的解決時間線增長的問題,效能下降不多,成本不高。第三個方案改動比較大,設計更合理,可以作為一個長期修復方案。

總結

本文主要通過 influxdb 來講解時序資料庫的高基數 Cardinality 問題,以及可行的方案。metric 的維度爆炸導致資料線膨脹問題,很多同學都認為這是對時序資料庫的誤用或者是濫用。但是資訊資料爆炸的今天,讓資料維度收斂,不發散成本非常高,甚至遠高於資料儲存成本。

個人覺得需要對這個問題進行分而治之的方式,提升時序資料庫對維度爆炸的容忍度。換句話說,出現時間線膨脹後,時序資料庫不會出現崩潰情況,對時間線未膨脹的 metric 繼續高效執行,而出現時間線膨脹的 metic 可以出現效能下降,單不會線性下降。提升對時間線膨脹的容忍度,控制時間線膨脹的爆炸半徑,將會成為時序資料庫的核心能力。

原文連結
本文為阿里雲原創內容,未經允許不得轉載。