1. 程式人生 > >influxdb記憶體中Cache資料結構詳解

influxdb記憶體中Cache資料結構詳解

引:

前面TSM檔案格式解析(一到四)綜合分析了不同case下的TSM檔案格式,檔案格式已基本清楚。

寫入磁碟是如此格式,那在寫入磁碟之前的記憶體中是怎麼儲存的呢?

通過第一篇influxdb初探https://blog.csdn.net/jacicson1987/article/details/81986234,瞭解到記憶體中的資料是儲存在

DBStore中的某個shard裡,

每個shard有一個tsm engine

每一個tsm engine裡面有一個Cache

結構說明

type Cache struct {
	// Due to a bug in atomic  size needs to be the first word in the struct, as
	// that's the only place where you're guaranteed to be 64-bit aligned on a
	// 32 bit system. See: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	size         uint64
	snapshotSize uint64

	mu      sync.RWMutex
	store   storer
	maxSize uint64

	// snapshots are the cache objects that are currently being written to tsm files
	// they're kept in memory while flushing so they can be queried along with the cache.
	// they are read only and should never be modified
	snapshot     *Cache
	snapshotting bool

	// This number is the number of pending or failed WriteSnaphot attempts since the last successful one.
	snapshotAttempts int

	stats         *CacheStatistics
	lastSnapshot  time.Time
	lastWriteTime time.Time

	// A one time synchronization used to initial the cache with a store.  Since the store can allocate a
	// a large amount memory across shards, we lazily create it.
	initialize       atomic.Value
	initializedCount uint32
}

Cache裡面有一個store

資料就是存在這個store裡面。

Cache裡面還有一個snapshot, 定時把store裡的資料複製到snapshot.store裡,然後store清空。

然後再把snapshot.store裡的內容寫入檔案。

那這個store裡到底是什麼結構呢?

store被初始化成一個含有16個partitions(節點)的ring。這個ring我稱之為偽一致性雜湊,因為它並沒有成環。

func (c *Cache) init() {
	if !atomic.CompareAndSwapUint32(&c.initializedCount, 0, 1) {
		return
	}

	c.mu.Lock()
	c.store, _ = newring(ringShards) // ringShards = 16
	c.mu.Unlock()
}

每一個partition都初始化成一個map,key是string, value是一個數組

func newring(n int) (*ring, error) {
	if n <= 0 || n > partitions {
		return nil, fmt.Errorf("invalid number of paritions: %d", n)
	}

	r := ring{
		partitions: make([]*partition, n), // maximum number of partitions.
	}

	// The trick here is to map N partitions to all points on the continuum,
	// such that the first eight bits of a given hash will map directly to one
	// of the N partitions.
	for i := 0; i < len(r.partitions); i++ {
		r.partitions[i] = &partition{        
			store: make(map[string]*entry),
		}
	}
	return &r, nil
}

通過跟蹤發現,這個map的key就是和TSM檔案結構裡面的key一致:measurement,tags#!~#field

而這個entry呢,是一組data,每個data由timestamp和value 兩個部分構成

type FloatValue struct {
	unixnano int64
	value    float64
}

type StringValue struct {
	unixnano int64
	value    string
}

那key是怎麼對映到具體某個partition的呢

// getPartition retrieves the hash ring partition associated with the provided
// key.
func (r *ring) getPartition(key []byte) *partition {
	return r.partitions[int(xxhash.Sum64(key)%partitions)]
}

xxhash.sum64,再與partition的數量(16)求餘,得到下標,找到partition.

具體xxhash.sum64這個雜湊值怎麼計算的呢,以後在研究。

結構圖

現在已經知道了Cache中資料的儲存方式了,來張表更清楚一點

每次寫入同一個key的資料,那就找到其Entries, 把新的資料直接append到後面。

 

排序與去重

這樣就又有問題了,如果 timestamp舊的資料後來,那這一組資料的就不是按照timestamp的大小順序了。

這裡怎麼解決的呢,這裡並沒有解決,不管是來的更舊的timestamp的資料 還是duplicated資料,統統加後面。

去重和排序在兩個地方做

1. select xx from xx的時候

2. snapshot寫入TSM檔案的時候

這個去重和排序程式碼如下, 先檢查順序,需要的話就sort..最後檢查去重。

這個sort演算法有時間可以看看,應該是針對大部分都是按順序的情況下效率可以的排序。

// Deduplicate returns a new slice with any values that have the same timestamp removed.
// The Value that appears last in the slice is the one that is kept.  The returned
// Values are sorted if necessary.
func (a Values) Deduplicate() Values {
	if len(a) <= 1 {
		return a
	}

	// See if we're already sorted and deduped
	var needSort bool
	for i := 1; i < len(a); i++ {
		if a[i-1].UnixNano() >= a[i].UnixNano() {
			needSort = true
			break
		}
	}

	if !needSort {
		return a
	}

	sort.Stable(a)
	var i int
	for j := 1; j < len(a); j++ {
		v := a[j]
		if v.UnixNano() != a[i].UnixNano() {
			i++
		}
		a[i] = v

	}
	return a[:i+1]
}

 

小結:

由下至上,瞭解到寫入TSM檔案之前,資料在Cache中的儲存方式。

具體的查詢和寫入的邏輯這裡只涉及了一點點,其他的大部分包括如何分shard, 如何通過制定時間段獲得資料,如何索引到TSM檔案indexes等等還需要再研究。