1. 程式人生 > >kafka rebalance 原始碼分析

kafka rebalance 原始碼分析

目錄

kafka hello world 

一. kafka 架構:

Partition儲存結構

ACK前需要保證有多少個備份

二  kafka partition 分配原理探究

三 rebalance 過程

四:mafka 優化

PUSH SERVER

重試

兩種

MafkaParallarWork

kafka hello world 

    kafka 安裝 & 常用API  kafka安裝  &  Kafka java api 0.10.1.1

一. kafka 架構:

Partition儲存結構

 

 

 

ACK前需要保證有多少個備份

和大部分分散式系統一樣,Kafka處理失敗需要明確定義一個Broker是否“活著”。對於Kafka而言,Kafka存活包含兩個條件,一是它必須維護與ZooKeeper的session(這個通過ZooKeeper的Heartbeat機制來實現)。二是Follower必須能夠及時將Leader的訊息複製過來,不能“落後太多”。

 

二  kafka partition 分配原理探究

kafka的官方文件提供了這樣一段描述

In fact, the only metadata retained on a per-consumer basis is the offset or position of that consumer in the log. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads records, but, in fact, since the position is controlled by the consumer it can consume records in any order it likes. For example a consumer can reset to an older offset to reprocess data from the past or skip ahead to the most recent record and start consuming from "now".

kafka不同於其他mq,由於他基於硬碟的儲存,所以kafka不會刪除消費過的資料,所以consumer可以從指定的offset讀取資料,針對這個特性做了以下實驗。

生產者程式碼略,消費者程式碼如下

效果如下:

這裡面有兩個問題,

offset是否跨partiton ?

partiton與consumer如何指定 ?

首先第一個問題,kafka的最小物理單位是partition,所以offset是記錄在partition中的(segment index中),那麼partition是跨機器的,kafka有沒有通過zk將partiton的節點統一管理呢,從以上的實驗來看,kafka的不同partition是有可能出現相同offset的,所以可見offset的是partiton內管理的,並沒有在manager中統一管理的。所以我們再指定offset的時候要同時指定partiton。

那麼第二個問題,既然offset是以partiton作為單位儲存的,那麼當一個consumer監聽多個partiton的時候,consumer如何知道自己該去哪個partiton拉資料呢?(因為consumer是poll方式,所以猜測是輪訓)

在kafka的ZookeeperConsumerConnector中發現這樣一段程式碼

程式碼塊

scala

private def rebalance(cluster: Cluster): Boolean = {
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(
    group, consumerIdString, zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  val brokers = zkUtils.getAllBrokersInCluster()
  if (brokers.size == 0) {
     // 1.如果目前沒有可用的brokers ,那麼就先在子節點上註冊監聽事件,當有brokers 啟動的時候進行rebalance。
    warn("no brokers found when trying to rebalance.")
    zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
    true
  }
  else {
    // 2.kafka 的 rebalance 會暫時stop the world ,以防止訊息的重複消費。
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    if (consumerRebalanceListener != null) {
      info("Invoking rebalance listener before relasing partition ownerships.")
      consumerRebalanceListener.beforeReleasingPartitions(
        if (topicRegistry.size == 0)
          new java.util.HashMap[String, java.util.Set[java.lang.Integer]]
        else
          topicRegistry.map(topics =>
            topics._1 -> topics._2.keys   // note this is incorrect, see KAFKA-2284
          ).toMap.asJava.asInstanceOf[java.util.Map[String, java.util.Set[java.lang.Integer]]]
      )
    }
    releasePartitionOwnership(topicRegistry)
    val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkUtils)
    // 3.partition 重分配
    val globalPartitionAssignment = partitionAssignor.assign(assignmentContext)
後續程式碼略

 

 

val globalPartitionAssignment = partitionAssignor.assign(assignmentContext) 發現定義了這樣一個類

繼續跟蹤...

 

val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每個consumer至少保證消費的分割槽數

val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 還剩下多少個分割槽需要單獨分配給開頭的執行緒們

...

for (consumerThreadId <- consumerThreadIdSet) {   // 對於每一個consumer執行緒

        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)  //算出該執行緒在所有執行緒中的位置,介於[0, n-1]

        assert(myConsumerPosition >= 0)

// startPart 就是這個執行緒要消費的起始分割槽數

        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)

// nParts 就是這個執行緒總共要消費多少個分割槽

        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

...

}

 

這裡 kafka 提供兩種分配策略 range和roundrobin,由引數partition.assignment.strategy指定,預設是range策略。本文只討論range策略。所謂的range其實就是按照階段平均分配。

