influxdb記憶體中Cache資料結構詳解
引:
前面TSM檔案格式解析(一到四)綜合分析了不同case下的TSM檔案格式,檔案格式已基本清楚。
寫入磁碟是如此格式,那在寫入磁碟之前的記憶體中是怎麼儲存的呢?
通過第一篇influxdb初探https://blog.csdn.net/jacicson1987/article/details/81986234,瞭解到記憶體中的資料是儲存在
DBStore中的某個shard裡,
每個shard有一個tsm engine
每一個tsm engine裡面有一個Cache
結構說明
type Cache struct { // Due to a bug in atomic size needs to be the first word in the struct, as // that's the only place where you're guaranteed to be 64-bit aligned on a // 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG size uint64 snapshotSize uint64 mu sync.RWMutex store storer maxSize uint64 // snapshots are the cache objects that are currently being written to tsm files // they're kept in memory while flushing so they can be queried along with the cache. // they are read only and should never be modified snapshot *Cache snapshotting bool // This number is the number of pending or failed WriteSnaphot attempts since the last successful one. snapshotAttempts int stats *CacheStatistics lastSnapshot time.Time lastWriteTime time.Time // A one time synchronization used to initial the cache with a store. Since the store can allocate a // a large amount memory across shards, we lazily create it. initialize atomic.Value initializedCount uint32 }
Cache裡面有一個store
資料就是存在這個store裡面。
Cache裡面還有一個snapshot, 定時把store裡的資料複製到snapshot.store裡,然後store清空。
然後再把snapshot.store裡的內容寫入檔案。
那這個store裡到底是什麼結構呢?
store被初始化成一個含有16個partitions(節點)的ring。這個ring我稱之為偽一致性雜湊,因為它並沒有成環。
func (c *Cache) init() { if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) { return } c.mu.Lock() c.store, _ = newring(ringShards) // ringShards = 16 c.mu.Unlock() }
每一個partition都初始化成一個map,key是string, value是一個數組
func newring(n int) (*ring, error) { if n <= 0 || n > partitions { return nil, fmt.Errorf("invalid number of paritions: %d", n) } r := ring{ partitions: make([]*partition, n), // maximum number of partitions. } // The trick here is to map N partitions to all points on the continuum, // such that the first eight bits of a given hash will map directly to one // of the N partitions. for i := 0; i < len(r.partitions); i++ { r.partitions[i] = &partition{ store: make(map[string]*entry), } } return &r, nil }
通過跟蹤發現,這個map的key就是和TSM檔案結構裡面的key一致:measurement,tags#!~#field
而這個entry呢,是一組data,每個data由timestamp和value 兩個部分構成。
type FloatValue struct {
unixnano int64
value float64
}
type StringValue struct {
unixnano int64
value string
}
那key是怎麼對映到具體某個partition的呢
// getPartition retrieves the hash ring partition associated with the provided
// key.
func (r *ring) getPartition(key []byte) *partition {
return r.partitions[int(xxhash.Sum64(key)%partitions)]
}
xxhash.sum64,再與partition的數量(16)求餘,得到下標,找到partition.
具體xxhash.sum64這個雜湊值怎麼計算的呢,以後在研究。
結構圖
現在已經知道了Cache中資料的儲存方式了,來張表更清楚一點
每次寫入同一個key的資料,那就找到其Entries, 把新的資料直接append到後面。
排序與去重
這樣就又有問題了,如果 timestamp舊的資料後來,那這一組資料的就不是按照timestamp的大小順序了。
這裡怎麼解決的呢,這裡並沒有解決,不管是來的更舊的timestamp的資料 還是duplicated資料,統統加後面。
去重和排序在兩個地方做
1. select xx from xx的時候
2. snapshot寫入TSM檔案的時候
這個去重和排序程式碼如下, 先檢查順序,需要的話就sort..最後檢查去重。
這個sort演算法有時間可以看看,應該是針對大部分都是按順序的情況下效率可以的排序。
// Deduplicate returns a new slice with any values that have the same timestamp removed.
// The Value that appears last in the slice is the one that is kept. The returned
// Values are sorted if necessary.
func (a Values) Deduplicate() Values {
if len(a) <= 1 {
return a
}
// See if we're already sorted and deduped
var needSort bool
for i := 1; i < len(a); i++ {
if a[i-1].UnixNano() >= a[i].UnixNano() {
needSort = true
break
}
}
if !needSort {
return a
}
sort.Stable(a)
var i int
for j := 1; j < len(a); j++ {
v := a[j]
if v.UnixNano() != a[i].UnixNano() {
i++
}
a[i] = v
}
return a[:i+1]
}
小結:
由下至上,瞭解到寫入TSM檔案之前,資料在Cache中的儲存方式。
具體的查詢和寫入的邏輯這裡只涉及了一點點,其他的大部分包括如何分shard, 如何通過制定時間段獲得資料,如何索引到TSM檔案indexes等等還需要再研究。