1. 程式人生 > >TiDB Binlog 原始碼閱讀系列文章(四)Pump server 介紹

TiDB Binlog 原始碼閱讀系列文章(四)Pump server 介紹

作者: satoru

上篇文章 中,我們介紹了 TiDB 如何通過 Pump client 將 binlog 發往 Pump,本文將繼續介紹 Pump server 的實現,對應的原始碼主要集中在 TiDB Binlog 倉庫的 pump/server.go 檔案中。

啟動 Pump Server

Server 的啟動主要由兩個函式實現:NewServer(*Server).Start

NewServer 依照傳入的配置項建立 Server 例項,初始化 Server 執行所必需的欄位,以下簡單說明部分重要欄位:

  1. metrics:一個 MetricClient,用於定時向 Prometheus Pushgateway 推送 metrics。

  2. clusterID:每個 TiDB 叢集都有一個 ID,連線到同一個 TiDB 叢集的服務可以通過這個 ID 識別其他服務是否屬於同個叢集。

  3. pdCliPD Client,用於註冊、發現服務,獲取 Timestamp Oracle。

  4. tiStore:用於連線 TiDB storage engine,在這裡主要用於查詢事務相關的資訊(可以通過 TiDB 中的對應 interface 描述 瞭解它的功能)。

  5. storage:Pump 的儲存實現,從 TiDB 發過來的 binlog 就是通過它儲存的,下一篇文章將會重點介紹。

Server 初始化以後,就可以用 (*Server).Start

啟動服務。為了避免丟失 binlog,在開始對外提供 binlog 寫入服務之前,它會將當前 Server 註冊到 PD 上,確保所有執行中的 Drainer 都已經觀察到新增的 Pump 節點。這一步除了啟動對外的服務,還開啟了一些 Pump 正常運作所必須的輔助機制,下文會有更詳細的介紹。

Pump Server API

Pump Server 通過 gRPC 暴露出一些服務,這些介面定義在 tipb/pump.pb.go,包含兩個介面 WriteBinlogPullBinlogs

WriteBinlog

顧名思義,這是用於寫入 binlog 的介面,上篇文章中 Pump client 呼叫的就是這個。客戶端傳入的請求,是以下的格式:

type WriteBinlogReq struct {
  // The identifier of tidb-cluster, which is given at tidb startup.
  // Must specify the clusterID for each binlog to write.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // Payload bytes can be decoded back to binlog struct by the protobuf.
  Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}

其中 Payload 是一個用 Protobuf 序列化的 binlog,WriteBinlog 的 主要流程 就是將請求中的 Payload 解析成 binlog 例項,然後呼叫 storage.WriteBinlog 儲存下來。storage.WriteBinlog 將 binlog 持久化儲存,並對 binlog 按 start TS / commit TS 進行排序,詳細的實現將在下章展開討論。

PullBinlogs

PullBinlogs 是為 Drainer 提供的介面,用於按順序獲取 binlog。這是一個 streaming 介面,客戶端請求後得到一個 stream,可以從中不斷讀取 binlog。請求的格式如下:

type PullBinlogReq struct {
  // Specifies which clusterID of binlog to pull.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // The position from which the binlog will be sent.
  StartFrom Pos `protobuf:"bytes,2,opt,name=startFrom" json:"startFrom"`
}

// Binlogs are stored in a number of sequential files in a directory.
// The Pos describes the position of a binlog.
type Pos struct {
  // The suffix of binlog file, like .000001 .000002
  Suffix uint64 `protobuf:"varint,1,opt,name=suffix,proto3" json:"suffix,omitempty"`
  // The binlog offset in a file.
  Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
}

從名字可以看出,這個請求指定了 Drainer 要從什麼時間點的 binlog 開始同步。雖然 Pos 中有 SuffixOffset 兩個欄位,目前只有 Offset 欄位是有效的,我們把它用作一個 commit TS,表示只拉取這個時間以後的 binlog。

PullBinlogs 的 主要流程,是呼叫 storage.PullCommitBinlogs 得到一個可以獲取序列化 binlog 的 channel,將這些 binlog 通過 stream.Send 介面逐個傳送給客戶端。

輔助機制

上文提到 Pump 的正常運作需要一些輔助機制,本節將逐一介紹這些機制。

fake binlog

《TiDB-Binlog 架構演進與實現原理》 一文中,對 fake binlog 機制有以下說明:

“Pump 會定時(預設三秒)向本地儲存中寫入一條資料為空的 binlog,在生成該 binlog 前,會向 PD 中獲取一個 tso,作為該 binlog 的 start_tscommit_ts,這種 binlog 我們叫作 fake binlog。

……Drainer 通過如上所示的方式對 binlog 進行歸併排序,並推進同步的位置。那麼可能會存在這種情況:某個 Pump 由於一些特殊的原因一直沒有收到 binlog 資料,那麼 Drainer 中的歸併排序就無法繼續下去,正如我們用兩條腿走路,其中一隻腿不動就不能繼續前進。我們使用 Pump 一節中提到的 fake binlog 的機制來避免這種問題,Pump 每隔指定的時間就生成一條 fake binlog,即使某些 Pump 一直沒有資料寫入,也可以保證歸併排序正常向前推進。”

genForwardBinlog 實現了這個機制,它裡面是一個定時迴圈,每隔一段時間(預設 3 秒,可通過 gen-binlog-interval 選項配置)檢查一下是否有新的 binlog 寫入,如果沒有,就呼叫 writeFakeBinlog 寫一條假的 binlog。

判斷是否有新的 binlog 寫入,是通過 lastWriteBinlogUnixNano 這個變數,每次有新的寫入都會 將這個變數設定為當前時間

垃圾回收

由於儲存容量限制,顯然 Pump 不能無限制地儲存收到的 binlog,因此需要有一個 GC (Garbage Collection) 機制來清理沒用的 binlog 釋放空間,gcBinlogFile 就負責 GC 的排程。有兩個值會影響 GC 的排程:

  1. gcInterval:控制 GC 檢查的週期,目前寫死在程式碼裡的設定是 1 小時

  2. gcDuration:binlog 的儲存時長,每次 GC 檢查就是 通過當前時間和 gcDuration 計算出 GC 時間點,在這個時間點之前的 binlog 將被 GC 在 gcBinlogFile 的迴圈中,用 select 監控著 3 種情況:

select {
case <-s.ctx.Done():
  log.Info("gcBinlogFile exit")
  return
case <-s.triggerGC:
  log.Info("trigger gc now")
case <-time.After(gcInterval):
}

3 個 case 分別對應:server 退出,外部觸發 GC,定時檢查這三種情況。其中 server 退出的情況我們直接退出迴圈。另外兩種情況都會繼續,計算 GC 時間點,交由 storage.GC 執行。

Heartbeat

心跳機制用於定時(預設兩秒)向 PD 傳送 Server 最新狀態,由 (*pumpNode).HeartBeat 實現。狀態是由 JSON 編碼的 Status 例項,主要記錄 NodeIDMaxCommitTS 之類的資訊。

HTTP API 實現

Pump Server 通過 HTTP 方式暴露出一些 API,主要提供運維相關的介面。

路徑Handler說明
GET /statusStatus返回所有 Pump 節點的狀態。
PUT /state/{nodeID}/{action}ApplyAction支援 pause 和 close 兩種 action,可以暫停和關閉 server。接到請求的 server 會確保使用者指定的 nodeID 跟自己的 nodeID 相匹配,以防誤操作。
GET /drainersAllDrainers返回通過當前 PD 服務可以發現的所有 Drainer 的狀態,一般用於除錯時確定 Pump 是否能如預期地發現 Drainer。
GET /debug/binlog/{ts}BinlogByTS通過指定的 timestamp 查詢 binlog,如果查詢結果是一條 Prewrite binlog,還會額外輸出 MVCC 相關的資訊。
POST /debug/gc/triggerTriggerGC手動觸發一次 GC,如果 GC 已經在執行中,請求將被忽略。

下線 Pump Server

下線一個 Pump server 的流程通常由 binlogctl 命令發起,例如:

bin/binlogctl -pd-urls=localhost:2379 -cmd offline-pump -node-id=My-Host:8240

