1. 程式人生 > >深入學習Kafka:Leader Election

深入學習Kafka:Leader Election

本文所講的Leader是指叢集中的Controller,而不是各個Partition的Leader。

為什麼要有Leader?

在Kafka早期版本,對於分割槽和副本的狀態的管理依賴於zookeeper的Watcher和佇列:每一個broker都會在zookeeper註冊Watcher,所以zookeeper就會出現大量的Watcher, 如果宕機的broker上的partition很多比較多,會造成多個Watcher觸發,造成叢集內大規模調整;每一個replica都要去再次zookeeper上註冊監視器,當叢集規模很大的時候,zookeeper負擔很重。這種設計很容易出現腦裂和羊群效應以及zookeeper叢集過載。

新的版本中該變了這種設計,使用KafkaController,只有KafkaController,Leader會向zookeeper上註冊Watcher,其他broker幾乎不用監聽zookeeper的狀態變化。

Kafka叢集中多個broker,有一個會被選舉為controller leader,負責管理整個叢集中分割槽和副本的狀態,比如partition的leader 副本故障,由controller 負責為該partition重新選舉新的leader 副本;當檢測到ISR列表發生變化,有controller通知叢集中所有broker更新其MetadataCache資訊;或者增加某個topic分割槽的時候也會由controller管理分割槽的重新分配工作

Kafka叢集Leader選舉原理

我們知道Zookeeper叢集中也有選舉機制,是通過Paxos演算法,通過不同節點向其他節點發送資訊來投票選舉出leader,但是Kafka的leader的選舉就沒有這麼複雜了。
Kafka的Leader選舉是通過在zookeeper上建立/controller臨時節點來實現leader選舉,並在該節點中寫入當前broker的資訊
{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”}
利用Zookeeper的強一致性特性,一個節點只能被一個客戶端建立成功,建立成功的broker即為leader,即先到先得原則,leader也就是叢集中的controller,負責叢集中所有大小事務。
當leader和zookeeper失去連線時,臨時節點會刪除,而其他broker會監聽該節點的變化,當節點刪除時,其他broker會收到事件通知,重新發起leader選舉。

KafkaController

KafkaController初始化ZookeeperLeaderElector物件,為ZookeeperLeaderElector設定兩個回撥方法,onControllerFailover和onControllerResignation
onControllerFailover在選舉leader成功後會回撥,在onControllerFailover中進行leader依賴的模組初始化,包括向zookeeper上/controller_epoch節點上記錄leader的選舉次數,這個epoch數值在處理分散式腦裂的場景中很有用。
而onControllerResignation在當前broker不再成為leader(即當前leader退位後)時會回撥。
KafkaController在啟動後註冊zookeeper的會話超時監聽器,並嘗試選舉leader。

class KafkaController {
  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId)

  def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      //註冊Session過期監聽器
      registerSessionExpirationListener()
      isRunning = true
      //每次啟動時,嘗試選舉leader
      controllerElector.startup
      info("Controller startup complete")
    }
  }

  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
  }
}

SessionExpirationListener

