VictoriaMetrics原始碼及相關API解析
flock.lock
vmStorage在啟動時會根據路徑以及給定的最大儲存時間建立Storage物件,然後還會根據cache路徑(path + /cache
)下是否存在/reset_cache_on_startup
這一路徑在選擇進行cache目錄下的清空(reset),然後是建立flock.lock檔案,這是一個檔案鎖檔案,呼叫的底層API是
if err := unix.Flock(int(flockF.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil { return nil, fmt.Errorf("cannot acquire lock on file %q: %w", flockFile, err) }
呼叫unix.Flock(int fd, int operation)
對建立的flock.lock檔案上檔案鎖,第二個引數的含義是指上鎖的鎖型別,有LOCK_SH
、LOCK_EX
、LOCK_NB
、LOCK_UN
四種,含義如下
LOCK_SH
表示共享鎖LOCK_EX
表示排它鎖LOCK_NB
讓想要嘗試給檔案上排它鎖的程序不進入阻塞狀態(預設會進入阻塞狀態)而直接返回LOCK_UN
用於解除檔案的檔案鎖
而flock()新增的鎖是建議性鎖,也就是使用flock()對檔案上鎖後,其他程序仍然可以對該檔案進行操作,而只有通過flock()呼叫去檢測才可以知道檔案是否處於被鎖定的狀態,也就是flock()只是起到一個通知的作用,沒有強制性的阻止檔案修改的動作。
atomic.Value
vmStorage在啟動時會需要指定每小時以及每天的最大儲存進storage的數量,這個數量由一個限制器實現:
type Limiter struct {
maxItems int
v atomic.Value
wg sync.WaitGroup
stopCh chan struct{}
}
其中,atomic.Value型別類似於一個容器,可以將任意型別的讀寫操作分裝成原子性操作(讓中間狀態對外不可見),atomic.Value型別對外暴露的方法只有兩個
v.Store(c)
- 寫操作,將原始的變數c存放到一個atomic.Value型別的v裡c = v.Load()
- 讀操作,從執行緒安全的v中讀取上一步存放的內容
Request.FormValue
Golang中net/http
包下Request.FormValue
方法可以獲取 url 中 ? 後面的請求引數
TSID
vmStorage在啟動時還需要獲取到分配到的記憶體大小,然後根據一定的比例建立cache區域,有MetricName->TSID
、MetricID->TSID
、MetricID->MetricName
以供後續插入資料建立索引使用,其中有關與TSID的建立是至關重要的,如下圖所示,VictoriaMetrics在接收到寫入請求時,會對請求中包含的時序資料做轉換處理,首先根據包含metric和labels的MetricName生成一個唯一表示TSID,然後metric + labels + TSID
作為索引index, TSID + timestamp + value
作為資料data,最後索引index和資料data分別進行儲存和檢索。
TSID的結構
// storage/tsid.go
type TSID struct {
// metricName, 指標名對應的id
MetricGroupID uint64
JobID uint32
InstanceID uint32
// metricNameRaw, 指標名+tags對應的id
MetricID uint64
}
TSID一共有四個欄位,其中JobID
、InstanceID
是為了相容Prometheus的協議而新增的,而且只有MetricID是必須的,其他三個欄位都是可選的。TSID的初始化函式如下,這一函式會在VmStorage啟動的main函式裡面的go srv.RunVMInsert()
、go srv.RunVMSelect()
中被呼叫到
func (db *indexDB) generateTSID(dst *TSID, metricName []byte, mn *MetricName) error {
// Search the TSID in the external storage.
// This is usually the db from the previous period.
var err error
if db.doExtDB(func(extDB *indexDB) {
err = extDB.getTSIDByNameNoCreate(dst, metricName)
}) {
if err == nil {
// The TSID has been found in the external storage.
return nil
}
if err != io.EOF {
return fmt.Errorf("external search failed: %w", err)
}
}
// The TSID wasn't found in the external storage.
// Generate it locally.
dst.AccountID = mn.AccountID
dst.ProjectID = mn.ProjectID
dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
if len(mn.Tags) > 0 {
dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
}
if len(mn.Tags) > 1 {
dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
}
dst.MetricID = generateUniqueMetricID()
return nil
}
其中,JobID和InstanceID是為了相容Prometheus協議而新增的,並且是根據tag欄位的第一個跟第二個進行生成,因為VictoriaMetrics會將接收到的這兩個ID放在前兩位。MetricGroupID是對指標名稱做hash演算法;而MetricID是程序當前啟動的納秒時間戳,並且在隨後的每次新的TSID的生成都會在此基礎上自增1
倒排索引
VictoriaMetrics在建立完TSID後就會建立一系列的索引以供在查詢時使用,並且由於TSID中除了MetricID外,其他欄位都是可供選擇的,所以可供使用的有效資訊只有MetricID,因為VictoriaMetrics在構建tags到TSID的字典的過程中,就是在構建tag->MetricID的字典。
對於接收到的一個時序指標,VictoriaMetrics會生成以下幾種索引,分別是
- MetricName -> TSID
- MetricID -> MetricName
- MetricID -> TSID
- Tags -> MetricID
例如以http_requests_total{status="200", method="GET"}為例,則MetricName為http_requests_total{status="200", method="GET"}, 假設生成的TSID為{metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286},那麼生成的索引如下:
- metricName -> TSID, 即http_requests_total{status="200", method="GET"} -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
- metricID -> metricName,即51106185174286 -> http_requests_total{status="200", method="GET"}
- metricID -> TSID,即51106185174286 -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}
- tag -> metricID,即 status="200" -> 51106185174286, method="GET" -> 51106185174286, "" = http_requests_total -> 51106185174286
有了上面這些索引的item之後,就可以進行基於tag的多維查詢,VictoriaMetrics會首先根據tag尋找到所對應的MetricID列表,然後求出所有tag的MetricID列表的交集,然後再根據這一MetricID檢索出TSID去到資料檔案查詢資料以及根據MetricID到索引檔案檢索原始的MetricName
VmSelect
VmSelect在接受到查詢請求後,會將查詢任務拆分成多個任務分發給VMStorage,再將返回的結果聚合返回,接受的url格式為/{prefix}/{authToken}/{suddix}
,含義分別如下:
- prefix:操作型別,VmSelect支援的是select和delete
- authToken:賬號型別,格式為
accountID[:projectID]
,用於租戶隔離,projectID
可選 - suffix:prometheus的查詢api
從VMSelect的api接收到查詢的query到查詢VmStorage,中間主要是一些解析查詢語句的過程,然後會通過下面這一函式對所有的VmStorage進行遍歷查詢(無差別對待)
func startStorageNodesRequest(denyPartialResponse bool, f func(idx int, sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(storageNodes))
for idx, sn := range storageNodes {
go func(idx int, sn *storageNode) {
result := f(idx, sn)
resultsCh <- result
}(idx, sn)
}
return &storageNodesRequest{
denyPartialResponse: denyPartialResponse,
resultsCh: resultsCh,
}
}
其中,storageNodes
是在VmSelect初始化的時候通過func InitStorageNodes(addrs []string)
便初始化好的一個節點,這就意味著當我們使用增加storageNodes的方式擴容VictoriaMetrics的時候,就需要修改VmSelect和VmInsert的引數並重新啟動,並且如果有一個VmStorage宕機的時候,查詢出來的資料也是不完整的。