1. 程式人生 > 其它 >Kafka 控制器Controller

Kafka 控制器Controller

Kafka 控制器Controller

Broker 在啟動時,會嘗試去 ZooKeeper 中建立 /controller 節點。Kafka 當前選舉控制器的規則是:第一個成功建立 /controller 節點的 Broker 會被指定為控制器

Controller Broker的主要職責有很多,主要是一些管理行為,主要包括以下幾個方面:

  • 建立、刪除主題,增加分割槽並分配leader分割槽
  • 叢集Broker管理(新增 Broker、Broker 主動關閉、Broker 故障)
  • preferred leader選舉
  • 分割槽重分配

為什麼需要Controller

在Kafka早期版本,對於分割槽和副本的狀態的管理依賴於zookeeper的Watcher和佇列:每一個broker都會在zookeeper註冊Watcher,所以zookeeper就會出現大量的Watcher, 如果宕機的broker上的partition很多比較多,會造成多個Watcher觸發,造成叢集內大規模調整;每一個replica都要去再次zookeeper上註冊監視器,當叢集規模很大的時候,zookeeper負擔很重。這種設計很容易出現腦裂和羊群效應以及zookeeper叢集過載。
新的版本中該變了這種設計,使用KafkaController,只有KafkaController,Leader會向zookeeper上註冊Watcher,其他broker幾乎不用監聽zookeeper的狀態變化。
Kafka叢集中多個broker,有一個會被選舉為controller leader,負責管理整個叢集中分割槽和副本的狀態,比如partition的leader 副本故障,由controller 負責為該partition重新選舉新的leader 副本;當檢測到ISR列表發生變化,有controller通知叢集中所有broker更新其MetadataCache資訊;或者增加某個topic分割槽的時候也會由controller管理分割槽的重新分配工作。

具體職責

具備控制器身份的broker需要比其他普通的broker多一份職責,具體細節如下:

  1. 監聽partition相關的變化。為Zookeeper中的/admin/reassign_partitions節點註冊PartitionReassignmentListener,用來處理分割槽重分配的動作。為Zookeeper中的/isr_change_notification節點註冊IsrChangeNotificetionListener,用來處理ISR集合變更的動作。為Zookeeper中的/admin/preferred-replica-election節點新增PreferredReplicaElectionListener,用來處理優先副本的選舉動作。
  2. 監聽topic相關的變化。為Zookeeper中的/brokers/topics節點新增TopicChangeListener,用來處理topic增減的變化;為Zookeeper中的/admin/delete_topics節點新增TopicDeletionListener,用來處理刪除topic的動作。
  3. 監聽broker相關的變化。為Zookeeper中的/brokers/ids/節點新增BrokerChangeListener,用來處理broker增減的變化。
  4. 從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的資訊並進行相應的管理。對於所有topic所對應的Zookeeper中的/brokers/topics/[topic]節點新增PartitionModificationsListener,用來監聽topic中的分割槽分配變化。
  5. 啟動並管理分割槽狀態機和副本狀態機。
  6. 更新叢集的元資料資訊。
  7. 如果引數auto.leader.rebalance.enable設定為true,則還會開啟一個名為“auto-leader-rebalance-task”的定時任務來負責維護分割槽的優先副本的均衡。

Kafka leader選舉

Kafka 使用 Zookeeper 來維護叢集成員 (brokers) 的資訊。每個 broker 都有一個唯一標識 broker.id,用於標識自己在叢集中的身份,可以在配置檔案 server.properties 中進行配置,或者由程式自動生成。

選舉流程

其選舉leader成為controller的過程如下:

  1. 在kafka叢集中,每一個 broker 啟動的時候,它會在zk 的 /brokers/ids 路徑下建立一個 臨時節點(例如:{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”}),並將自己的 broker.id 寫入,從而將自身註冊到叢集;
  2. 第一個啟動的broker會在zk中建立一個臨時節點 /controller 讓自己成為控制器。其他broker啟動時也會試著建立這個節點當然他們會失敗,因為已經有人建立過了。那麼這些節點會在控制器節點上建立zk watch物件,這樣他們就可以收到這個節點變更的通知。任何時刻都確保叢集中只有一個leader的存在。
  3. 如果控制器被關閉或者與zk斷開連線,zk上的controller節點馬上就會消失。那麼其他訂閱了leader節點的broker也會收到通知隨後他們會嘗試讓自己成為新的leader,重複第一步的操作。
  4. 如果leader完好但是別的broker離開了叢集,那麼leader會去確定離開的broker的分割槽並確認新的分割槽領導者(即分割槽副本列表裡的下一個副本)。然後向所有包含該副本的follower或者observer傳送請求。隨後新的分割槽首領開始處理請求。

利用Zookeeper的強一致性特性,一個節點只能被一個客戶端建立成功,建立成功的broker即為leader,即先到先得原則,leader也就是叢集中的controller,負責叢集中所有大小事務。


參考Zookeeper的Leader Election如下:

搶注Leader節點——非公平模式

  1. 建立Leader父節點,如/chroot,並將其設定為persist節點
  2. 各客戶端通過在/chroot下建立Leader節點,如/chroot/leader,來競爭Leader。該節點應被設定為ephemeral
  3. 若某建立Leader節點成功,則該客戶端成功競選為Leader
  4. 若建立Leader節點失敗,則競選Leader失敗,在/chroot/leader節點上註冊exist的watch,一旦該節點被刪除則獲得通知
  5. Leader可通過刪除Leader節點來放棄Leader
  6. 如果Leader宕機,由於Leader節點被設定為ephemeral,Leader節點會自行刪除。而其它節點由於在Leader節點上註冊了watch,故可得到通知,參與下一輪競選,從而保證總有客戶端以Leader角色工作。

先到先得,後者監視前者——公平模式

  1. 建立Leader父節點,如/chroot,並將其設定為persist節點
  2. 各客戶端通過在/chroot下建立Leader節點,如/chroot/leader,來競爭Leader。該節點應被設定為ephemeral_sequential
  3. 客戶端通過getChildren方法獲取/chroot/下所有子節點,如果其註冊的節點的id在所有子節點中最小,則當前客戶端競選Leader成功
  4. 否則,在前面一個節點上註冊watch,一旦前者被刪除,則它得到通知,返回step 3(並不能直接認為自己成為新Leader,因為可能前面的節點只是宕機了)
  5. Leader節點可通過自行刪除自己建立的節點以放棄Leader

選舉方式

新增Controller機制後,減輕Zookeeper負載,Controller與Leader及Follower間通過RPC通訊,高效且實時,但是由於引入Controller增加了複雜度,同時需要考慮Controller的Failover(容錯)

Kafka通過leaderSelector完成leader的選舉。

可能觸發為partition選舉leader的場景有: 新建立topic,broker啟動,broker停止,controller選舉,客戶端觸發,reblance等等 場景。在不同的場景下選舉方法不盡相同。Kafka提供了五種leader選舉方式,繼承PartitionLeaderSelector,實現selectLeader方法完成leader的選舉,

選舉器用PartitionLeaderSelector表示,有5個實現類:

1、OfflinePartitionLeaderSelector:

2、ReassignedPartitionLeaderSelector:

3、PreferredReplicaPartitionLeaderSelector:

4、ControlledShutdownLeaderSelector:

5、NoOpLeaderSelector:

Controller傳送broker的請求

ControllerZooKeeper那兒得到變更通知之後,需要告知叢集中的Broker(包括它自身)做相應的處理。

Controller只會給叢集的Broker傳送三種請求:分別是 LeaderAndIsrRequestStopReplicaRequestUpdateMetadataRequest

LeaderAndIsrRequest

告知Broker主題相關分割槽LeaderISR副本都在哪些 Broker上。

StopReplicaRequest

告知Broker停止相關副本操作,用於刪除主題場景或分割槽副本遷移場景。

UpdateMetadataRequest

更新Broker上的元資料。

Controller事件處理執行緒會把事件封裝成對應的請求,然後將請求寫入對應的Broker的請求阻塞佇列,然後RequestSendThread不斷從阻塞佇列中獲取待發送的請求。

處理下線Broker場景

每個 Broker 啟動後,會在zookeeper的 /Brokers/ids 下建立一個臨時 znode。當 Broker 宕機或主動關閉後,該 Broker 與 ZooKeeper 的會話結束,這個 znode 會被自動刪除。同理,ZooKeeper 的 Watch 機制將這一變更推送給控制器,這樣控制器就能知道有 Broker 關閉或宕機了,從而進行後續的協調操作。

Controller將收到通知並對此採取行動,決定哪些Broker上的分割槽成為leader分割槽,然後,它會通知每個相關的Broker,將Broker上的主題分割槽變成leader,通過LeaderAndIsr請求從新的leader分割槽中複製資料。

Controller中儲存的資料儲存

Kafka 是離不開 ZooKeeper的,所快取的資料資訊在 ZooKeeper 中也儲存了一份。每當控制器初始化時,它都會從 ZooKeeper 上讀取對應的元資料並填充到自己的快取中。

歸納主要包含三類:

  • broker 上的所有資訊。包括 broker 中的所有分割槽,broker 所有分割槽副本,當前都有哪些執行中的 broker,哪些正在關閉中的 broker 。
  • 所有主題資訊。包括具體的分割槽資訊,比如領導者副本是誰,ISR 集合中有哪些副本等。
  • 所有涉及運維任務的分割槽。包括當前正在進行 Preferred 領導者選舉以及分割槽重分配的分割槽列表。

Broker Controller 初始化

Controller選舉成功在啟動後,首先進行一些快取的清理,並在zk上註冊監聽事件,監聽那些Broker變化,Topic變化等事件,用於行使Controller的具體職責。同時通過傳送UpdateMetadataRequest,用於各個Broker更新metadata。在啟動過程中,Controller還會啟動副本狀態機和分割槽狀態機,這兩個狀態機用於記錄副本和分割槽的狀態,並且預設了狀態轉換的處理方法。在Controller啟動時會分別呼叫兩個狀態機的startup()方法,在該方法中初始化副本和分割槽的狀態,並且主要地觸發LeaderAndIsrRequest請求到Broker。

Broker之間元資料快取一致性

Kafka在設計時一個願景:每臺Kafka broker都要維護相同的快取,這樣客戶端程式(clients)隨意地給任何一個broker傳送請求都能夠獲取相同的資料,這也是為什麼任何一個broker都能處理clients發來的Metadata請求的原因。這種用空間去換時間的做法可以縮短請求被處理的延時從而提高整體clients端的吞吐。

目前Kafka是怎麼更新cache的?
簡單來說,有叢集中的controller監聽Zookeeper上元資料節點,由controller和ZK元資料保持一致,具體的更新操作實際上是由controller來完成的。controller會在一定場景下向各broker傳送UpdateMetadata請求令這些broker去更新它們各自的cache,這些broker一旦接收到請求便開始全量更新——即清空當前所有cache資訊,使用UpdateMetadata請求中的資料來重新填充cache。

由於是非同步更新的,所以在某一個時間點叢集上所有broker的cache資訊就未必是嚴格相同的。只不過在實際使用場景中,這種弱一致性似乎並沒有太大的問題。

原因如下:

  1. clients並不是時刻都需要去請求元資料的,且會快取到本地;
  2. 即使獲取的元資料無效或者過期了,clients通常都有重試機制,可以去其他broker上再次獲取元資料;
  3. cache更新是很輕量級的,僅僅是更新一些記憶體中的資料結構,不會有太大的成本。因此我們還是可以安全地認為每臺broker上都有相同的cache資訊。

Broker Controller 故障轉移

由於broker controller 只有一個,那麼必然會存在單點失效問題。kafka 為考慮到這種情況提供了故障轉移功能,也就是 Fail Over。如下圖:

當新的controller開始工作後,舊的controller可能還在工作,這時就會有兩個自認為是的controller,那麼broker該聽哪個的呢?

Kafka leader 分割槽自動平衡機制

broker配置auto.leader.rebalance.enable=true,開啟分割槽自動平衡

當 partition 1 的 leader,就是 broker.id = 1 的節點掛掉後,那麼 leader 0 或 leader 2 成為 partition 1 的 leader,那麼 leader 0 或 leader 2 會管理兩個 partition 的讀寫,效能會下 降,當 leader 1 重新啟動後,如果開啟了 leader 均衡機制,那麼 leader 1 會重新成為 partition 1 的 leader,降低 leader 0 或 leader 2 的負載

上面提到的選Leader分割槽,嚴格意義上是換Leader分割槽,為了達到負載均衡,可能會造成原來正常的Leader分割槽被強行變為follower分割槽。換一次 Leader 代價是很高的,原本向 Leader分割槽A(原Leader分割槽) 傳送請求的所有客戶端都要切換成向 B (新的Leader分割槽)傳送請求,建議在生產環境中把這個引數設定成 false

Kafka 首選領導者Preferred Leader

Kafka認為leader分割槽副本最初的分配(每個節點都處於活躍狀態)是均衡的。這些被最初選中的分割槽副本就是所謂的首選領導者(preferred leaders)

在 broker 掛掉之後,分割槽 leader 會變更,久而久之就會變得不均衡,Kafka 預設序號最小的副本為 Preferred leader,在 broker 重啟回來後,Kafka 會重新調整分割槽的 Preferred leader 成為 leader,Preferred leader 選舉分為手動選舉和自動選舉,涉及引數 auto.leader.rebalance.enable,還有個預設允許 10% 不均衡策略等等。

同步副本(in-sync replica ,ISR)列表

選擇一個同步副本列表中的分割槽作為leader 分割槽的過程稱為clean leader election。注意,這裡要與在非同步副本中選一個分割槽作為leader分割槽的過程區分開,在非同步副本中選一個分割槽作為leader的過程稱之為unclean leader election

由於ISR是動態調整的,所以會存在ISR列表為空的情況,通常來說,非同步副本落後 Leader 太多,因此,如果選擇這些副本作為新 Leader,就可能出現數據的丟失。畢竟,這些副本中儲存的訊息遠遠落後於老 Leader 中的訊息。在 Kafka 中,選舉這種副本的過程可以通過Broker 端引數 **unclean.leader.election.enable **控制是否允許 Unclean 領導者選舉。開啟 Unclean 領導者選舉可能會造成資料丟失,但好處是,它使得分割槽 Leader 副本一直存在,不至於停止對外提供服務,因此提升了高可用性。反之,禁止 Unclean Leader 選舉的好處在於維護了資料的一致性,避免了訊息丟失,但犧牲了高可用性。分散式系統的CAP理論說的就是這種情況。

Controller 腦裂

如果controller Broker 掛掉了,Kafka叢集必須找到可以替代的controller,叢集將不能正常運轉。這裡面存在一個問題,很難確定Broker是掛掉了,還是僅僅只是短暫性的故障。但是,叢集為了正常運轉,必須選出新的controller。如果之前被取代的controller又正常了,他並不知道自己已經被取代了,那麼此時叢集中會出現兩臺controller。

其實這種情況是很容易發生。比如,某個controller由於GC而被認為已經掛掉,並選擇了一個新的controller。在GC的情況下,在最初的controller眼中,並沒有改變任何東西,該Broker甚至不知道它已經暫停了。因此,它將繼續充當當前controller,這是分散式系統中的常見情況,稱為腦裂。

假如,處於活躍狀態的controller進入了長時間的GC暫停。它的ZooKeeper會話過期了,之前註冊的/controller節點被刪除。叢集中其他Broker會收到zookeeper的這一通知。

由於叢集中必須存在一個controller Broker,所以現在每個Broker都試圖嘗試成為新的controller。假設Broker 2速度比較快,成為了最新的controller Broker。此時,每個Broker會收到Broker2成為新的controller的通知,由於Broker3正在進行”stop the world”的GC,可能不會收到Broker2成為最新的controller的通知。

等到Broker3的GC完成之後,仍會認為自己是叢集的controller,在Broker3的眼中好像什麼都沒有發生一樣。

現在,叢集中出現了兩個controller,它們可能一起發出具有衝突的命令,就會出現腦裂的現象。如果對這種情況不加以處理,可能會導致嚴重的不一致。所以需要一種方法來區分誰是叢集當前最新的Controller。

Kafka是通過使用epoch number(紀元編號,也稱為隔離令牌)來完成的。epoch number只是單調遞增的數字,第一次選出Controller時,epoch number值為1,如果再次選出新的Controller,則epoch number將為2,依次單調遞增。

每個新選出的controller通過Zookeeper 的條件遞增操作獲得一個全新的、數值更大的epoch number 。其他Broker 在知道當前epoch number 後,如果收到由controller發出的包含較舊(較小)epoch number的訊息,就會忽略它們,即Broker根據最大的epoch number來區分當前最新的controller。

上圖,Broker3向Broker1發出命令:讓Broker1上的某個分割槽副本成為leader,該訊息的epoch number值為1。於此同時,Broker2也向Broker1傳送了相同的命令,不同的是,該訊息的epoch number值為2,此時Broker1只聽從Broker2的命令(由於其epoch number較大),會忽略Broker3的命令,從而避免腦裂的發生。

參考:

一文弄懂Kafka基礎理論

直擊Kafka的心臟——控制器

Kafka學習筆記(4)----Kafka的Leader Election

KafkaLeader選舉時機和選舉策略

Kafka的Controller Broker是什麼