當broker和zookeeper重新建立連線後,SessionExpirationListener中的handleNewSession會被呼叫,這時先關閉之前的leader相關模組,然後重新嘗試選舉成為leader。

  class SessionExpirationListener() extends IZkStateListener with Logging {
    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
    @throws(classOf[Exception])
    def handleStateChanged(state: KeeperState) {
      // do nothing, since zkclient will do reconnect for us.
    }

    /**
     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
     * any ephemeral nodes here.
     *
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      //和Zookeeper重新建立連線後,此方法會被呼叫
      inLock(controllerContext.controllerLock) {
        //先登出一些已經註冊的監聽器,關閉資源
        onControllerResignation()
        //重新嘗試選舉成controller
        controllerElector.elect
      }
    }

    override def handleSessionEstablishmentError(error: Throwable): Unit = {
      //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
    }
  }

ZookeeperLeaderElector

ZookeeperLeaderElector類實現leader選舉的功能,但是它並不負責處理broker和zookeeper的會話超時(連線超時)的情況,而是認為呼叫者應該在會話恢復(連線重新建立)時進行重新選舉。

class ZookeeperLeaderElector(controllerContext: ControllerContext,
                             electionPath: String,
                             onBecomingLeader: () => Unit,
                             onResigningAsLeader: () => Unit,
                             brokerId: Int)
  extends LeaderElector with Logging {
  var leaderId = -1
  // create the election path in ZK, if one does not exist
  val index = electionPath.lastIndexOf("/")
  if (index > 0)
    controllerContext.zkUtils.makeSurePersistentPathExists(electionPath.substring(0, index))
  val leaderChangeListener = new LeaderChangeListener

  def startup {
    inLock(controllerContext.controllerLock) {
      // 新增/controller節點的IZkDataListener監聽器
      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      // 選舉
      elect
    }
  }
}

ZookeeperLeaderElector的startup方法中呼叫elect方法選舉leader

有下面幾種情況會呼叫elect方法
1. broker啟動時,第一次呼叫
2. 上一次建立節點成功,但是可能在等Zookeeper響應的時候,連線中斷,resign方法中刪除/controller節點後,觸發了leaderChangeListener的handleDataDeleted
3. 上一次建立節點未成功,但是可能在等Zookeeper響應的時候,連線中斷,而再次進入elect方法時,已有別的broker建立controller節點成功,成為了leader
4. 上一次建立節點成功,但是onBecomingLeader丟擲了異常,而再次進入
所以elect方法中先獲取/controller節點資訊,判斷是否已經存在,然後再嘗試選舉leader

  private def getControllerID(): Int = {
    controllerContext.zkUtils.readDataMaybeNull(electionPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => -1
    }
  }

  def elect: Boolean = {
    val timestamp = SystemTime.milliseconds.toString
    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

    //先嚐試獲取/controller節點資訊
   leaderId = getControllerID 
    /* 
     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
     * it's possible that the controller has already been elected when we get here. This check will prevent the following 
     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
     */
    // 有下面幾種情況會呼叫elect方法
    // 1.broker啟動時,第一次呼叫
    // 2.上一次建立節點成功,但是可能在等Zookeeper響應的時候,連線中斷,resign方法中刪除/controller節點後,觸發了leaderChangeListener的handleDataDeleted
    // 3.上一次建立節點未成功,但是可能在等Zookeeper響應的時候,連線中斷,而再次進入elect方法時,已有別的broker建立controller節點成功,成為了leader
    // 4.上一次建立節點成功,但是onBecomingLeader丟擲了異常,而再次進入
    // 所以先獲取節點資訊,判斷是否已經存在
    if(leaderId != -1) {
       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
       return amILeader
    }

    try {
      val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                                      electString,
                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
                                                      JaasUtils.isZkSecurityEnabled())
      //建立/controller節點,並寫入controller資訊,brokerid, version, timestamp
      zkCheckedEphemeral.create()
      info(brokerId + " successfully elected as leader")
      leaderId = brokerId
      //寫入成功,成為Leader,回撥
      onBecomingLeader()
    } catch {
      case e: ZkNodeExistsException =>
        // If someone else has written the path, then
        leaderId = getControllerID 
        //寫入失敗,節點已經存在,說明已有其他broker建立成功
        if (leaderId != -1)
          debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
        else
          warn("A leader has been elected but just resigned, this will result in another round of election")

      case e2: Throwable =>
        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
        //這裡有可能是建立節點時,和zookeeper斷開了連線,也有可能是onBecomingLeader的回撥方法裡出了異常
        //onBecomingLeader方法裡,一般是初始化leader的相關的模組,如果初始化失敗,則呼叫resign方法先刪除/controller節點
        //當/controller節點被刪除時,會觸發leaderChangeListener的handleDataDeleted,會重新嘗試選舉成Leader,更重要的是也讓其他broker有機會成為leader,避免某一個broker的onBecomingLeader一直失敗造成整個叢集一直處於“群龍無首”的尷尬局面
        resign()
    }
    amILeader
  }

  def close = {
    leaderId = -1
  }

  def amILeader : Boolean = leaderId == brokerId

  def resign() = {
    leaderId = -1
    // 刪除/controller節點
    controllerContext.zkUtils.deletePath(electionPath)
  }

在建立/controller節點時,若收到的異常是ZkNodeExistsException,則說明其他broker已經成為了leader。
而若是onBecomingLeader的回撥方法裡出了異常,一般是初始化leader的相關的模組出了問題,如果初始化失敗,則呼叫resign方法先刪除/controller節點。
當/controller節點被刪除時,會觸發leaderChangeListener的handleDataDeleted,會重新嘗試選舉成Leader。
更重要的是也讓其他broker有機會成為leader,避免某一個broker的onBecomingLeader一直失敗造成整個叢集一直處於“群龍無首”的尷尬局面。

LeaderChangeListener

在startup方法中,註冊了/controller節點的IZkDataListener監聽器即LeaderChangeListener。
若節點資料有變化時,則有可能別的broker成為了leader,則呼叫onResigningAsLeader方法。
若節點被刪除,則是leader已經出了故障下線了,如果當前broker之前是leader,則呼叫onResigningAsLeader方法,然後重新嘗試選舉成為leader。

  class LeaderChangeListener extends IZkDataListener with Logging {
    /**
     * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleDataChange(dataPath: String, data: Object) {
      inLock(controllerContext.controllerLock) {
        val amILeaderBeforeDataChange = amILeader
        leaderId = KafkaController.parseControllerId(data.toString)
        info("New leader is %d".format(leaderId))
        // The old leader needs to resign leadership if it is no longer the leader
        if (amILeaderBeforeDataChange && !amILeader)
          //如果之前是Leader,而現在不是Leader
          onResigningAsLeader()
      }
    }

    /**
     * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
     * @throws Exception
     *             On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
      inLock(controllerContext.controllerLock) {
        debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
          .format(brokerId, dataPath))
        if(amILeader)
          //如果之前是Leader
          onResigningAsLeader()
        //重新嘗試選舉成Leader
        elect
      }
    }
  }

onBecomingLeader方法對應KafkaController裡的onControllerFailover方法,當成為新的leader後,要初始化leader所依賴的功能模組
onResigningAsLeader方法對應KafkaController裡的onControllerResignation方法,當leader退位後,要關閉leader所依賴的功能模組

Leader選舉流程圖

整個leader選舉的過程的流程圖為
Kafka Leader選舉流程圖

相關推薦

深入學習KafkaLeader Election

本文所講的Leader是指叢集中的Controller,而不是各個Partition的Leader。 為什麼要有Leader? 在Kafka早期版本,對於分割槽和副本的狀態的管理依賴於zookeeper的Watcher和佇列:每一個broker都會在

深入學習KafkaTopic的刪除過程分析

要刪除Topic,需要執行下面命令: .\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test 這裡假設zookeeper地址為localhost,要刪除的topic是te

深入學習Kafka叢集中Controller和Broker之間通訊機制分析

Kafka叢集中,首先會選舉出一個broker作為controller,然後該controller負責跟其他broker進行協調topic建立,partition主副本選舉,topic刪除等事務。 下面我們來分析controller和其他broker的通訊

深入學習KafkaPartitionLeaderSelector原始碼分析

PartitionLeaderSelector主要是為分割槽選舉出leader broker,該trait只定義了一個方法selectLeader,接收一個TopicAndPartition物件和一個LeaderAndIsr物件。 TopicAndPart

Zookeeper 學習筆記之 Leader Election

通知 客戶 就會 lec 搶占式 類型 二次 lead per ZooKeeper四種節點類型: Persist Persist_Sequential Ephemeral Ephemeral_Sequential 在節點上可註冊的Watch,客戶端先得到通知再得到數據,

caffe原始碼深入學習6超級詳細的im2col繪圖解析,分析caffe卷積操作的底層實現

在先前的兩篇部落格中,筆者詳細解析了caffe卷積層的定義與實現,可是在conv_layer.cpp與base_conv_layer.cpp中,卷積操作的實現仍然被隱藏,通過im2col_cpu函式和caffe_cpu_gemm函式(後者實現矩陣乘法)實現,在此篇部落格中,筆者旨在向大家展示,caf

影象顯示深入學習視窗機制分析

上一篇文章影象顯示深入學習一:Activity啟動過程中介紹了啟動一個Activity在整個Android系統中執行的流程,其中可以看到Window的建立,這篇文章就整理一下Window機制的實現過程吧。 吐個槽,今年大部分時間在公司一直在做SDK專案,UI方面的工作涉及的比較少,現在重

kafkaleader選舉

Kafka叢集Leader選舉原理 我們知道Zookeeper叢集中也有選舉機制,是通過Paxos演算法,通過不同節點向其他節點發送資訊來投票選舉出leader,但是Kafka的leader的選舉就沒有這麼複雜了。  Kafka的Leader選舉是通過在zookeeper上

深入學習理論VC維(VC dimensions)

上面這個式子就是模型的評估與選擇這篇文章中提到的泛化誤差上界。(喜極而泣,大費周章,終於把這個坑給填了)vc維在這裡面起到了一個懲罰項的作用,它所表徵的是模型的複雜程度,當模型越複雜的時候,vc維越大,泛化能力就越差;當模型越簡單的時候,vc維越小,經驗損失函式和期望損失函式越接近,泛化能力越好。

caffe原始碼深入學習5超級詳細的caffe卷積層程式碼解析

   在本篇部落格中,筆者為大家解析一下caffe卷積層的原始碼,在開篇提醒各位讀者朋友,由於caffe卷積層實現較為複雜,引數相對較多,因此,讀者朋友們如果發現筆者的部落格中的疏漏或者錯誤之處,請大家不吝賜教,筆者在此表示衷心的感謝。    在解析程式碼前,首先要強調一下

Hibernate深入學習(三)繼承與多型查詢,joined-subclass與union-subclass

在上一篇中,我們對hibernate中3種繼承有了初步瞭解,並對subclass進行了測試,以及知道了它的諸多缺點,這些缺點導致subclass在開發中並不常用,接下來我們看看剩下的兩種繼承方式:joined-subclass與union-subclass 本

【Spark深入學習 -15】Spark Streaming前奏-Kafka初體驗

rod htm 新的 callback tails 包括 -c 舉例 清理 ----本節內容------- 1.Kafka基礎概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知識 1.2.2.架構和原理 1.2.

深入Java集合學習系列HashSet的實現原理

是否 abstract arc html 源代碼 cat param body static 0.參考文獻 深入Java集合學習系列:HashSet的實現原理 1.HashSet概述:   HashSet實現Set接口,由哈希表(實際上是一個HashMap實例)支持。它

[深入學習C#]C#實現多線程的方式Task——任務

ren avr 利用 run 如何 創建 其中 continue rep 簡介   .NET 4包含新名稱空間System.Threading.Tasks,它 包含的類抽象出了線程功能。 在後臺使用ThreadPool。 任務表示應完成的某個單元的工作。 這個單元的工作可以

kafka學習筆記知識點整理

一個 eight true med 分組 pos 間接 fig ges 一、為什麽需要消息系統 1.解耦:  允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。 2.冗余:   消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據

JVM基礎深入學習JVM堆與JVM棧(轉)

面向 對象的引用 進入 信息 分離 字節 address 出現 運行 JVM棧解決程序的運行問題,即程序如何執行,或者說如何處理數據;JVM堆解決的是數據存儲的問題,即數據怎麽放、放在哪兒,另外JVM堆中存的是對象。JVM棧中存的是基本數據類型和JVM堆中對象的引用。 JV

深入學習Redis(2)持久化

kvm 服務器 let vm虛擬機 文件同步 8K bus lena 機制 前言 在上一篇文章中,介紹了Redis的內存模型,從這篇文章開始,將依次介紹Redis高可用相關的知識——持久化、復制(及讀寫分離)、哨兵、以及集群。 本文將先說明上述幾種技術分別解決了Redis高

如何運用zookepper進行kafka Leader Election?

註冊 bsp roo 可能 zookepper 監聽 election 如果 etc 主要有兩種方法: (一)搶註Leader節點-----非公平模式 (二)先到先得,後者監聽前者-----公平模式 (一)搶註Leader節點-----非公平模式 1.創建Leader父節點

深入學習Redis(5)叢集

前言 在前面的文章中,已經介紹了Redis的幾種高可用技術:持久化、主從複製和哨兵,但這些方案仍有不足,其中最主要的問題是儲存能力受單機限制,以及無法實現寫操作的負載均衡。 Redis叢集解決了上述問題,實現了較為完善的高可用方案。本文將詳細介紹叢集,主要內容包括:叢集的作用;叢集的搭建方法

Docker深入學習微服務+Docker

  最近在學docker、k8s什麼的,看得腦子有點亂。從來沒弄過在linux上搭建一個分散式的環境,所以對這些不太瞭解,還是從最簡單的地方剖析吧。 Docker學習傳送:http://www.ityouknow.com/docker/2018/03/07/docker-introduction.html