1. 程式人生 > 其它 >Kafka原始碼分析11:PartitionStateMachine分割槽狀態機(圖解+秒懂+史上最全)

Kafka原始碼分析11:PartitionStateMachine分割槽狀態機(圖解+秒懂+史上最全)

文章很長,建議收藏起來,慢慢讀! Java 高併發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分散式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分散式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鐘看懂, Java NIO 底層原理
7:
TCP/IP(圖解+秒懂+史上最全)
8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
11: 分散式事務( 圖解 + 史上最全 + 吐血推薦 ) 12:seata AT模式實戰(圖解+秒懂+史上最全)
13:seata 原始碼解讀(圖解+秒懂+史上最全) 14:seata TCC模式實戰(圖解+秒懂+史上最全)

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫著走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分散式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多執行緒面試題(史上最全) 30、HR面經,過五關斬六將後,小心陰溝翻船!
9.網路協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄

背景:

下一個視訊版本,從架構師視角,尼恩為大家打造史上最強kafka原始碼視訊

並且,進一步,帶大家實現一個超高質量的專案實操:10WQPS超高併發訊息佇列架構與實操

整體的次序:

  • 首先,開始Kafka原始碼分析
  • 然後,10WQPS超高併發訊息佇列架構與實操

此文為Kafka原始碼分析之11.

本系列部落格的具體內容,請參見 Java 高併發 發燒友社群:瘋狂創客圈

Kafka原始碼分析11:PartitionStateMachine分割槽狀態機

kafka分割槽機制

分割槽機制是kafka實現高吞吐的祕密武器,但這個武器用得不好的話也容易出問題,今天主要就來介紹分割槽的機制以及相關的部分配置。

首先,從資料組織形式來說,kafka有三層形式,kafka有多個主題,每個主題有多個分割槽,每個分割槽又有多條訊息。

而每個分割槽可以分佈到不同的機器上,這樣一來,從服務端來說,分割槽可以實現高伸縮性,以及負載均衡,動態調節的能力。

下圖是一個3個分割槽的topic例子,並且每個分割槽有3個副本:

我們可以通過replication-factor指定建立topic時候所建立的分割槽數。

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

比如這裡就是建立了1個分割槽,的主題。

值得注意的是,還有一種建立主題的方法,是使用zookeeper引數的,那種是比較舊的建立方法,這裡是使用bootstrap引數的。

PartitionStateMachine:分割槽狀態機

PartitionStateMachine 負責管理 Kafka 分割槽狀態的轉換,和 ReplicaStateMachine 是一 脈相承的。

二者的對比

  • ReplicaStateMachine:

負責定義 Kafka 副本狀態、合法的狀態轉換,以及管理狀態之間的轉換。

  • PartitionStateMachine:

負責定義 Kafka 分割槽狀態、合法的狀態轉換,以及管理狀態之間的轉換。

分割槽狀態機相關的類設計

從程式碼結構、實現功能和設計原理來看,二者都極為相似。

  • PartitionStateMachine:分割槽狀態機抽象類

它定義了諸如 startup、shutdown 這 樣的公共方法,定義了處理分割槽狀態轉換入口方法 handleStateChanges ,另外,定義了一個私有的 doHandleStateChanges方法,實現分割槽狀態轉換的操作。

  • PartitionState 介面及其實現物件:
    定義 4 類分割槽狀態,分別是 NewPartition、 OnlinePartition、OfflinePartition 和 NonExistentPartition。除此之外,還定義了它 們之間的依賴關係。

  • PartitionLeaderElectionStrategy 介面及其實現物件:

定義 4 類分割槽 Leader 選舉策 略,對應到 Leader 選舉的 4 種場景。

  • PartitionLeaderElectionAlgorithms:分割槽 Leader 選舉的演算法實現。
    4 類分割槽 Leader 選舉策 略的實現程式碼,PartitionLeaderElectionAlgorithms 就提供了 這 4 類選舉策略的實現程式碼。

分割槽狀態機的啟用

每個 Broker 啟動時,都會建立對應的分割槽狀態機和副本狀態機例項,但只有 Controller 所在的 Broker 才會啟動它們。

如果 Controller 變更到其他 Broker,老 Controller 所在的 Broker 要呼叫這些狀態機的 shutdown 方法關閉它們,新 Controller 所在的 Broker 呼叫狀態機的 startup 方法啟動它們。

分割槽狀態

PartitionState 介面

PartitionState 介面及其實現類,用來定義分割槽狀態。

sealed trait PartitionState {
  def state: Byte // 狀態序號
  def validPreviousStates: Set[PartitionState] // 合法前置狀態集合
}

和 ReplicaState 類似,PartitionState 定義了分割槽的狀態空間以及依賴規則。

OnlinePartition 狀態

下面以 OnlinePartition 狀態為例,說明下程式碼是如何實現流轉的:


case object OnlinePartition extends PartitionState {
  val state: Byte = 1
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

如程式碼所示,每個 PartitionState 都定義了名為 validPreviousStates 的集合,也就是每個狀態對應的合法前置狀態集。

對於 OnlinePartition 而言,它的合法前置狀態集包括 NewPartition、OnlinePartition 和 OfflinePartition。

在 Kafka 中,從合法狀態集以外的狀態向目標狀態進行轉換,將被視為非法操作。

Kafka 的 4 類分割槽狀態

Kafka 為分割槽定義了 4 類狀態,分別是 NewPartition、OnlinePartition、OfflinePartition 和 NonExistentPartition。

1. NewPartition:

分割槽被建立後被設定成這個狀態,表明它是一個全新的分割槽物件。

處於 這個狀態的分割槽,被 Kafka 認為是“未初始化”,因此,不能選舉 Leader。

case object NewPartition extends PartitionState {
  val state: Byte = 0
  val validPreviousStates: Set[PartitionState] = Set(NonExistentPartition)
}

2. OnlinePartition:

分割槽正式提供服務時所處的狀態。


case object OnlinePartition extends PartitionState {
  val state: Byte = 1
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

3. OfflinePartition:

分割槽下線後所處的狀態。


case object OfflinePartition extends PartitionState {
  val state: Byte = 2
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

4. NonExistentPartition:

分割槽被刪除,並且從分割槽狀態機移除後所處的狀態。


case object NonExistentPartition extends PartitionState {
  val state: Byte = 3
  val validPreviousStates: Set[PartitionState] = Set(OfflinePartition)
}

分割槽狀態之間的轉換關係

處理分割槽狀態轉換的方法

handleStateChanges

handleStateChanges 把 partitions 的狀態設定為 targetState。

handleStateChanges 呼叫doHandleStateChanges方法執行真正的狀態變更邏輯,在這個方法中,可能需要用 為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 資訊返回。

def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                     partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
if (partitions.nonEmpty) {
  try {
    // raise error if the previous batch is not empty
    //為了提高KafkaController Leader和叢集其他broker的通訊效率,實現批量傳送請求的功能
    // 檢查上一批請求KafkaController請求,如果沒有傳送完成,就報錯
    controllerBrokerRequestBatch.newBatch()
    // 呼叫doHandleStateChanges方法執行真正的狀態變更邏輯
    doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
    // Controller給相關Broker傳送請求通知狀態變化
    controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
  } catch {
    case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
  }
}
}

三個引數說明:

  • partitions 是待執行狀態變更的目標分割槽列表
  • targetState 是目標狀態
  • partitionLeaderElectionStrategyOpt 是一個可選項,如果傳入了,就表示要執行 Leader 選舉。

doHandleStateChanges

doHandleStateChanges方法執行真正的狀態變更邏輯。

在這個方法中,可能需要用 為 partitions 選舉新的 Leader,最終將 partitions 的 Leader 資訊返回

private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                        partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
 val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
 // 檢查分割槽的狀態,如果沒有,分割槽的狀態設定為NonExistentPartition
 partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
 // 找出要執行非法狀態轉換的分割槽,記錄錯誤日誌
 val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
 invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
 // 根據targetState進入到不同的case分支
 targetState match {
   case NewPartition =>
   ....  

   //4大分支
    
 }  

首先,這個方法會做狀態檢查工作。

檢查分割槽的狀態,如果沒有,分割槽的狀態設定為NonExistentPartition

接著,檢查哪些分割槽執行的狀態轉換不合法,如果當前的狀態不屬於targetState的前置依賴,則為不合法。然後為這些分割槽記錄相應的錯誤日誌。

然後,就是重點和核心。

根據targetState進入到 case 分支。由於分割槽狀態只有 4 個,其中,只有 OnlinePartition 這一路的分支邏輯相對複雜,其他 3 路僅僅是將分割槽狀態設定成目標狀態而已。

先看簡單的3路。

targetState match {
 case NewPartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
       s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
     partitionState.put(partition, NewPartition)
   }
 case OnlinePartition =>
   ....
   }
 case OfflinePartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
     partitionState.put(partition, OfflinePartition)
   }
 case NonExistentPartition =>
   validPartitions.foreach { partition =>
     stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
     partitionState.put(partition, NonExistentPartition)
   }
}

接下來,是負責的那一路,目標狀態是 OnlinePartition 的分支。

圖解:目標狀態是 OnlinePartition 的分支

流程圖如下:

程式碼如下:

case OnlinePartition =>
  // 獲取未初始化的NewPartition狀態下的所有分割槽
  val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
  // 獲取具備Leader選舉資格的分割槽列表
 // 只能為OnlinePartition和OfflinePartition狀態的分割槽選舉Leader
  val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)

  // 處理所有的未初始化的NewPartition狀態下的所有分割槽
  if (uninitializedPartitions.nonEmpty) {
    // 初始化NewPartition狀態分割槽,在ZooKeeper中寫入Leader和ISR資料
    // Initialize leader and isr partition state in zookeeper
    val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
    successfulInitializations.foreach { partition =>
      stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      partitionState.put(partition, OnlinePartition)
    }
  }

  // 處理所有的獲取具備Leader選舉資格的分割槽列表
  if (partitionsToElectLeader.nonEmpty) {
    // 為具備Leader選舉資格的分割槽推選Leader
    val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
    // 將成功選舉Leader後的分割槽設定成OnlinePartition狀態
    successfulElections.foreach { partition =>
      stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      partitionState.put(partition, OnlinePartition)
    }
  }

圖解:initializeLeaderAndIsrForPartitions初始化分割槽

處理所有的未初始化的NewPartition狀態下的所有分割槽,需要在 ZooKeeper 中,建立並寫入分割槽的znode節點資料。

znode節點的位置是/brokers/topics/partitions/,每個節點都要包含分割槽的 Leader 和 ISR 等資料。

ZK中partition狀態資訊

/brokers/topics/[topic]/partitions/[0...N] 其中[0..N]表示partition索引號

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:

{
"controller_epoch": 表示kafka叢集中的中央控制器選舉次數,
"leader": 表示該partition選舉leader的brokerId,
"version": 版本編號預設為1,
"leader_epoch": 該partition leader選舉次數,
"isr": [同步副本組brokerId列表]
}

Example:

{
"controller_epoch": 1,
"leader": 2,
"version": 1,
"leader_epoch": 0,
"isr": [2, 1]
}

分割槽的Leader 和 ISR 的確定規則是:選擇存活副本列表的第一個副本作為 Leader;選擇存活副本列表作為 ISR。

具體的程式碼如下:

private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
   val successfulInitializations = mutable.Buffer.empty[TopicPartition]

   // 獲取每個分割槽的副本列表
   val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))

   // 獲取每個分割槽的所有存活副本
   val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
       val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
       partition -> liveReplicasForPartition
   }

   // 按照有無存活副本對分割槽進行分組:有活副本的分割槽、無活副本的分割槽
   val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }

   partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>
     val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
       s"partition $partition from New to Online, assigned replicas are " +
       s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
       "replica is alive."
     logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
   }

   // 為"有活副本的分割槽"確定Leader和ISR
   // Leader確認依據:存活副本列表的首個副本被認定為Leader
   // ISR確認依據:存活副本列表被認定為ISR
   val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
     val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
     partition -> leaderIsrAndControllerEpoch
   }.toMap
   val createResponses = try {
     zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
   } catch {
     case e: Exception =>
       partitionsWithLiveReplicas.foreach { case (partition,_) => logFailedStateChange(partition, partitionState(partition), NewPartition, e) }
       Seq.empty
   }
   createResponses.foreach { createResponse =>
     val code = createResponse.resultCode
     val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
     val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
     if (code == Code.OK) {
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
       controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
         partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
       successfulInitializations += partition
     } else {
       logFailedStateChange(partition, NewPartition, OnlinePartition, code)
     }
   }
   successfulInitializations
 }

分割槽 Leader 選舉的場景及方法

分割槽online狀態和分割槽leader選舉有關,這裡說說 分割槽 Leader 選舉的場景及方法。

在kafka中,每個主題可以有多個分割槽,每個分割槽又可以有多個副本。這多個副本中,只有一個是leader,而其他的都是follower副本。

注意:每個分割槽都必須選舉出 Leader副本 才能正常提供服務,沒有leader副本,分割槽無法提供服務。

總之,在kafka的叢集中,會存在著多個主題topic,在每個topic中,又被劃分為多個partition,為了防止資料不丟失,每個partition又有多個副本。

kafka主要的三種副本角色

kafka主要的三種副本角色:

  • 首領副本(leader):也就是leader主副本,每一個分割槽都有一個首領副本,為了保證資料一致性,全部的生產者與消費者的請求都會通過該副原本處理。
  • 跟隨者副本(follower):除了首領副本外的其餘全部副本都是跟隨者副本,跟隨者副本不處理來自客戶端的任何請求,只負責從首領副本同步資料,保證與首領保持一致。若是首領副本發生崩潰,就會從這其中選舉出一個leader。
  • 首選首領副本:建立分割槽時指定的首選首領。若是不指定,則為分割槽的第一個副本。

follower須要從leader中同步資料,可是因為網路或者其餘緣由,致使資料阻塞,出現不一致的狀況,為了不這種狀況,follower會向leader傳送請求資訊,這些請求資訊中包含了follower須要資料的偏移量offset,並且這些offset是有序的。

若是有follower向leader傳送了請求1,接著傳送請求2,請求3,那麼再發送請求4,這時就意味著follower已經同步了前三條資料,不然不會發送請求4。leader經過跟蹤 每個follower的offset來判斷它們的複製進度。

預設的,若是follower與leader之間超過10s內沒有傳送請求,或者說沒有收到請求資料,此時該follower就會被認為“不一樣步副本”, 而持續請求的副本就是“同步副本”。

當leader發生故障時,只有“同步副本”才能夠被選舉為leader。其中的請求超時時間能夠經過引數replica.lag.time.max.ms引數來配置。

負載均衡的最佳目標:

每一個分割槽的leader能夠分佈到不一樣的broker中,儘量的達到最最佳的負載均衡效果。

因此會有一個首選首領,若是咱們設定引數auto.leader.rebalance.enable為true,那麼它會檢查首選首領是不是真正的首領,若是不是,則會觸發選舉,讓首選首領成為首領。

囉嗦:Replica副本的幾個術語

1、assignments

這是分割槽的副本列表。
該列表有個專屬的名稱,叫 Assigned Replicas,簡稱 AR。當我們 建立主題之後,使用 kafka-topics 指令碼檢視主題時,應該可以看到名為 Replicas 的一列數 據。這列資料顯示的,就是主題下每個分割槽的 AR。assignments 引數型別是 Seq[Int]。這 揭示了一個重要的事實:AR 是有順序的,而且不一定和 ISR 的順序相同!

2、isr

ISR 在 Kafka 中很有名氣,它儲存了分割槽所有與 Leader 副本保持同步的副本列表。
注意, Leader 副本自己也在 ISR 中。另外,作為 Seq[Int]型別的變數,isr 自身也是有順序的。

3、liveReplicas

從名字可以推斷出,它儲存了該分割槽下所有處於存活狀態的副本。
怎麼判斷副本是否存活 呢?可以根據 Controller 元資料快取中的資料來判定。簡單來說,所有在執行中的 Broker 上的副本,都被認為是存活的。

4、uncleanLeaderElectionEnabled

在預設配置下,只要不是由 AdminClient 發起的 Leader 選舉,這個引數的值一般是 false,即 Kafka 不允許執行 Unclean Leader 選舉。
所謂的 Unclean Leader 選舉,是指 在 ISR 列表為空的情況下,Kafka 選擇一個非 ISR 副本作為新的 Leader。
由於存在丟失數 據的風險,目前,社群已經通過把 Broker 端引數 unclean.leader.election.enable 的預設 值設定為 false 的方式,禁止 Unclean Leader 選舉了。

程式碼首先會順序搜尋 AR 列表,並把第一個同時滿足以下兩個條件的副本作為新的 Leader 返回:

  1. 該副本是存活狀態,即副本所在的 Broker 依然在執行中;
  2. 該副本在 ISR 列表中。

分割槽 Leader 選舉有 4 類場景

// 分割槽Leader選舉策略介面
sealed trait PartitionLeaderElectionStrategy
// 離線分割槽Leader選舉策略
case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// 分割槽副本重分配Leader選舉策略
case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// 分割槽Preferred副本Leader選舉策略
case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
// Broker Controlled關閉時Leader選舉策略
case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

  1. OfflinePartitionLeaderElectionStrategy:

因為 Leader 副本下線而引發的分割槽 Leader 選舉。

  1. ReassignPartitionLeaderElectionStrategy:

因為執行分割槽副本重分配操作而引發的分 區 Leader 選舉。

  1. PreferredReplicaPartitionLeaderElectionStrategy:

因為執行 Preferred 副本 Leader 選舉而引發的分割槽 Leader 選舉。

  1. ControlledShutdownPartitionLeaderElectionStrategy:

因為正常關閉 Broker 而引發 的分割槽 Leader 選舉。

scala基礎

上面使用到了 case object,補充下scala基礎知識

Scala中class、object、case class、case object區別

/** class、object、case class、case object區別
  *
  * class: 類似Java中的class;
  * object: 類似java的單例物件,Scala不能定義靜態成員,用定義單例物件代之;
  * case class:被稱為樣例類,是一種特殊的類,常被用於模式匹配。
  *
  * 一、class 和 object 關係:
  * 1.單例物件不能帶引數,類可以
  * 2.當物件和類名一樣時,object被稱為伴生物件,class被稱為伴生類;
  * 3.類和伴生物件可以相互訪問其私有屬性,但是它們必須在一個原始檔當中;
  * 4.類只會被編譯,不會被執行。要執行,必須在Object中。
  *
  * 二、case class 與 class 區別:
  * 1.初始化的時候可以不用new,也可以加上,但是普通類必須加new;
  * 2.預設實現了equals、hashCode方法;
  * 3.預設是可以序列化的,實現了Serializable;
  * 4.自動從scala.Product中繼承一些函式;
  * 5.case class 建構函式引數是public的,我們可以直接訪問;
  * 6.case class預設情況下不能修改屬性值;
  * 7.case class最重要的功能,支援模式匹配,這也是定義case class的重要原因。
  *
  * 三、case class 和 case object 區別:
  * 1.類中有參和無參,當類有引數的時候,用case class ,當類沒有引數的時候那麼用case object。
  *
  * 四、當一個類被聲名為case class的時候,scala會幫助我們做下面幾件事情:
  * 1.構造器中的引數如果不被宣告為var的話,它預設的話是val型別的,但一般不推薦將構造器中的引數宣告為var
  * 2.自動建立伴生物件,同時在裡面給我們實現子apply方法,使得我們在使用的時候可以不直接顯示地new物件
  * 3.伴生物件中同樣會幫我們實現unapply方法,從而可以將case class應用於模式匹配,關於unapply方法我們在後面的“提取器”那一節會重點講解
  * 4.實現自己的toString、hashCode、copy、equals方法
  * 除此之此,case class與其它普通的scala類沒有區別
  */

case class Iteblog(name:String)

object TestScala {

  def main(args: Array[String]): Unit = {

    val iteblog = new Iteblog("iteblog_hadoop")

    val iteblog2 = Iteblog("iteblog_hadoop")

    println(iteblog == iteblog2)

    println(iteblog.hashCode)

    println(iteblog2.hashCode)
  }

}

PartitionLeaderElectionAlgorithms

針對這 4 類場景,分割槽狀態機的 PartitionLeaderElectionAlgorithms 物件定義了 4 個方 法,分別負責為每種場景選舉 Leader 副本,這 4 種方法是:

  • offlinePartitionLeaderElection;
  • reassignPartitionLeaderElection;
  • preferredReplicaPartitionLeaderElection;
  • controlledShutdownPartitionLeaderElection。

具體的模式匹配程式碼在doElectLeaderForPartitions方法中如下:

val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
  case OfflinePartitionLeaderElectionStrategy =>
    // 離線分割槽Leader選舉策略
    leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ReassignPartitionLeaderElectionStrategy =>
    // 分割槽副本重分配Leader選舉策略
    leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case PreferredReplicaPartitionLeaderElectionStrategy =>
    // 分割槽Preferred副本Leader選舉策略
    leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  case ControlledShutdownPartitionLeaderElectionStrategy =>
    // Broker Controlled關閉時Leader選舉策略
    leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
}

參考文獻

https://www.cnblogs.com/boanxin/p/13696136.html

https://www.cnblogs.com/listenfwind/p/12465409.html

https://www.shangmayuan.com/a/5e15939288954d3cb3ad613e.html

https://my.oschina.net/u/3070368/blog/4338739

https://www.cnblogs.com/shimingjie/p/10374451.html

https://www.bbsmax.com/A/VGzlAONYJb/