Kafka in Depth: PartitionLeaderSelector Source Code Analysis
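PartitionLeaderSelector elects the leader broker for a partition. The trait defines a single method, selectLeader, which takes a TopicAndPartition and a LeaderAndIsr.
The TopicAndPartition identifies the partition that needs a leader, and the LeaderAndIsr is the partition's current leader and in-sync replica set (ISR) as recorded in ZooKeeper. The method returns a tuple: the newly elected leader and ISR, plus the replicas that must receive the LeaderAndIsr request.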
trait PartitionLeaderSelector {
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
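To make the contract concrete, here is a minimal sketch of a selector written against simplified stand-in types. The case classes below only mirror the fields used in this article, and KeepCurrentSelector and SelectorContractDemo are hypothetical names for illustration; none of this is Kafka's own code.

```scala
// Simplified stand-ins for Kafka's internal types (assumptions, for illustration only).
final case class TopicAndPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

trait PartitionLeaderSelector {
  // Returns the chosen leader/ISR and the replicas that should be told about it.
  def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}

// Hypothetical selector that keeps the current leader and returns the assigned replicas.
final class KeepCurrentSelector(assignment: Map[TopicAndPartition, Seq[Int]])
    extends PartitionLeaderSelector {
  def selectLeader(tp: TopicAndPartition, current: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) =
    (current, assignment.getOrElse(tp, Seq.empty))
}

object SelectorContractDemo {
  def main(args: Array[String]): Unit = {
    val tp = TopicAndPartition("orders", 0)
    val selector = new KeepCurrentSelector(Map(tp -> Seq(1, 2, 3)))
    val (leaderAndIsr, receivers) = selector.selectLeader(tp, LeaderAndIsr(1, 5, List(1, 2), 7))
    println(s"leader=${leaderAndIsr.leader}, replicas to notify=${receivers.mkString(",")}")
  }
}
```

The second element of the tuple is what lets the caller decide which brokers to notify, which is why every implementation below returns a replica list alongside the new LeaderAndIsr.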
The main implementations of PartitionLeaderSelector are:
1. NoOpLeaderSelector
2. OfflinePartitionLeaderSelector
3. ReassignedPartitionLeaderSelector
4. PreferredReplicaPartitionLeaderSelector
5. ControlledShutdownLeaderSelector
NoOpLeaderSelector
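NoOpLeaderSelector is a selector that does nothing: it logs a warning and returns the current LeaderAndIsr together with the partition's replica assignment.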
class NoOpLeaderSelector (controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[NoOpLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment." )
(currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
}
}
ControlledShutdownLeaderSelector
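When the controller handles a controlled shutdown of a broker, it triggers a new leader election for the affected partitions.
The selector first looks up the assigned replicas (assignedReplicas) and keeps those on brokers that are live or shutting down (liveAssignedReplicas). It then removes the shutting-down brokers from the current ISR to form the new ISR, and elects the first replica in liveAssignedReplicas that is also in the new ISR.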
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
extends PartitionLeaderSelector
with Logging {
this.logIdent = "[ControlledShutdownLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val currentLeader = currentLeaderAndIsr.leader
// replicas assigned to this partition
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
// assigned replicas on brokers that are live or shutting down
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
// drop the shutting-down brokers from the current ISR
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
// pick the first live assigned replica that is still in the new ISR as the partition leader
liveAssignedReplicas.filter(newIsr.contains).headOption match {
case Some(newLeader) =>
// found one: increment the leader epoch and the ZooKeeper node version by 1
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
// none found: throw StateChangeFailedException
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
}
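The selection rule above can be sketched with plain collections. This is a simplified model, not the controller code: ControlledShutdownSketch and its select function are hypothetical names, broker state is passed in as sets instead of being read from ControllerContext, and a missing candidate is modelled as None where the original throws StateChangeFailedException.

```scala
object ControlledShutdownSketch {
  // Returns (new leader, new ISR), or None when no ISR member remains eligible.
  def select(
      assigned: Seq[Int],
      isr: List[Int],
      liveOrShuttingDown: Set[Int],
      shuttingDown: Set[Int]): Option[(Int, List[Int])] = {
    // Assigned replicas on brokers that are live or in the middle of shutting down.
    val liveAssigned = assigned.filter(r => liveOrShuttingDown.contains(r))
    // The current ISR without the brokers that are shutting down.
    val newIsr = isr.filterNot(b => shuttingDown.contains(b))
    // The first live assigned replica that is still in the new ISR becomes leader.
    liveAssigned.find(r => newIsr.contains(r)).map(leader => (leader, newIsr))
  }

  def main(args: Array[String]): Unit = {
    // Broker 1 leads and is shutting down; replicas 2 and 3 stay in sync.
    println(select(Seq(1, 2, 3), List(1, 2, 3), Set(1, 2, 3), Set(1)))
  }
}
```

Note that assignment order drives the choice: with replicas 1, 2, 3 and broker 1 shutting down, replica 2 is picked ahead of replica 3.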
PreferredReplicaPartitionLeaderSelector
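When the controller receives a preferred replica leader election command, it tries to move leadership back to the preferred replica, that is, the first replica in the assigned replica list (AR). The election succeeds only if that replica is alive and in the ISR.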
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
// replicas assigned to this partition
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// the first assigned replica is the preferred replica
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
// if the current leader is already the preferred replica, throw LeaderElectionNotNeededException
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if (currentLeader == preferredReplica) {
throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
.format(preferredReplica, topicAndPartition))
} else {
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
// the preferred replica is alive and in the ISR: make it the leader, incrementing the leader epoch and the ZooKeeper node version by 1
(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
// the preferred replica is not alive or not in the ISR: throw StateChangeFailedException
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
"%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
}
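The three possible outcomes of this selector can be summarised in a small decision function. The sketch below is an illustration under assumptions: PreferredReplicaSketch, Outcome and decide are hypothetical names, and the two exceptions of the original are modelled as plain values so the branches are easy to compare.

```scala
object PreferredReplicaSketch {
  sealed trait Outcome
  case object AlreadyLeader extends Outcome            // original throws LeaderElectionNotNeededException
  final case class Elect(leader: Int) extends Outcome  // preferred replica becomes leader
  case object NotEligible extends Outcome              // original throws StateChangeFailedException

  def decide(assigned: Seq[Int], currentLeader: Int, isr: List[Int], live: Set[Int]): Outcome = {
    // Like the original, this assumes the assignment is non-empty.
    val preferred = assigned.head
    if (currentLeader == preferred) AlreadyLeader
    else if (live.contains(preferred) && isr.contains(preferred)) Elect(preferred)
    else NotEligible
  }

  def main(args: Array[String]): Unit = {
    // Leadership drifted to broker 2; broker 1 is preferred, alive and in sync.
    println(decide(Seq(1, 2, 3), currentLeader = 2, isr = List(1, 2), live = Set(1, 2, 3)))
  }
}
```

Unlike the other selectors, there is no fallback candidate here: if the preferred replica is not eligible, the election fails instead of choosing another replica.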
ReassignedPartitionLeaderSelector
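When the partitions of a topic are reassigned, a new leader election is triggered: the first reassigned replica that is both alive and in the current ISR is elected leader.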
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
/**
* The reassigned replicas are already in the ISR when selectLeader is called.
*/
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
// the new replica set from the reassignment
val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
// keep only the reassigned replicas that are alive and in the current ISR
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
currentLeaderAndIsr.isr.contains(r))
// take the first of them as the leader
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
// found one: return it as the leader, incrementing the leader epoch and the ZooKeeper node version by 1
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
// no reassigned replica is alive and in sync: throw NoReplicaOnlineException, the election fails
reassignedInSyncReplicas.size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
"%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
}
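As a rough model of the logic above, the following sketch returns the chosen leader or the reason the election fails. ReassignedSketch and select are hypothetical names, the Left messages are paraphrases and not Kafka's own strings, and both failure cases stand in for the NoReplicaOnlineException thrown by the original.

```scala
object ReassignedSketch {
  def select(newReplicas: Seq[Int], isr: List[Int], live: Set[Int]): Either[String, Int] =
    // A candidate must be in the new replica set, alive, and in the current ISR.
    newReplicas.find(r => live.contains(r) && isr.contains(r)) match {
      case Some(leader)                => Right(leader)
      case None if newReplicas.isEmpty => Left("reassigned replica list is empty")
      case None                        => Left("no reassigned replica is alive and in sync")
    }

  def main(args: Array[String]): Unit = {
    // Reassigning to brokers 4, 5, 6; only 5 and 6 have caught up so far.
    println(select(Seq(4, 5, 6), isr = List(1, 5, 6), live = Set(4, 5, 6)))
  }
}
```

The ISR passed in is the current one, so the new LeaderAndIsr in the original keeps currentLeaderAndIsr.isr unchanged and only the leader changes.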
OfflinePartitionLeaderSelector
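This selector picks a new leader and a new ISR for an offline partition as follows:
1. If at least one broker in the ISR is alive, the first live assigned replica that is in the ISR becomes the leader, and the live members of the ISR become the new ISR.
2. If no broker in the ISR is alive and unclean.leader.election.enable=false, it throws NoReplicaOnlineException.
3. If unclean.leader.election.enable=true, a replica outside the ISR may be elected: the first live replica in the AR becomes the leader and the only member of the new ISR. This can cause data loss and inconsistency.
4. If no replica in the AR is alive either, it throws NoReplicaOnlineException.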
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
extends PartitionLeaderSelector with Logging {
this.logIdent = "[OfflinePartitionLeaderSelector]: "
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
case true =>
// Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
// for unclean leader election.
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
throw new NoReplicaOnlineException(("No broker in ISR for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
}
debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
.format(topicAndPartition, liveAssignedReplicas.mkString(",")))
liveAssignedReplicas.isEmpty match {
case true =>
// no assigned replica is alive either: throw NoReplicaOnlineException
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
// unclean.leader.election.enable=true: a broker outside the ISR may be elected, so pick the leader from the live assigned replicas
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
.format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
case false =>
// at least one broker in the ISR is alive: elect it as leader, and the live ISR members become the new ISR
val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicas)
case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
}
}
}
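The four steps listed for OfflinePartitionLeaderSelector can be condensed into one decision function. The sketch below is a simplified model under assumptions: OfflineElectionSketch and elect are hypothetical names, the unclean election flag is passed in as a Boolean instead of being read from the topic configuration, and failures are returned as Left where the original throws NoReplicaOnlineException. It also returns a Left when a live ISR member is not among the assigned replicas, a case the original does not handle explicitly.

```scala
object OfflineElectionSketch {
  // Result on success: (new leader, new ISR, whether the election was unclean).
  def elect(
      assigned: Seq[Int],
      isr: List[Int],
      live: Set[Int],
      uncleanEnabled: Boolean): Either[String, (Int, List[Int], Boolean)] = {
    val liveAssigned = assigned.filter(r => live.contains(r))
    val liveIsr = isr.filter(r => live.contains(r))
    if (liveIsr.nonEmpty) {
      // Step 1: clean election from the live ISR, in assignment order.
      liveAssigned.find(r => liveIsr.contains(r)) match {
        case Some(leader) => Right((leader, liveIsr, false))
        case None         => Left("live ISR members are not among the assigned replicas")
      }
    } else if (!uncleanEnabled) {
      // Step 2: nothing in the ISR is alive and unclean election is disabled.
      Left("no broker in ISR is alive")
    } else {
      liveAssigned.headOption match {
        // Step 3: unclean election; the new ISR shrinks to the new leader alone.
        case Some(leader) => Right((leader, List(leader), true))
        // Step 4: no assigned replica is alive at all.
        case None => Left("no assigned replica is alive")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // The ISR holds only broker 2, which is down; brokers 1 and 3 are alive but out of sync.
    println(elect(Seq(1, 2, 3), isr = List(2), live = Set(1, 3), uncleanEnabled = true))
  }
}
```

In the unclean case the new ISR contains only the new leader, which is where the potential data loss comes from: anything the old ISR had that this replica lacks is no longer part of the partition's history.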