1. 程式人生 > 其它 >kafka rebalance原始碼分析

kafka rebalance原始碼分析

一句話概述:

kafka重平衡機制以為了保證一個消費者組中消費環境發生變化後仍能夠負載均衡的一種機制。(消費策略的轉移)。

重平衡發生的幾種情況:

  • 有新的消費者加入Consumer Group。

  • 有消費者宕機下線。消費者並不一定需要真正下線,例如遇到長時間的GC、網路延遲導致消費者長時間未向GroupCoordinator傳送HeartbeatRequest時,GroupCoordinator會認為消費者下線。

  • 有消費者主動退出Consumer Group(close,kill -9 pid)。

  • Consumer Group訂閱的任一Topic出現分割槽數量的變化。

  • 消費者呼叫unsubscrible()取消對某Topic的訂閱。

Kafka 為消費者組定義了 5 種狀態,它們分別是:

  • Empty:組內無成員,但是存在未過期的已提交資料。

  • Dead:組內無成員,資料被刪除。

  • PreparingRebalance:準備開始重平衡,所有成員都需要重新請求加入組。

  • CompletingRebalance:所有成員都已經加入,等待分配方案。

  • Stable:重平衡完成,組內消費者可以正常開始消費

rebalance狀態流轉:

rebalance過程中的請求:

讓我們根據這個請求順序圖來解釋一下各個狀態是如何流轉的:

  • Empty(Empty):當一個Group是新建立的,或者內部沒有成員時,狀態就是Empty。我們假設有一個新的消費組,這個消費組的第一個成員傳送FIND_COORDINATOR請求的時候,也就是開啟了Rebalacne的第一個階段。

  • PreparingRebalance(JOIN):當完成FIND_COORDINATOR請求後,對應的客戶端就能找到自己的coordinator節點是哪個,然後緊接著就會發送JOIN_GROUP請求,當coordinator收到這個請求後,就會把狀態由Empty變更為PreparingRebalance,意味著準備要開始rebalance了。

  • CompletingRebalance(SYNC):當所有的成員都完成JOIN_GROUP請求的傳送之後,或者rebalance過程超時後,對應的PreparingRebalance階段就會結束,進而進入CompletingRebalance狀態。

  • Stabe(Stable):在進入CompletingRebalance狀態的時候呢,服務端會返回所有JOIN_GROUP請求對應的響應,然後客戶端收到響應之後立刻就傳送SYNC_GROUP請求,服務端在收到leader傳送的SNYC_GROUP請求後,就會轉換為Stable狀態,意味著整個rebalance過程已經結束了。

group 如何選擇相應的 GroupCoordinator

    要說這個,就必須介紹一下這個 __consumer_offsets topic 了,它是 Kafka 內部使用的一個 topic,專門用來儲存 group 消費的情況,預設情況下有50個 partition,每個 partition 預設有三個副本,而具體的一個 group 的消費情況要儲存到哪一個 partition 上,是根據 abs(GroupId.hashCode()) % NumPartitions 來計算的(其中,NumPartitions 是 __consumer_offsets 的 partition 數,預設是50個)。

    對於 consumer group 而言,是根據其 group.id 進行 hash 並計算得到其具對應的 partition 值,該 partition leader 所在 Broker 即為該 Group 所對應的 GroupCoordinator,GroupCoordinator 會儲存與該 group 相關的所有的 Meta 資訊。

客戶端重平衡的流程:

消費端重平衡分為兩個步驟,對應兩個請求:

JoinGroup

向協調者傳送加入組的請求,同時上報自己訂閱的主題,這樣協調者就能收集到所有成員的訂閱資訊。
一旦收集了全部成員的JoinGroup請求後, Coordinator 會從這些成員中選擇一個擔任這個消費者組的領導者。
領導者消費者的任務是收集所有成員的訂閱資訊,然後根據這些資訊,制定具體的分割槽消費分配方案。
Coordinator 會把消費者組訂閱資訊封裝進JoinGroup請求的 響應體中,然後發給領導者,由領導者統一做出分配方案後。