binlogctl 先通過 nodeID 在 PD 發現的 Pump 節點中找到指定的節點,然後呼叫上一小節中提到的介面 PUT /state/{nodeID}/close

在 Server 端,ApplyAction 收到 close 後會將節點狀態置為 Closing(Heartbeat 程序會定時將這類狀態更新到 PD),然後另起一個 goroutine 呼叫 CloseClose 首先呼叫 cancel,通過 context 將關停訊號發往協作的 goroutine,這些 goroutine 主要就是上文提到的輔助機制執行的 goroutine,例如在 genForwardBinlog 中設計了在 context 被 cancel 時退出:

for {
  select {
  case <-s.ctx.Done():
     log.Info("genFakeBinlog exit")
     return

ClosewaitGroup 等待這些 goroutine 全部退出。這時 Pump 仍然能正常提供 PullBinlogs 服務,但是寫入功能 已經停止Close 下一行呼叫了 commitStatus,這時節點的狀態是 Closing,對應的分支呼叫了 waitSafeToOffline 來確保到目前為止寫入的 binlog 都已經被所有的 Drainer 讀到了。waitSafeToOffline 先往 storage 中寫入一條 fake binlog,由於此時寫入功能已經停止,可以確定這將是這個 Pump 最後的一條 binlog。之後就是在迴圈中定時檢查所有 Drainer 已經讀到的 Binlog 時間資訊,直到這個時間已經大於 fake binlog 的 CommitTS

waitSafeToOffline 等待結束後,就可以關停 gRPC 服務,釋放其他資源。

小結

本文介紹了 Pump server 的啟動、gRPC API 實現、輔助機制的設計以及下線服務的流程,希望能幫助大家在閱讀原始碼時有一個更清晰的思路。在上面的介紹中,我們多次提到 storage 這個實體,用來儲存和查詢 binlog 的邏輯主要封裝在這個模組內,這部分內容將在下篇文章為大家作詳細介紹。

原文閱讀https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-4/

相關推薦

TiDB Binlog 原始碼閱讀系列文章Pump server 介紹

作者: satoru 在 上篇文章 中,我們介紹了 TiDB 如何通過 Pump client 將 binlog 發往 Pump,

TiDB 原始碼閱讀系列文章初識 TiDB 原始碼

本文為 TiDB 原始碼閱讀系列文章的第二篇,第一篇文章介紹了 TiDB 整體的架構,知道 TiDB 有哪些模組,分別是做什麼的,從哪裡入手比較好,哪些可以忽略,哪些需要仔細閱讀。 這篇文章是一篇入門文件,難度係數比較低,其中部分內容可能大家在其他渠道已經看過

DM 原始碼閱讀系列文章定製化資料同步功能的實現

作者:王相 本文為 DM 原始碼閱讀系列文章的第七篇,在 上篇文章 中我們介紹了 relay log 的實現,主要包括 relay

DM 原始碼閱讀系列文章Online Schema Change 同步支援

作者:lan 本文為 DM 原始碼閱讀系列文章的第八篇,上篇文章 對 DM 中的定製化資料同步功能進行詳細的講解,包括庫表路由(T

DM 原始碼閱讀系列文章shard DDL 與 checkpoint 機制的實現

作者:張學程 本文為 DM 原始碼閱讀系列文章的第九篇,在 上篇文章 中我們詳細介紹了 DM 對 online schema ch

DM 原始碼閱讀系列文章測試框架的實現

作者:楊非 本文為 DM 原始碼閱讀系列文章的第十篇,之前的文章已經詳細介紹過 DM 資料同步各元件的實現原理和程式碼解析,相信大

TiKV 原始碼解析系列文章gRPC Server 的初始化和啟動流程

作者:屈鵬 本篇 TiKV 原始碼解析將為大家介紹 TiKV 的另一週邊元件—— grpc-rs。grpc-rs 是 PingCA

TiDB 原始碼閱讀系列文章二十Table Partition

作者:肖亮亮 Table Partition 什麼是 Table Partition Table Partition 是指根據一定規則,將資料庫中的一張表分解成多個更小的容易管理的部分。從邏輯上看只有一張表,但是底層卻是由多個物理分割槽組成。相信對有關係型資料庫使用背景的使用者來

TiDB 原始碼閱讀系列文章十九tikv-client

上篇文章 中,我們介紹了資料讀寫過程中 tikv-client 需要解決的幾個具體問題,本文將繼續介紹 tikv-client 裡的兩個主要的模組——負責處理分散式計算的 copIterator 和執行二階段提交的 twoPhaseCommitter。 copIterator cop

TiDB 原始碼閱讀系列文章二十一基於規則的優化 II

在 TiDB 原始碼閱讀系列文章(七)基於規則的優化 一文中,我們介紹了幾種 TiDB 中的邏輯優化規則,包括列剪裁,最大最小消除,投影消除,謂詞下推和構建節點屬性,本篇將繼續介紹更多的優化規則:聚合消除、外連線消除和子查詢優化。 聚合消除 聚合消除會檢查 SQL 查詢中 Group By 語句所使用的列是否

讀logback原始碼系列文章——記錄日誌

今天晚上本來想來寫一下Logger怎麼記錄日誌,以及Appender元件。不過9點才從丈母孃家回來,又被幾個兄弟喊去喝酒,結果回來晚了,所以時間就只夠寫一篇Logger類的原始碼分析了。Appender找時間再寫 上篇部落格介紹了LoggerContext怎麼生成Logger

openstack系列文章

cnblogs 調度器 5.5 min 代碼位置 虛機 inux latest 階段 學習 openstack 的系列文章 - Nova Nova 基本概念 Nova 架構 openstack Log Nova 組件介紹 Nova 操作介紹 1. Nova 基本概念

Git系列文章:常見異常問題

1、GitHub提交的時顯示Updates were rejected because the remote contains work that you do  git push -u origin master 每次建立新的倉庫,提交的時總會出現這樣的錯誤。Updates

【NLP】揭祕馬爾可夫模型神祕面紗系列文章

作者:白寧超 2016年7月12日14:08:28 摘要:最早接觸馬爾可夫模型的定義源於吳軍先生《數學之美》一書,起初覺得深奧難懂且無什麼用場。直到學習自然語言處理時,才真正使用到隱馬爾可夫模型,並體會到此模型的妙用之處。馬爾可夫模型在處理序列分類時具體強大的功能,諸如解決:詞類標註、語音識別、句

TiKV 原始碼解析系列文章Prometheus

開發十年,就只剩下這套架構體系了! >>>   

讀logback原始碼系列文章——記錄日誌的實際工作類Encoder

本系列的部落格從logback怎麼對接slf4j開始,逐步介紹了LoggerContext、Logger、ContextInitializer、Appender、Action等核心元件。跟讀logback的原始碼到這個程度,雖然不能說精通,不過至少日常的配置,和簡單的自定義擴

TiKV 原始碼解析系列文章十一Storage

作者:張金鵬 背景知識 TiKV 是一個強一致的支援事務的分散式 KV 儲存。TiKV 通過 raft 來保證多副本之間的強一致,

Java系列文章

java 學習JVMJVM系列:類裝載器的體系結構 JVM系列:Class文件檢驗器JVM系列:安全管理器JVM系列:策略文件Java垃圾回收機制深入剖析Classloader(一)--類的主動使用與被動使用深入剖析Classloader(二)-根類加載器,擴展類加載器與系統類加載器深入理解JVM—JVM內存

JXLS 2.4.0系列教程——多sheet是怎麽做到的

while director write 教程 == 模板 phy sheet ack 註:本文代碼在第一篇文章基礎上修改而成,請務必先閱讀第一篇文章。 http://www.cnblogs.com/foxlee1024/p/7616987.html 本文也不會過多的講解模

JXLS 2.4.0系列教程——拾遺 如何做頁面小計

進行 line http spa shee shel nes 默認 閱讀   註:閱讀本文前,請先閱讀第四篇文章。   http://www.cnblogs.com/foxlee1024/p/7619845.html   前面寫了第四篇教程,發現有些東西忘了講了,這裏補