kafka叢集Controller競選與責任設計思路架構詳解-kafka 商業環境實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。
1 無所不能的Controller
-
某一個broker被選舉出來承擔特殊的角色,就是控制器Controller。
-
Leader會向zookeeper上註冊Watcher,其他broker幾乎不用監聽zookeeper的狀態變化。
-
Controller叢集就是用來管理和協調Kafka叢集的,具體就是管理叢集中所有分割槽的狀態和分割槽對應副本的狀態。
-
每一個Kafka叢集任意時刻都只能有一個controller,當叢集啟動的時候,所有的broker都會參與到controller的競選,最終只能有一個broker勝出。
-
Controller維護的狀態分為兩類:1:管理每一臺Broker上對應的分割槽副本。2:管理每一個Topic分割槽的狀態。
-
KafkaController 核心程式碼,其中包含副本狀態機和分割槽狀態機
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId) // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler(1) var deleteTopicManager: TopicDeletionManager = null val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch(this) private val partitionReassignedListener = new PartitionsReassignedListener(this) private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this) 複製程式碼
-
KafkaController中共定義了五種selector選舉器
1、ReassignedPartitionLeaderSelector 從可用的ISR中選取第一個作為leader,把當前的ISR作為新的ISR,將重分配的副本集合作為接收LeaderAndIsr請求的副本集合。 2、PreferredReplicaPartitionLeaderSelector 如果從assignedReplicas取出的第一個副本就是分割槽leader的話,則丟擲異常,否則將第一個副本設定為分割槽leader。 3、ControlledShutdownLeaderSelector 將ISR中處於關閉狀態的副本從集合中去除掉,返回一個新新的ISR集合,然後選取第一個副本作為leader,然後令當前AR作為接收LeaderAndIsr請求的副本。 4、NoOpLeaderSelector 原則上不做任何事情,返回當前的leader和isr。 5、OfflinePartitionLeaderSelector 從活著的ISR中選擇一個broker作為leader,如果ISR中沒有活著的副本,則從assignedReplicas中選擇一個副本作為leader,leader選舉成功後註冊到Zookeeper中,並更新所有的快取。 複製程式碼
-
kafka修改分割槽和副本數
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1 Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs: Topic: test1 Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4 Topic: test1 Partition: 1 Leader: 3 Replicas: 3,5 Isr: 3,5 Topic: test1 Partition: 2 Leader: 4 Replicas: 4,1 Isr: 4,1 複製程式碼
-
topic 分割槽擴容
./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1 複製程式碼
2 ReplicaStateMachine (ZK持久化副本分配方案)
-
Replica有7種狀態:
1 NewReplica: 在partition reassignment期間KafkaController建立New replica 2 OnlineReplica: 當一個replica變為一個parition的assingned replicas時 其狀態變為OnlineReplica, 即一個有效的OnlineReplica 3 Online狀態的parition才能轉變為leader或isr中的一員 4 OfflineReplica: 當一個broker down時, 上面的replica也隨之die, 其狀態轉變為Onffline; ReplicaDeletionStarted: 當一個replica的刪除操作開始時,其狀態轉變為ReplicaDeletionStarted 5 ReplicaDeletionSuccessful: Replica成功刪除後,其狀態轉變為ReplicaDeletionSuccessful 6 ReplicaDeletionIneligible: Replica成功失敗後,其狀態轉變為ReplicaDeletionIneligible 7 NonExistentReplica: Replica成功刪除後, 從ReplicaDeletionSuccessful狀態轉變為NonExistentReplica狀態 複製程式碼
-
ReplicaStateMachine 所在檔案: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
-
startup: 啟動ReplicaStateMachine
-
initializeReplicaState: 初始化每個replica的狀態, 如果replica所在的broker是live狀態,則此replica的狀態為OnlineReplica。
-
處理可以轉換到Online狀態的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 並且傳送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
-
當建立某個topic時,該topic下所有分割槽的所有副本都是NonExistent。
-
當controller載入Zookeeper中該topic每一個分割槽的所有副本資訊到記憶體中,同時將副本的狀態變更為New。
-
之後controller選擇該分割槽副本列表中的第一個副本作為分割槽的leader副本並設定所有副本進入ISR,然後在Zookeeper中持久化該決定。
-
一旦確定了分割槽的Leader和ISR之後,controller會將這些訊息以請求的方式傳送給所有的副本。
-
同時將這些副本狀態同步到叢集的所有broker上以便讓他們知曉。
-
最後controller 會把分割槽的所有副本狀態設定為Online。
3 partitionStateMachine (根據副本分配方案建立分割槽)
-
Partition有如下四種狀態
NonExistentPartition: 這個partition還沒有被建立或者是建立後又被刪除了; NewPartition: 這個parition已建立, replicas也已分配好,但leader/isr還未就緒; OnlinePartition: 這個partition的leader選好; OfflinePartition: 這個partition的leader掛了,這個parition狀態為OfflinePartition; 複製程式碼
-
當建立Topic時,controller負責建立分割槽物件,它首先會短暫的將所有分割槽狀態設定為NonExistent。
-
之後讀取Zookeeper副本分配方案,然後令分割槽狀態設定為NewPartion。
-
處於NewPartion狀態的分割槽尚未有leader和ISR,因此Controller會初始化leader和ISR資訊並設定分割槽狀態為OnlinePartion,此時分割槽正常工作。
-
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。
4 Controller職責所在(監聽znode狀態變化做執行)
- UpdateMetadataRequest:更新元資料請求(比如:topic有多少個分割槽,每一個分割槽的leader在哪一臺broker上以及分割槽的副本列表),隨著叢集的執行,這部分資訊隨時都可能變更,一旦發生變更,controller會將最新的元資料廣播給所有存活的broker。具體方式就是給所有broker傳送UpdateMetadataRequest請求
- CreateTopics: 建立topic請求。當前不管是通過API方式、指令碼方式(--create)抑或是CreateTopics請求方式來建立topic,做法幾乎都是在Zookeeper的/brokers/topics下建立znode來觸發建立邏輯,而controller會監聽該path下的變更來執行真正的“建立topic”邏輯
- DeleteTopics:刪除topic請求。和CreateTopics類似,也是通過建立Zookeeper下的/admin/delete_topics/節點來觸發刪除topic,主要邏輯有:1:停止所有副本執行。2:刪除所有副本的日誌資料。3:移除zk上的 /admin/delete_topics/節點。
- 分割槽重分配:即kafka-reassign-partitions指令碼做的事情。同樣是與Zookeeper結合使用,指令碼寫入/admin/reassign_partitions節點來觸發,controller負責按照方案分配分割槽。執行過程是:先擴充套件再伸縮機制(就副本和新副本集合同時存在)。
- Preferred leader分配:調整分割槽leader副本,preferred leader選舉當前有兩種觸發方式:1. 自動觸發(auto.leader.rebalance.enable = true),controller會自動調整Preferred leader。2. kafka-preferred-replica-election指令碼觸發。兩者步驟相同,都是向Zookeeper的/admin/preferred_replica_election寫資料,controller提取資料執行preferred leader分配
- 分割槽擴充套件:即增加topic分割槽數。標準做法也是通過kafka-reassign-partitions指令碼完成,不過使用者可直接往Zookeeper中寫資料來實現,比如直接把新增分割槽的副本集合寫入到/brokers/topics/下,然後controller會為你自動地選出leader並增加分割槽
- 叢集擴充套件:新增broker時Zookeeper中/brokers/ids下會新增znode,controller自動完成服務發現的工作
- broker崩潰:同樣地,controller通過Zookeeper可實時偵測broker狀態。一旦有broker掛掉了,controller可立即感知併為受影響分割槽選舉新的leader
- ControlledShutdown:broker除了崩潰,還能“優雅”地退出。broker一旦自行終止,controller會接收到一個ControlledShudownRequest請求,然後controller會妥善處理該請求並執行各種收尾工作
- Controller leader選舉:controller必然要提供自己的leader選舉以防這個全域性唯一的元件崩潰宕機導致服務中斷。這個功能也是通過Zookeeper的幫助實現的。
5 Controller與Broker之間的通訊機制(NIO select)
- controller啟動時會為叢集中的所有Broker建立一個專屬的Socket連線,加入有100臺broker機器,那麼controller會建立100個Socket連線。新版本目前統一使用NIO select ,實際上還是要維護100個執行緒。
6 ControllerContext資料元件
- controller的快取,可謂是最重要的資料元件了,ControllerContext彙總了Zookeeper中關於kafka叢集中所有元資料資訊,是controller能夠正確提供服務的基礎。
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。
7 總結
kafka叢集Controller主要幹通過ZK持久化副本分配方案,根據副本分配方案建立分割槽,監聽ZK znode狀態變化做執行處理,維護分割槽和副本ISR機制穩定執行。感謝huxihx技術部落格以及相關書籍,讓我理解了Controller核心機制,寫一篇學習筆記,作為總結,辛苦成文,實屬不易,謝謝。
秦凱新 於深圳 201812021541