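Kafka in Depth: How the Controller and Brokers Communicate Within a Cluster
In a Kafka cluster, one broker is first elected as the controller. The controller then coordinates with the other brokers on tasks such as topic creation, partition leader election, and topic deletion.
Below we analyze how the controller communicates with the other brokers.
The controller sends three kinds of requests to the other brokers: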
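- LeaderAndIsrRequest - For a topic, KafkaController elects a leader for each partition and computes the latest ISR, then sends a LeaderAndIsrRequest to the brokers that host the topic's replicas.
- StopReplicaRequest - When a broker goes down or a user deletes a replica, the controller sends a StopReplicaRequest to the other brokers.
- UpdateMetadataRequest - Metadata has to be refreshed whenever brokers change (go down or restart), a topic is created, partitions are added, a partition leader is elected, and so on. This lets Kafka clients learn the latest partition information for each topic, know which broker leads each partition, and therefore know which broker to read from and write to.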
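To make communication between the controller and the other brokers more efficient, ControllerBrokerRequestBatch sends requests in batches.
ControllerBrokerRequestBatch first collects the information that has to go to each broker, then pushes the resulting requests onto per-broker queues through ControllerChannelManager, whose send threads take items off those queues and deliver them to the brokers.
ControllerBrokerRequestBatch keeps one map per request type, keyed by broker ID:
- leaderAndIsrRequestMap - holds the information for the LeaderAndIsrRequest bound for each broker
- stopReplicaRequestMap - holds the information for the StopReplicaRequest bound for each broker
- updateMetadataRequestMap - holds the information for the UpdateMetadataRequest bound for each broker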
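The batching idea can be sketched in isolation. This is a minimal, simplified model and not Kafka's implementation: `PartitionUpdate`, `RequestBatch`, `add`, and `flush` are hypothetical names, and a single map stands in for the three per-request-type maps.

```scala
import scala.collection.mutable

object BatchSketch {
  // Hypothetical stand-in for a request payload; not a Kafka class.
  final case class PartitionUpdate(topic: String, partition: Int, leader: Int)

  // `send` is called once per broker when the batch is flushed.
  final class RequestBatch(send: (Int, Seq[PartitionUpdate]) => Unit) {
    // brokerId -> updates accumulated for that broker in the current batch
    private val pending = mutable.Map.empty[Int, Vector[PartitionUpdate]]

    // Refuse to start a new batch while an earlier one is still unsent.
    def newBatch(): Unit =
      if (pending.nonEmpty)
        throw new IllegalStateException(s"Previous batch not flushed: $pending")

    // Record the update for every valid (non-negative) broker ID.
    def add(brokerIds: Seq[Int], update: PartitionUpdate): Unit =
      brokerIds.filter(_ >= 0).foreach { id =>
        pending(id) = pending.getOrElse(id, Vector.empty[PartitionUpdate]) :+ update
      }

    // Hand each broker its accumulated updates, then empty the batch.
    def flush(): Unit = {
      pending.foreach { case (id, updates) => send(id, updates) }
      pending.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val sent = mutable.Buffer.empty[(Int, Int)]
    val batch = new RequestBatch((id, updates) => sent += ((id, updates.size)))
    batch.newBatch()
    batch.add(Seq(1, 2), PartitionUpdate("orders", 0, leader = 1))
    batch.add(Seq(1), PartitionUpdate("orders", 1, leader = 2))
    batch.flush()
    println(sent.sortBy(_._1)) // (brokerId, number of updates) pairs
  }
}
```

Grouping by broker first means each broker receives one request that covers many partitions instead of one request per partition.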
ControllerChannelManager
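When a ControllerChannelManager is constructed, it initializes brokerStateInfo by calling addNewBroker for every live broker, so that each one has a ControllerBrokerStateInfo object. That object holds networkClient, brokerNode, messageQueue, and requestSendThread, where:
- networkClient - the client that handles the low-level network communication
- brokerNode - the broker's ID, host, and port as a Node (a Node can also carry rack information, although the code below builds it without a rack)
- messageQueue - the BlockingQueue of messages waiting to be sent
- requestSendThread - the RequestSendThread that sends the requests
Note that addNewBroker only creates the RequestSendThread. The thread does not run until startup is called (or, for a broker added later, until addBroker starts it).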
private def addNewBroker(broker: Broker) {
  // Blocking queue that holds the requests destined for this broker
  val messageQueue = new LinkedBlockingQueue[QueueItem]
  debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
  val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
  val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
  // Build the low-level network client
  val networkClient = {
    val channelBuilder = ChannelBuilders.create(
      config.interBrokerSecurityProtocol,
      Mode.CLIENT,
      LoginType.SERVER,
      config.values,
      config.saslMechanismInterBrokerProtocol,
      config.saslInterBrokerHandshakeRequestEnable
    )
    val selector = new Selector(
      NetworkReceive.UNLIMITED,
      config.connectionsMaxIdleMs,
      metrics,
      time,
      "controller-channel",
      Map("broker-id" -> broker.id.toString).asJava,
      false,
      channelBuilder
    )
    new NetworkClient(
      selector,
      new ManualMetadataUpdater(Seq(brokerNode).asJava),
      config.brokerId.toString,
      1,
      0,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      Selectable.USE_DEFAULT_BUFFER_SIZE,
      config.requestTimeoutMs,
      time
    )
  }
  val threadName = threadNamePrefix match {
    case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
    case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
  }
  // Create the RequestSendThread, but do not start it yet
  val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
    brokerNode, config, time, threadName)
  requestThread.setDaemon(false)
  brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
  protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
  private val brokerLock = new Object
  this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
  controllerContext.liveBrokers.foreach(addNewBroker(_))
  def startup() = {
    brokerLock synchronized {
      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
    }
  }
  def shutdown() = {
    brokerLock synchronized {
      brokerStateInfo.values.foreach(removeExistingBroker)
    }
  }
  def addBroker(broker: Broker) {
    // be careful here. Maybe the startup() API has already started the request send thread
    brokerLock synchronized {
      if(!brokerStateInfo.contains(broker.id)) {
        addNewBroker(broker)
        startRequestSendThread(broker.id)
      }
    }
  }
  def removeBroker(brokerId: Int) {
    brokerLock synchronized {
      removeExistingBroker(brokerStateInfo(brokerId))
    }
  }
  // Remove a broker that is already registered
  private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
    try {
      brokerState.networkClient.close()
      brokerState.messageQueue.clear()
      brokerState.requestSendThread.shutdown()
      brokerStateInfo.remove(brokerState.brokerNode.id)
    } catch {
      case e: Throwable => error("Error while removing broker by the controller", e)
    }
  }
  // Start the request send thread, but only if it has not been started before
  protected def startRequestSendThread(brokerId: Int) {
    val requestThread = brokerStateInfo(brokerId).requestSendThread
    if(requestThread.getState == Thread.State.NEW)
      requestThread.start()
  }
}
The sendRequest method only pushes the request onto the queue of the target broker; it does not send anything itself.
def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        stateInfo.messageQueue.put(QueueItem(apiKey, apiVersion, request, callback))
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
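The split between enqueueing and sending, together with the deferred thread start, can be sketched with plain JDK classes. This is a simplified model under stated assumptions: `BrokerChannel`, `ChannelManager`, and the `deliver` callback are hypothetical names, requests are plain strings, and daemon threads are used only so the demo exits cleanly.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable

object ChannelSketch {
  // One queue and one sender thread per broker (hypothetical simplification).
  final class BrokerChannel(val brokerId: Int, deliver: (Int, String) => Unit) {
    val queue = new LinkedBlockingQueue[String]()
    // Created here but not started, as in addNewBroker.
    val sender: Thread = new Thread(new Runnable {
      def run(): Unit =
        try { while (true) deliver(brokerId, queue.take()) }
        catch { case _: InterruptedException => () } // interrupted on shutdown
    }, s"send-thread-$brokerId")
    sender.setDaemon(true)
  }

  final class ChannelManager(brokerIds: Seq[Int], deliver: (Int, String) => Unit) {
    private val lock = new Object
    private val channels = mutable.HashMap.empty[Int, BrokerChannel]
    brokerIds.foreach(id => channels.put(id, new BrokerChannel(id, deliver)))

    // Start only the threads that are still in the NEW state.
    def startup(): Unit = lock.synchronized {
      channels.values.foreach(c => if (c.sender.getState == Thread.State.NEW) c.sender.start())
    }

    // Enqueue only; returns false when the broker is unknown.
    def sendRequest(brokerId: Int, request: String): Boolean = lock.synchronized {
      channels.get(brokerId) match {
        case Some(c) => c.queue.put(request); true
        case None    => false
      }
    }

    def shutdown(): Unit = lock.synchronized {
      channels.values.foreach(_.sender.interrupt())
      channels.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val delivered = new CountDownLatch(2)
    val manager = new ChannelManager(Seq(1, 2), (id, req) => {
      println(s"broker $id <- $req")
      delivered.countDown()
    })
    manager.sendRequest(1, "LeaderAndIsr") // queued; no thread is running yet
    manager.startup()
    manager.sendRequest(2, "UpdateMetadata")
    delivered.await(1, TimeUnit.SECONDS)
    manager.shutdown()
  }
}
```

Because each broker has its own queue and thread, a slow or unreachable broker delays only its own requests.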
RequestSendThread
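After a broker is elected controller, KafkaController's onControllerFailover method is called, and that method in turn calls ControllerChannelManager's startup method.
startup starts the request send thread of each broker in turn. In its doWork method, RequestSendThread takes the next item off the queue and sends it to the corresponding broker, one item at a time. If the broker is not ready or the send fails, the thread waits 300 ms and retries, and it keeps retrying until it receives a response (or the thread is shut down). It then invokes the callback, if one was supplied.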
class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val queue: BlockingQueue[QueueItem],
                        val networkClient: NetworkClient,
                        val brokerNode: Node,
                        val config: KafkaConfig,
                        val time: Time,
                        name: String)
  extends ShutdownableThread(name = name) {
  private val lock = new Object()
  private val stateChangeLogger = KafkaController.stateChangeLogger
  private val socketTimeoutMs = config.controllerSocketTimeoutMs
  override def doWork(): Unit = {
    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(300))
    // Take the next QueueItem off the queue (blocks while the queue is empty)
    val QueueItem(apiKey, apiVersion, request, callback) = queue.take()
    import NetworkClientBlockingOps._
    var clientResponse: ClientResponse = null
    try {
      lock synchronized {
        var isSendSuccessful = false
        while (isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
          try {
            if (!brokerReady()) {
              // Broker not ready: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
            }
            else {
              val requestHeader = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _))
              val send = new RequestSend(brokerNode.idString, requestHeader, request.toStruct)
              val clientRequest = new ClientRequest(time.milliseconds(), true, send, null)
              // Send the request and block until the response arrives
              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
              isSendSuccessful = true
            }
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                  request.toString, brokerNode.toString()), e)
              networkClient.close(brokerNode.idString)
              // Send failed: wait 300 ms, then try again
              isSendSuccessful = false
              backoff()
          }
        }
        if (clientResponse != null) {
          val response = ApiKeys.forId(clientResponse.request.request.header.apiKey) match {
            case ApiKeys.LEADER_AND_ISR => new LeaderAndIsrResponse(clientResponse.responseBody)
            case ApiKeys.STOP_REPLICA => new StopReplicaResponse(clientResponse.responseBody)
            case ApiKeys.UPDATE_METADATA_KEY => new UpdateMetadataResponse(clientResponse.responseBody)
            case apiKey => throw new KafkaException(s"Unexpected apiKey received: $apiKey")
          }
          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
            .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
          if (callback != null) {
            callback(response)
          }
        }
      }
    } catch {
      case e: Throwable =>
        error("Controller %d fails to send a request to broker %s".format(controllerId, brokerNode.toString()), e)
        // If there is any socket error (eg, socket timeout), the connection is no longer usable and needs to be recreated.
        networkClient.close(brokerNode.idString)
    }
  }
}
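The retry loop in doWork follows a common pattern: keep trying while the thread is running, and back off for a fixed interval whenever the peer is not ready or the attempt throws. A minimal sketch of that pattern, assuming a hypothetical `sendWithRetry` helper with `ready` and `attempt` callbacks in place of the real network client:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

object RetrySketch {
  // Retries `attempt` until it succeeds or `running` is cleared.
  // Returns None only when the loop stops without a successful attempt.
  def sendWithRetry[A](running: AtomicBoolean, backoffMs: Long)
                      (ready: () => Boolean)
                      (attempt: () => A): Option[A] = {
    var result: Option[A] = None
    while (running.get() && result.isEmpty) {
      try {
        if (!ready()) Thread.sleep(backoffMs) // peer not ready: wait, then loop again
        else result = Some(attempt())
      } catch {
        case NonFatal(_) =>
          // The real thread also closes the connection here so the next
          // attempt reconnects; this sketch only backs off.
          Thread.sleep(backoffMs)
      }
    }
    result
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fail twice, then succeed on the third attempt.
    val response = sendWithRetry(new AtomicBoolean(true), backoffMs = 10)(() => true) { () =>
      calls += 1
      if (calls < 3) throw new java.io.IOException("connection reset") else "ok"
    }
    println(s"response=$response after $calls attempts")
  }
}
```

Clearing the `running` flag plays the role that shutdown() plays for the real thread: it is the only way out of the loop when a broker stays unreachable.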
ControllerBrokerRequestBatch
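Whenever broker or partition state changes in classes such as KafkaController, PartitionStateMachine, and ReplicaStateMachine, and the change has to be propagated to other brokers, those classes use ControllerBrokerRequestBatch to send the requests as a batch.
Consider an example. The code first calls newBatch, then methods such as addStopReplicaRequestForBrokers add the pending requests to the batch, and finally sendRequestsToBrokers is called to hand them off for sending.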
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.size > 0) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}
ControllerBrokerRequestBatch keeps a separate map for each of the three request types (LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest). The methods addLeaderAndIsrRequestForBrokers, addStopReplicaRequestForBrokers, and addUpdateMetadataRequestForBrokers add entries to the corresponding map. Finally, sendRequestsToBrokers builds the requests and pushes each one onto the queue of the RequestSendThread for its target broker.
class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {
  val controllerContext = controller.controllerContext
  val controllerId: Int = controller.config.brokerId
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
  private val stateChangeLogger = KafkaController.stateChangeLogger
  def newBatch() {
    // raise error if the previous batch is not empty
    if (leaderAndIsrRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
        "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
    if (stopReplicaRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
        "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
    if (updateMetadataRequestMap.size > 0)
      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
        "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
  }
  def clear() {
    leaderAndIsrRequestMap.clear()
    stopReplicaRequestMap.clear()
    updateMetadataRequestMap.clear()
  }
  // Add a LeaderAndIsrRequest
  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                       replicas: Seq[Int], callback: AbstractRequestResponse => Unit = null) {
    val topicPartition = new TopicPartition(topic, partition)
    brokerIds.filter(_ >= 0).foreach { brokerId =>
      val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
    }
    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
                                       Set(TopicAndPartition(topic, partition)))
  }
  // Add a StopReplicaRequest
  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
                                      callback: (AbstractRequestResponse, Int) => Unit = null) {
    brokerIds.filter(b => b >= 0).foreach { brokerId =>
      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
      val v = stopReplicaRequestMap(brokerId)
      // Add the request that stops the replica's fetching to the map
      if(callback != null)
        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
          deletePartition, (r: AbstractRequestResponse) => callback(r, brokerId))
      else
        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
          deletePartition)
    }
  }
  /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
  // Add an UpdateMetadataRequest
  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                         callback: AbstractRequestResponse => Unit = null) {
    def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
      leaderIsrAndControllerEpochOpt match {
        case Some(leaderIsrAndControllerEpoch) =>
          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
          val partitionStateInfo = if (beingDeleted) {
            val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
          } else {
            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
          }
          brokerIds.filter(b => b >= 0).foreach { brokerId =>
            updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
            // Put the latest metadata into updateMetadataRequestMap
            updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
          }
        case None =>
          info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
      }
    }
    val filteredPartitions = {
      val givenPartitions = if (partitions.isEmpty)
        controllerContext.partitionLeadershipInfo.keySet
      else
        partitions
      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
        givenPartitions
      else
        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
    }
    if (filteredPartitions.isEmpty)
      brokerIds.filter(b => b >= 0).foreach { brokerId =>
        updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
      }
    else
      filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
  }
  // Drain the three maps in turn and put each request on the queue of its target broker
  def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      // Queue the LeaderAndIsrRequests
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerSecurityProtocol)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
          )
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // Enqueue the LeaderAndIsrRequest rather than sending it directly; no callback is passed because none is needed
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()
      updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
          "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
          broker, p._1)))
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
          )
          topicPartition -> partitionState
        }
        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
                      else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
                      else 0: Short
        val updateMetadataRequest =
          if (version == 0) {
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
            new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
          }
          else {
            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
              val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
                securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
              }
              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
            }
            new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
          }
        // Enqueue the UpdateMetadataRequest rather than sending it directly; no callback is passed because none is needed
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
      }
      updateMetadataRequestMap.clear()
      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))
        replicaInfoList.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest(controllerId, controllerEpoch, r.deletePartition,
            Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          // Enqueue the StopReplicaRequest rather than sending it directly
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, None, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e : Throwable => {
        if (leaderAndIsrRequestMap.size > 0) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestMap.size > 0) {
          error("Haven't been able to send metadata update requests, current state of " +
              s"the map is $updateMetadataRequestMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.size > 0) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
      }
    }
  }
}
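One detail in sendRequestsToBrokers that is easy to miss is how the UpdateMetadataRequest version is derived from the configured inter-broker protocol version. The sketch below isolates that decision. The `ProtocolVersion` type and its three case objects are hypothetical stand-ins for the real constants (KAFKA_0_9_0 and KAFKA_0_10_0_IV1 appear in the source above); `V0_8_2` is an assumed older version used only for illustration.

```scala
object VersionSketch {
  // Hypothetical ordered protocol versions; not Kafka's own version type.
  sealed abstract class ProtocolVersion(val rank: Int) {
    def >=(other: ProtocolVersion): Boolean = rank >= other.rank
  }
  case object V0_8_2 extends ProtocolVersion(0)
  case object V0_9_0 extends ProtocolVersion(1)
  case object V0_10_0_IV1 extends ProtocolVersion(2)

  // Pick the newest request version that the configured protocol allows.
  def updateMetadataVersion(interBroker: ProtocolVersion): Short =
    if (interBroker >= V0_10_0_IV1) 2
    else if (interBroker >= V0_9_0) 1
    else 0

  def main(args: Array[String]): Unit =
    Seq(V0_8_2, V0_9_0, V0_10_0_IV1).foreach { v =>
      println(s"$v -> UpdateMetadataRequest v${updateMetadataVersion(v)}")
    }
}
```

Checking from the newest version downward matters: the first branch that matches wins, so a broker on the latest protocol never falls through to an older request format.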
This post does not go into the underlying wire protocol; a later post will cover the protocol in detail.