SyncGroup

領導者消費者(Leader Consumer)分配方案。領導者向 Coordinator 傳送SyncGroup請求,
將剛剛做出的分配方案發給協調者。 值得注意的是,其他成員也會向 Coordinator 傳送SyncGroup請求, 只不過請求體中並沒有實際的內容。
這一步的主要目的是讓 Coordinator 接收分配方案, 然後統一以 SyncGroup 響應的方式分發給所有成員, 這樣組內所有成員就都知道自己該消費哪些分割槽了。

服務端重平衡原始碼:

kafka各種請求的入口:KafkaApis.scala

def handle(request: RequestChannel.Request) {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
    }

重平衡的業務處理:GroupCoordinator.scala

加入消費者組

def handleJoinGroup(groupId: String,
                    memberId: String,
                    clientId: String,
                    clientHost: String,
                    rebalanceTimeoutMs: Int,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback) {
   //異常情況返回錯誤碼 1.協調者是否啟動 2.組id是否非法 3.檢查組協調者是否管理此組 4.GroupCoordinator是否已經載入此Consumer Group對應的 offset 分割槽 5.超時時常是否在合法區間
  if (!isActive.get) {
    responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
  } else if (!validGroupId(groupId)) {
    responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
  } else if (!isCoordinatorForGroup(groupId)) {
    responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
  } else if (isCoordinatorLoadInProgress(groupId)) {
    responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
  } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
             sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
    responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
  } else {
    // only try to create the group if the group is not unknown AND
    // the member id is UNKNOWN, if member is specified but group does not
    // exist we should reject the request
    //只有在group為建立並且成員id未知的情況下接受請求並進行處理
    groupManager.getGroup(groupId) match {
      case None =>
        if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
          info(s"Group match none and memberId is: ${memberId}")
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
        } else {
            //建立消費者組
          val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
          info(s"Group match none and is a new member,GroupMetadata: ${group},next step doJoinGroup.")
          //進行加入組的操作
          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }

        //已經存在消費者組的情況下
      case Some(group) =>
        info(s"Group match some group,groupId is ${groupId},next step doJoinGroup.")
          //加入消費者組      
        doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
    }
  }
}
//處理加入組的請求:
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback) {
  group.inLock {
    if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
      // if the new member does not support the group protocol, reject it
      responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    } else if (group.is(Empty) && (protocols.isEmpty || protocolType.isEmpty)) {
      //reject if first member with empty group protocol or protocolType is empty
      responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
      // if the member trying to register with a un-recognized id, send the response to let
      // it reset its member id and retry
      //memberid 無法被當前組識別
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
    } else {
      group.currentState match {
          //根據Consumer Group的狀態分類進行處理
          //組內沒有任何成員,且元資料已經被清除
        case Dead =>
          // if the group is marked as dead, it means some other thread has just removed the group
          // from the coordinator metadata; this is likely that the group has migrated to some other
          // coordinator OR the group is in a transient unstable phase. Let the member retry
          // joining without the specified member id,
          info("group.currentState is Dead.")
          //直接返回錯誤碼
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
          //正在重平衡狀態中
        case PreparingRebalance =>
          if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            info(s"group.currentState is ${group.currentState},and memberId is ${memberId},next step addMember.")
            //未知消費者申請加入
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
          } else {
            info(s"group.currentState is ${group.currentState},and memberId is ${memberId},next step updateMember.")
            val member = group.get(memberId)
            //已知消費者申請加入,需要更新元資料
            updateMemberAndRebalance(group, member, protocols, responseCallback)
          }
        //消費者組下的所有成員已經加入,各個成員正在等待分配方案。
          //可能會轉換狀態到 --> PreparingRebalance
        case CompletingRebalance =>
          if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            info(s"group.currentState is ${group.currentState},and memberId is ${memberId},next step addMember.")
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
          } else {
            //info(s"group.currentState is ${group.currentState},and memberId is ${memberId}.")
            val member = group.get(memberId)
            //元資料(PartitionAssignor)不變的情況下,只需要返回當前組的資訊即可
            if (member.matches(protocols)) {
              info(s"group.currentState is: ${group.currentState},and memberId is: ${memberId}," +
                s"member.matches(protocols) is : ${member.matches(protocols)}.")
              // member is joining with the same metadata (which could be because it failed to
              // receive the initial JoinGroup response), so just return current group information
              // for the current generation.
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  Map.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                subProtocol = group.protocolOrNull,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              // member has changed metadata, so force a rebalance
              //元資料已經改變,強制重平衡
              info(s"group.currentState is: ${group.currentState},and memberId is: ${memberId}," +
                s"member.matches(protocols) is : ${member.matches(protocols)},next step updateMember.")
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }
          }

          //消費者組未空或者消費者組處於穩定狀態
        case Empty | Stable =>
          if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            // if the member id is unknown, register the member to the group
            info(s"group.currentState is: ${group.currentState},and memberId is ${memberId},next step addMember.")
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
          } else {
            //穩定狀態下怎麼會發送加入組的請求呢?(兜底機制,不排除有這種可能?)
            info(s"group.currentState is: ${group.currentState},and memberId is ${memberId}.")
            val member = group.get(memberId)
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
              // The latter allows the leader to trigger rebalances for changes affecting assignment
              // which do not affect the member metadata (such as topic metadata changes for the consumer)
              info(s"group.currentState is: ${group.currentState},and memberId is ${memberId}," +
                s"group.isLeader(memberId): ${group.isLeader(memberId)},!member.matches(protocols) is: ${!member.matches(protocols)},next step updateMember.")
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              info(s"group.currentState is: ${group.currentState},and memberId is ${memberId}," +
                s"group.isLeader(memberId): ${group.isLeader(memberId)},!member.matches(protocols) is: ${!member.matches(protocols)},next step updateMember.")
              updateMemberAndRebalance(group, member, protocols, responseCallback)
              // for followers with no actual change to their metadata, just return group information
              // for the current generation which will allow them to issue SyncGroup
              responseCallback(JoinGroupResult(
                members = Map.empty,
                memberId = memberId,
                generationId = group.generationId,
                subProtocol = group.protocolOrNull,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }
          }
      }

      info(s"Group is PreparingRebalance? line(248):${group.is(PreparingRebalance)}")
      // 嘗試完成相關的DelayedJoin
      if (group.is(PreparingRebalance))
        joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
  }
}