PartitionAssignor object定義了一個工廠方法用於建立不同策略的分割槽分配器,目前Kafka支援兩種再平衡策略(也就是分割槽分配策略):round robin和range。值得注意的是,這裡所說的分割槽策略其實是指指如何將分割槽分配給消費組內的不同consumer例項。

    假設我們有一個topic:T1,T1有10個分割槽,分別是[P0, P9],然後我們有2個consumer,C1和C2。

    下面我們來看看預設的range策略是如何分配分割槽的: 

1. Range策略

 

    對於每一個topic,range策略會首先按照數字順序排序所有可用的分割槽,並按照字典順序列出所有的consumer執行緒。結合我們上面的例子,分割槽順序是0,1,2,3,4,5,6,7,8,9,而consumer執行緒的順序是c1-0, c2-0, c2-1。然後使用分割槽數除以執行緒數以確定每個執行緒至少獲取的分割槽數。在我們的例子中,10/3不能整除,餘數為1,因此c1-0會被額外多分配一個分割槽。最後的分割槽分配如下:

c1-0 獲得分割槽 0 1 2 3

c2-0 獲得分割槽 4 5 6

c2-1 獲得分割槽 7 8 9

如果該topic是11個分割槽,那麼分割槽分配如下:

c1-0 獲取分割槽 0 1 2 3

c2-0 獲取分割槽 4 5 6 7

c2-1 獲取分割槽 8 9 10

2. roundrobin策略——輪詢策略

    如果是輪詢策略,我們上面假設的例子就不適用了,因為該策略要求訂閱某個topic的所有consumer都必須有相同數目的執行緒數。round robin策略與range的一個主要的區別就是在再分配之前你是沒法預測分配結果的——因為它會使用雜湊求模的方式隨機化排序順序。

瞭解了consumer的分配,再瞭解producer的分配就比較容易了。producer的分配與consumer 大致相同

程式碼塊

java

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

無key的時候,是隨機選擇一個Partition傳送,如果有key的時候,則是根據key去選擇一個partition傳送。

 

三 rebalance 過程

coordinator 是kafka 的一個協調者,也起到中控的作用。對比rocketMQ將協調者放在了name_server 中,kafka是將協調者放到了其中的一個broker裡面去做。

Kafka的coordiantor要做的事情就是group management,就是要對一個團隊(或者叫組)的成員進行管理。Group management就是要做這些事情:

  • 維持group的成員組成。這包括允許新的成員加入,檢測成員的存活性,清除不再存活的成員。

  • 協調group成員的行為。

Kafka 0.9 版本的一個亮點就是consumer對zk無依賴,而想做到這點無疑要將group放到broker中去管理。很明顯,這就是consumer coordinator所要做的事情,是可以用group management 協議做到的。而cooridnator, 及這個協議,也是為了實現不依賴Zookeeper的高階消費者而提出並實現的。只不過,Kafka對高階消費者的成員管理行為進行了抽象,抽象出來group management功能共有的邏輯,以此設計了Group Management Protocol, 使得這個協議不只適用於Kafka consumer(目前Kafka Connect也在用它),也可以作為其它"group"的管理協議。

還是從consumer消費訊息開始理清coordinator的過程。

首先consumer的pollOnOnce 方法會呼叫 GROUP_COORDINATOR  request -→ kafka server

GROUP_COORDINATOR

程式碼塊

java

def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
  val groupCoordinatorRequest = request.body[GroupCoordinatorRequest]
 
  if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
    val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
  } else {
    //  收到客戶端的GroupCoordinatorRequest請求後,
    //  根據group id雜湊運算完後模上一個特殊的內部topic:__consumer_offsets 的partition總個數(預設配置是50個)得到一個partitionId。
    val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
    // get metadata (and create the topic if necessary)
    val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
 
    val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
      new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
    } else {
     // 將這個PartitionId的leader所在的broker節點作為這個消費組的coordinator節點資訊封裝成GroupCoordinatorResponse返回給客戶端。
      val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
        .find(_.partition == partition)
        .map(_.leader())
 
      coordinatorEndpoint match {
        case Some(endpoint) if !endpoint.isEmpty =>
          new GroupCoordinatorResponse(Errors.NONE, endpoint)
        case _ =>
          new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
      }
    }

  

1.收到客戶端請求後會根據 group id雜湊運算完後模上一個特殊的內部topic:__consumer_offsets 的partition總個數(預設配置是50個)得到一個partitionId。zk節點位置如下圖。

2.  將這個PartitionId的leader所在的broker節點作為這個消費組的coordinator節點資訊封裝成GroupCoordinatorResponse返回給客戶端。

3.  如果元資料修改 或者更改了partition 規則(包含第一次請求),則傳送rejoinGroup 請求.

 

4.  consumer 參與到對應的consumer group中,group會對組內的consumer 選主,選主策略為第一個進入group的consumer為leader ,見程式碼: