1. 程式人生 > 實用技巧 >Kafka控制器事件處理全流程分析

Kafka控制器事件處理全流程分析

前言

大家好,我是 yes。

這是Kafka原始碼分析第四篇文章,今天來說說 Kafka控制器,即 Kafka Controller

原始碼類的文章在手機上看其實效果很差,這篇文章我分為兩部分,第一部分就是直接圖文來說清整個 Kafka 控制器事件處理全流程,然後再通過Controller選舉流程進行一波原始碼分析,再來走一遍處理全流程。

​一些在手機上看的同學可以直接看前半部分,沒有一堆程式碼比較舒適,也能看明白整個流程,後面原始碼部分看個人了。

不過建議電腦端看效果更佳。

正文

在深入原始碼之前我們得先搞明白 Controller是什麼?它有什麼用?這樣在看原始碼的時候才能有的放矢

Controller

核心元件,它的作用是管理和協調整個Kafka叢集

具體管理和協調什麼呢?

  • 主題的管理,建立和刪除主題;
  • 分割槽管理,增加或重分配分割槽;
  • 分割槽Leader選舉;
  • 監聽Broker相關變化,即Broker新增、關閉等;
  • 元資料管理,向其他Broker提供元資料服務;

為什麼需要Controller​?

我個人理解:凡是管理或者協調某樣東西,都需要有個Leader,由他來把控全域性,管理內部,對接外部,咱們就跟著Leader幹就完事了。這其實對外也是好的,外部不需要和我們整體溝通,他只要和一個決策者交流,效率更高。

再來看看朱大是怎麼說的,以下內容來自《深入理解Kafka:核心設計與實踐原理》。

在Kafka的早期版本中,並沒有採用 Kafka Controller 這樣一概念來對分割槽和副本的狀態進行管理,而是依賴於 ZooKeeper,每個 broker都會在 ZooKeeper 上為分割槽和副本註冊大量的監聽器(Watcher)。
當分割槽或副本狀態變化時,會喚醒很多不必要的監聽器,這種嚴重依賴 ZooKeeper 的設計會有腦裂、羊群效應,以及造成 ZooKeeper 過載的隱患。在目前的新版本的設計中,只有 Kafka Controller 在 ZooKeeper 上註冊相應的監聽器,其他的 broker 極少需要再監聽 ZooKeeper 中的資料變化,這樣省去了很多不必要的麻煩。

簡單說下ZooKeeper

瞭解了 Controller的作用之後我們還需要在簡單的瞭解下ZooKeeper,因為Controller是極度依賴ZooKeeper的。(不過社群準備移除ZooKeeper,文末再提一下)

ZooKeeper是一個開源的分散式協調服務框架,最常用來作為註冊中心等。ZooKeeper的資料模型就像檔案系統一樣,以根目錄 "/" 開始,結構上的每個節點稱為znode,可以儲存一些資訊。節點分為持久節點和臨時節點,臨時節點會隨著會話結束而自動被刪除。

並且有Watcher功能,節點自身資料變更、節點新增、節點刪除、子節點數量變更都可以通過變更監聽器通知客戶端。

Controller是如何依賴ZooKeeper的

每個Broker在啟動時會嘗試向ZooKeeper註冊/controller節點來競選控制器,第一個建立/controller節點的Broker會被指定為控制器。這就是是控制器的選舉

/controller節點是個臨時節點,其他Broker會監聽著此節點,當/controller節點所在的Broker宕機之後,會話就結束了,此節點就被移除。其他Broker伺機而動,都來爭當控制器,還是第一個建立/controller節點的Broker被指定為控制器。這就是控制器故障轉移,即Failover

當然還包括各種節點的監聽,例如主題的增減等,都通過Watcher功能,來實現相關的監聽,進行對應的處理。

Controller在初始化的時候會從ZooKeeper拉取叢集元資料資訊,儲存在自己的快取中,然後通過向叢集其他Broker傳送請求的方式將資料同步給對方。

Controller 底層事件模型

不管是監聽WatcherZooKeeperWatcher執行緒,還是定時任務執行緒亦或是其他執行緒都需要訪問或更新Controller從叢集拉取的元資料。多執行緒 + 資料競爭 = 執行緒不安全。因此需要加鎖來保證執行緒安全。

一開始Kafka就是用大量的鎖來保證執行緒間的同步,各種加鎖使得效能下降,並且多執行緒加鎖的方式使得程式碼複雜度急劇上升,一不小心就會出各種問題,bug難修復。

因此在0.11版本之後將多執行緒併發訪問改成了單執行緒事件佇列模式將涉及到共享資料競爭相關方面的訪問抽象成事件,將事件塞入阻塞佇列中,然後單執行緒處理

也就是說其它執行緒還是在的,只是把涉及共享資料的操作封裝成事件由專屬執行緒處理。

先小結一下

到這我們已經清楚了Controller主要用來管理和協調叢集,具體是通過ZooKeeper臨時節點和Watcher機制來監控叢集的變化(當然還有來自定時任務或其他執行緒的事件驅動),更新叢集的元資料,並且通知叢集中的其他Broker進行相關的操作(這部分下文會講)。

而由於叢集元資料會有併發修改問題,因此將操作抽象成事件,由阻塞佇列和單執行緒處理來替換之前的多執行緒處理,降低程式碼的複雜度,提升程式碼的可維護性和效能。

接下來我們再講講Controller通知叢集中的其他Broker的相關操作。

Controller的請求傳送

ControllerZooKeeper那兒得到變更通知之後,需要告知叢集中的Broker(包括它自身)做相應的處理。

Controller只會給叢集的Broker傳送三種請求:分別是 LeaderAndIsrRequestStopReplicaRequestUpdateMetadataRequest

LeaderAndIsrRequest

告知Broker主題相關分割槽 LeaderISR副本都在哪些 Broker上。

StopReplicaRequest

告知Broker停止相關副本操作,用於刪除主題場景或分割槽副本遷移場景。

UpdateMetadataRequest

更新Broker上的元資料。

Controller事件處理執行緒會把事件封裝成對應的請求,然後將請求寫入對應的Broker的請求阻塞佇列,然後RequestSendThread 不斷從阻塞佇列中獲取待發送的請求。

先解釋下controllerBrokerStateInfo,它就是個 POJO類,可以理解為叢集每個broker對應一個controllerBrokerStateInfo.

然後再看下ControllerChannelManager,從名字可以看出它管理Controller和叢集Broker之間的連線,併為每個Broker建立一個RequestSendThread 執行緒。

再小結一下

接著上個小結,事件處理執行緒將事件佇列裡面的事件處理之後再進行對應的請求封裝,塞入需要通知的叢集Broker對應的阻塞佇列中,然後由每個Broker專屬的requestSendThread傳送請求至對應的Broker

總的步驟如下圖:

現在應該已經清楚Controller大概是如何運作的,整體看起來還是生產者-消費者模型

接下來就進入原始碼環節。

Controller選舉流程原始碼分析

事件處理的流程都是一樣的,只是具體處理的事件邏輯不同,我們從Controller選舉入手,來走一遍處理流程。

ControllerChangeHandler

選舉會觸發此handler,可以看到直接往ControllerEventManager的事件佇列裡塞。

這個QueueEventControllerEventManager,我們先來看看是啥。不過在此之前先了解下ControllerEventControllerEventProcessor

ControllerEvent:事件

ControllerEventProcessor : 事件處理介面

此介面的唯一實現類是 KafkaController

ControllerEventManager:事件處理器

此類主要用來管理事件處理執行緒和事件佇列。

QueuedEvent:封裝了ControllerEvent的類

主要是記錄了下入隊時間,並且提供了事件需要呼叫的方法。

ControllerEventThread:事件處理執行緒

整體而言還是很簡單的,從佇列拿事件,然後處理。

KafkaController#process

就是個switch,根據事件呼叫對應的processXXXX方法。

來關注下controller 重選事件

然後在onControllerFailover裡面會呼叫sendUpdateMetadataRequest方法

中間省略呼叫,內容太多了,不是重點,到後來呼叫ControllerBrokerRequestBatch#sendRequest

最後還是呼叫了controllerChannelManager#sendRequest.

然後 RequestSendThread#doWork,不斷從請求佇列裡拿請求,傳送請求。

一個環節完成了!我們來看下整體流程圖

最後我們來看下元資料到底有啥和KafkaController的一些欄位。

ControllerContext:元資料

主要有執行中的Broker、所有主題資訊、主題分割槽副本資訊等。

KafkaController

基本上關鍵的欄位都解釋了,關於狀態機那一塊篇幅有限,之後再說。

最後

整體的流程就是將Controller相關操作都封裝成一個個事件,然後將事件入隊,由一個事件處理執行緒來處理,保證資料的安全(從這也可以看出,不是多執行緒就是好,有利有弊最終還是看場景)。

最後在通知叢集中Broker的過程是每個Broker配備一個傳送執行緒,因為傳送是同步的,因此每個Broker執行緒隔離可以防止某個Broker阻塞而導致整體都阻塞的情況。

前面有說到Kafka Controller 強依賴 ZooKeeper。但是現在社群打算移除 ZooKeeper,因為ZooKeeper不適合頻繁寫,並且是CP的。而且用Kafka 還需要維護ZooKeeper叢集,提升了系統的複雜度和運維難度,降低了系統的穩定性。

像位移資訊,已經通過內部主題的方式儲存,繞開了ZooKeeper

社群打算通過類 Raft 共識演算法來選舉Controller,並且把元資料儲存在 Log 中的方式來做。

我是 yes,從一點點到億點點,我們下篇見

往期推薦:

訊息佇列面試連環問:如何保證訊息不丟失?處理重複訊息?訊息有序性?訊息堆積處理?

圖解+程式碼|常見限流演算法以及限流在單機分散式場景下的思考

面試官:說說Kafka處理請求的全流程

Kafka索引設計的亮點

Kafka日誌段讀寫分析