延時加入組的操作:DelayedJoin

private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)

    //首先嚐試下是否可以完成延時操作:
  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    if (group.notYetRejoinedMembers.isEmpty)
      forceComplete()
    else false
  }
}
//強制執行
def onCompleteJoin(group: GroupMetadata) {
  group.inLock {
    // remove any members who haven't joined the group yet
    // 如果組內成員依舊沒能連上,那麼就刪除它,接收當前JOIN階段
    group.notYetRejoinedMembers.foreach { failedMember =>
      group.remove(failedMember.memberId)
      info(s"Group remove failedMember memberId: ${failedMember.memberId}")
      // TODO: cut the socket connection to the client
    }

    if (!group.is(Dead)) {
      // 狀態機流轉 : preparingRebalancing -> CompletingRebalance
      group.initNextGeneration()
      if (group.is(Empty)) {
        info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

        groupManager.storeGroup(group, Map.empty, error => {
          if (error != Errors.NONE) {
            // we failed to write the empty group metadata. If the broker fails before another rebalance,
            // the previous generation written to the log will become active again (and most likely timeout).
            // This should be safe since there are no active members in an empty generation, so we just warn.
            warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
          }
        })
      } else {
        // JOIN階段標誌結束日誌
        info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

        // trigger the awaiting join group response callback for all the members after rebalancing
        for (member <- group.allMemberMetadata) {
          assert(member.awaitingJoinCallback != null)
          val joinResult = JoinGroupResult(
            // 如果是leader 就返回member列表及其元資料資訊
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata
            } else {
              Map.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            subProtocol = group.protocolOrNull,
            leaderId = group.leaderOrNull,
            error = Errors.NONE)

          member.awaitingJoinCallback(joinResult)
          member.awaitingJoinCallback = null
          completeAndScheduleNextHeartbeatExpiration(group, member)
        }
      }
    }
  }
}

prepareRebalance

&nbsp;&nbsp;&nbsp;&nbsp;協調者處理不同消費者的“加入組請求”,由於不能立即返回“加入組響應”給每個消費者,它會建立一個“延遲操作”,表示協調者會延遲傳送“加入組響應”給消費者。但協調者不會為每個消費者的“ 加入組請求”都建立一個“ 延遲操作”,而是僅當消費組狀態從“穩定”轉變為“準備再平衡”,才建立一個“延遲操作”物件。

private def prepareRebalance(group: GroupMetadata) {
  // if any members are awaiting sync, cancel their request and have them rejoin
  if (group.is(CompletingRebalance))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  //如果不是Empty就建立一個DelayedJoin 5min;如果是Empty就建立一個InitialDelayedJoin延時任務,超時時間是3s
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,// 預設3000ms,即3s
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 這裡這個超時時間是客戶端的poll間隔,預設5分鐘
  //狀態轉換為preparingrebalance
  group.transitionTo(PreparingRebalance)

  info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
    s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

  // 嘗試完成操作
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

doSyncGroup:

//處理同步請求:

private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback) {
  group.inLock {
    if (!group.has(memberId)) {
      responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
    } else if (generationId != group.generationId) {
      responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
    } else {
      group.currentState match {
        case Empty | Dead =>
          responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)

        // 只有group處於compeletingRebalance狀態下才會被處理
        // 其餘狀態都是錯誤的狀態
        case PreparingRebalance =>
          responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)

        case CompletingRebalance =>
          // 給當前member設定回撥,之後就啥也不幹,也不返回
          // 等到leader的分割槽方案就緒後,才會被返回。
          group.get(memberId).awaitingSyncCallback = responseCallback

          // if this is the leader, then we can attempt to persist state and transition to stable
          //只有收到leader的SYNC才會被處理,並進行狀態機流轉
          if (group.isLeader(memberId)) {
            info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId} memberId is ${memberId}.")

            // fill any missing members with an empty assignment
            val missing = group.allMembers -- groupAssignment.keySet
            val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

            groupManager.storeGroup(group, assignment, (error: Errors) => {
              group.inLock {
                // another member may have joined the group while we were awaiting this callback,
                // so we must ensure we are still in the CompletingRebalance state and the same generation
                // when it gets invoked. if we have transitioned to another state, then do nothing
                if (group.is(CompletingRebalance) && generationId == group.generationId) {
                  if (error != Errors.NONE) {
                    resetAndPropagateAssignmentError(group, error)
                    maybePrepareRebalance(group)
                  } else {
                    //給每個member分配方案
                    setAndPropagateAssignment(group, assignment)
                    // 狀態機流轉:CompletingRebalance -> Stable
                    group.transitionTo(Stable)
                  }
                }
              }
            })
          }

        case Stable =>
          // 如果已經處於stable狀態,說明leader已經把分割槽分配方案傳上來了
          // 那麼直接從group的元資料裡面返回對應的方案就好了
          // if the group is stable, we just return the current assignment
          val memberMetadata = group.get(memberId)
          responseCallback(memberMetadata.assignment, Errors.NONE)
          // 開啟心跳檢測
          completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
      }
    }
  }
}

心跳原始碼:

AbstractCoordinator.scala:

if (coordinatorUnknown()) {
    if (findCoordinatorFuture != null || lookupCoordinator().failed())
        // the immediate future check ensures that we backoff properly in the case that no
        // brokers are available to connect to.
        AbstractCoordinator.this.wait(retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {//將上次成功獲取響應的時間與當前時間做對比,超過時間則認為協調者不健康,相協調者標記為unknown
    //心跳超時超過sessiontime,
    // the session timeout has expired without seeing a successful heartbeat, so we should
    // probably make sure the coordinator is still healthy.
    log.info("HeartbeatThread: sessionTimeoutExpired.");
    markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
     //在兩次poll的時間超過設定值,則向協調者傳送離開組的請求。
    // the poll timeout has expired, which means that the foreground thread has stalled
    // in between calls to poll(), so we explicitly leave the group.
    log.info("HeartbeatThread: pollTimeoutExpired.");
    maybeLeaveGroup();
} else if (!heartbeat.shouldHeartbeat(now)) {
    // poll again after waiting for the retry backoff in case the heartbeat failed or the
    // coordinator disconnected
    AbstractCoordinator.this.wait(retryBackoffMs);
} else {
    heartbeat.sentHeartbeat(now);

服務端的對心跳的檢測:

private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
  // complete current heartbeat expectation
  member.latestHeartbeat = time.milliseconds()
  val memberKey = MemberKey(member.groupId, member.memberId)
  heartbeatPurgatory.checkAndComplete(memberKey)

  // reschedule the next heartbeat expiration deadline
  val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
  //建立  DelayedHeartbeat 物件用於超時檢查,
  val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
  heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}

//延時心跳檢查物件

private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                      group: GroupMetadata,
                                      member: MemberMetadata,
                                      heartbeatDeadline: Long,
                                      sessionTimeout: Long)
  extends DelayedOperation(sessionTimeout, Some(group.lock)) {

  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
  override def onComplete() = coordinator.onCompleteHeartbeat()
}

//心跳超時

def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
  group.inLock {
    if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
      info(s"Member ${member.memberId} in group ${group.groupId}  has failed, removing it from the group")
      //超時後將從組中移除
      removeMemberAndUpdateGroup(group, member)
    }
  }
}

幾個影響重平衡的引數:

  • maxpollinterval:兩次poll之間的時間間隔。

  • session.timeout.ms:表示 consumer 向 broker 傳送心跳的超時時間。預設10s。

  • heartbeat.interval.ms:表示 consumer 每次向 broker 傳送心跳的時間間隔。預設3s,(session.timeout.ms>=3heartbeat.interval.ms)。

幾種場景的重平衡測試:

1.新加入組:

server:
//開始新的重平衡
[2021-12-19 21:21:50,010] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 4 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
//所有消費者均加入組
[2021-12-19 21:21:52,727] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 5 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
//收到leader的分配方案
[2021-12-19 21:21:52,732] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 5 (kafka.coordinator.group.GroupCoordinator)


client:
//尋找協調者
[2022-01-21 11:01:52,692] INFO Discovered coordinator 10.56.40.142:9092 (id: 2147483647 rack: null) for group g1. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
//放棄原來的分割槽策略,原來的分配分割槽為:[]
[2022-01-21 11:01:52,696] INFO Revoking previously assigned partitions [] for group g1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
//傳送加入組的請求
[2022-01-21 11:01:52,696] INFO (Re-)joining group g1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
//成功加入組
[2022-01-21 11:01:53,812] INFO Successfully joined group g1 with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
//設定新的消費分割槽
[2022-01-21 11:01:53,815] INFO Setting newly assigned partitions [test11-0, test11-1, test11-2] for group g1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

2.業務邏輯過重,業務處理的時間超過maxpollinterval

client:
[2022-01-24 15:26:26,598] WARN Auto-commit of offsets {test11-2=OffsetAndMetadata{offset=126, metadata=''}} failed for group g1:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
 This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
 which typically implies that the poll loop is spending too much time message processing. 
 You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
  (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
server:
[2022-01-24 15:26:26,611] DEBUG [GroupCoordinator 0]: Member consumer22-0b748f59-4fbc-48f1-98e7-102e78b158e3 in group g1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:26,611] INFO [GroupCoordinator 0]: Preparing to rebalance group g1 with old generation 6 (__consumer_offsets-42) (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:26,636] INFO [KafkaApi-0] Handle join group request for correlation id 49 to client consumer22. (kafka.server.KafkaApis)
[2022-01-24 15:26:26,637] INFO [GroupCoordinator 0]: Group match some group,groupId is g1,next step doJoinGroup. (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:26,638] INFO [GroupCoordinator 0]: group.currentState is PreparingRebalance,and memberId is ,next step addMember. (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:26,638] INFO [GroupCoordinator 0]: Group is PreparingRebalance? line(248):true (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:27,193] INFO [KafkaApi-0] Handle join group request for correlation id 791 to client consumer0. (kafka.server.KafkaApis)
[2022-01-24 15:26:27,193] INFO [GroupCoordinator 0]: Group match some group,groupId is g1,next step doJoinGroup. (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:27,193] INFO [GroupCoordinator 0]: group.currentState is PreparingRebalance,and memberId is consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d,next step updateMember. (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:27,193] INFO [GroupCoordinator 0]: Group is PreparingRebalance? line(248):true (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:27,194] INFO [GroupCoordinator 0]: Stabilized group g1 generation 7 (__consumer_offsets-42) (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:26:27,194] INFO [KafkaApi-0] Sending join group response JoinGroupResponse(throttleTimeMs=0, error=NONE, generationId=7, groupProtocol=range, memberId=consumer22-3126c929-6c89-47d4-8391-1854486f321e, leaderId=consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d, members=) for correlation id 49 to client consumer22. (kafka.server.KafkaApis)
[2022-01-24 15:26:27,194] INFO [KafkaApi-0] Sending join group response JoinGroupResponse(throttleTimeMs=0, error=NONE, generationId=7, groupProtocol=range, memberId=consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d, leaderId=consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d, members=consumer22-3126c929-6c89-47d4-8391-1854486f321e,consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d) for correlation id 791 to client consumer0. (kafka.server.KafkaApis)
[2022-01-24 15:26:27,196] INFO [GroupCoordinator 0]: Assignment received from leader for group g1 for generation 7 memberId is consumer0-d45edb18-451e-429d-ab04-1102a95b2f1d. (kafka.coordinator.group.GroupCoordinator)
[2022-01-24 15:29:00,213] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

3.心跳超時:

修改原始碼模擬超時的情況
synchronized RequestFuture<Void> sendHeartbeatRequest() {
        try {
            log.info("Test heartbeat thread block,sleep 10s");
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("Sending Heartbeat request to coordinator {}", coordinator);
        HeartbeatRequest.Builder requestBuilder =
                new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId);
        return client.send(coordinator, requestBuilder)
                .compose(new HeartbeatResponseHandler());
    }
server:
[2021-12-20 21:48:47,685] INFO [GroupCoordinator 0]: Member consumer2-dd4a191e-c537-4e3d-b5b9-81c17c5f3136 in group g1  has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-20 21:48:47,685] INFO [GroupCoordinator 0]: Preparing to rebalance group g1 with old generation 190 (__consumer_offsets-42) (kafka.coordinator.group.GroupCoordinator)
client:
[2021-12-20 21:48:38,681] INFO [Consumer clientId=consumer2, groupId=g1] This is consumer2:  sleep 10s (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

4.客戶端主動呼叫unsubscribe

時間相差了10s,說明還是通過心跳機制觸發的重平衡。

server:
[2021-12-19 21:59:06,636] INFO [GroupCoordinator 0]: Member consumer-1-1b50470e-f950-4c37-bf95-3156321a8cbb in group group-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:06,636] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 15 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:08,641] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 16 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:59:08,642] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 16 (kafka.coordinator.group.GroupCoordinator)


client:
2021-12-19 21:58:57: unsubscribe
if (state != MemberState.STABLE) {
    log.info("MemberState.STABLE:  "+MemberState.STABLE.toString());
    // the group is not stable (perhaps because we left the group or because the coordinator
    // kicked us out), so disable heartbeats and wait for the main thread to rejoin.
    //禁用心跳執行緒
    disable();
    continue;
}

5.客戶端主動呼叫close:

close最終會呼叫:maybeLeaveGroup傳送離開組的請求,所以從日誌上看,close與發生衝平衡幾乎是同一時間,重平衡是通過leave group請求觸發的。

//原始碼:
    public synchronized void maybeLeaveGroup() {
        if (!this.coordinatorUnknown() && this.state != AbstractCoordinator.MemberState.UNJOINED && this.generation != AbstractCoordinator.Generation.NO_GENERATION) {
            this.log.debug("Sending LeaveGroup request to coordinator {}", this.coordinator);
            org.apache.kafka.common.requests.LeaveGroupRequest.Builder request = new org.apache.kafka.common.requests.LeaveGroupRequest.Builder(this.groupId, this.generation.memberId);
            this.client.send(this.coordinator, request).compose(new AbstractCoordinator.LeaveGroupResponseHandler());
            this.client.pollNoWakeup();
        }
        this.resetGeneration();
    }


server:
[2022-01-25 10:38:47,274] DEBUG[GroupCoordinator 0]: Member consumer22-d183389d-9c9d-4f76-9da3-2162f0144923 in group g1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)

[2021-12-19 22:02:33,080] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 17 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:33,272] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 18 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:33,274] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 18 (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 22:02:45,544] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 8 milliseconds. (kafka.coordinator.group.GroupMetadataManager)


client:
2021-12-19 22:02:33: close

從原始碼看 close與unsubscribe都會呼叫maybeLeaveGroup 向服務端傳送離開組的請求,但實際上只有close走的才是handleLeaveGroup處理邏輯,unsubscribe 後心跳執行緒也會停止,導致最終是由心跳觸發重平衡。

6.客戶端程序異常退出:

client:
[2021-12-19 21:25:38,412] INFO [GroupCoordinator 0]: Member consumer-1-3903ffdf-b59a-4e33-bb7b-5287afd4812c in group group-1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:38,412] INFO [GroupCoordinator 0]: Preparing to rebalance group group-1 with old generation 5 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:40,868] INFO [GroupCoordinator 0]: Stabilized group group-1 generation 6 (__consumer_offsets-49) (kafka.coordinator.group.GroupCoordinator)
[2021-12-19 21:25:40,869] INFO [GroupCoordinator 0]: Assignment received from leader for group group-1 for generation 6 (kafka.coordinator.group.GroupCoordinator)


server:
[2021-12-19 21:25:40,867] INFO Revoking previously assigned partitions [test6-1] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-12-19 21:25:40,867] INFO (Re-)joining group group-1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-12-19 21:25:40,870] INFO Successfully joined group group-1 with generation 6 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
將會多消費一個分割槽
[2021-12-19 21:25:40,871] INFO Setting newly assigned partitions [test6-1, test6-0] for group group-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

小結:

重平衡期間會無法消費,應儘量減少重平衡的次數和時間;

  • 應避免業務邏輯處理時間過長。
  • 網路波動較大的環境可以增加session.timeout.ms或者減少heartbeat.interval.ms,增加心跳檢查的次數。
  • 減少客戶端的重啟。
    排查問題時服務端日誌若無明顯的錯誤提示可能是以下三種情況,有成員新加入組、客戶端主動呼叫close、業務處理超時;業務處理超時會重平衡時間會劣化為maxpollinterval。

參考: