Kafka source code analysis: KafkaApis
KafkaApis
Overview: KafkaApis is the central dispatch component that handles message requests to Kafka. It depends on the following components:
apis = new KafkaApis(socketServer.requestChannel, replicaManager,
  consumerCoordinator, kafkaController, zkUtils, config.brokerId, config,
  metadataCache, metrics, authorizer)
The core of its processing is dispatched by the handle function in KafkaApis.
The request handler pool
After the KafkaApis instance is created, a KafkaRequestHandlerPool is created alongside it.
This pool is the component that actually processes Kafka requests, and it depends on the following components and configuration:
Configuration item num.io.threads, default 8: the number of threads used to handle I/O operations.
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,
socketServer.requestChannel, apis, config.numIoThreads)
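This setting can be overridden in the broker's server.properties; a minimal fragment (8 is the default mentioned above):

```properties
# Number of threads the broker uses to process requests (I/O); default is 8.
num.io.threads=8
```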
Based on the number of I/O threads, a corresponding KafkaRequestHandler thread is created for each.
this.logIdent = "[Kafka Request Handler on Broker "
Now let's look at the KafkaRequestHandler thread:
def run() {
  while (true) {
    try {
      // Take a request from the request queue and hand it directly to KafkaApis.
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = SystemTime.nanoseconds
        req = requestChannel.receiveRequest(300)
        val idleTime = SystemTime.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }
      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        return
      }
      req.requestDequeueTimeMs = SystemTime.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req)
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
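The idle-time discounting in the loop above can be illustrated with a small standalone sketch (the names here are stand-ins, not Kafka's): each thread records idleTime / totalHandlerThreads, so the sum recorded across all threads stays an idle fraction of the pool's time window rather than being multiplied by the thread count.

```scala
// Standalone sketch of the idle-percentage discounting; not Kafka code.
object IdleMeterSketch {
  def main(args: Array[String]): Unit = {
    val totalHandlerThreads = 8
    // Suppose each of the 8 threads was idle 300 ms within a 1000 ms window.
    val perThreadIdleNanos = 300L * 1000000L
    val windowNanos = 1000L * 1000000L
    // Each thread records its idle time discounted by the thread count ...
    val recordedPerThread = perThreadIdleNanos / totalHandlerThreads
    // ... so the total recorded over all threads equals one thread's idle time,
    // and the pool's aggregate idle ratio stays 0.3 instead of 2.4.
    val aggregateRecorded = recordedPerThread * totalHandlerThreads
    println(aggregateRecorded.toDouble / windowNanos)
  }
}
```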
Handling network requests
This part is handled by the handle function in KafkaApis, which routes each request to a different handler according to its request key.
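The routing is essentially one large pattern match on the request key. A simplified, self-contained sketch of the dispatch pattern (the request types below are stand-ins, not Kafka's actual classes):

```scala
// Simplified stand-in for KafkaApis.handle: route by request key.
object DispatchSketch {
  sealed trait Request
  case object UpdateMetadata extends Request
  case object LeaderAndIsr extends Request

  // Each case delegates to a dedicated handler; here we just return its name.
  def handle(req: Request): String = req match {
    case UpdateMetadata => "handleUpdateMetadataRequest"
    case LeaderAndIsr   => "handleLeaderAndIsrRequest"
  }

  def main(args: Array[String]): Unit =
    println(handle(UpdateMetadata))
}
```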
Handling metadata update requests
When a partition changes, an UpdateMetadataRequest is sent to all brokers; that is, every live broker receives the metadata-change request and processes it.
An update-metadata request is issued when a partition's state changes, when partitions are reassigned, and when a broker starts or stops.
The entry point is in the handle function of KafkaApis:
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
Now let's look at the processing flow of handleUpdateMetadataRequest:
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
  val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
  // First check whether the current user has the ClusterAction permission;
  // if so, continue with the flow below.
  authorizeClusterAction(request)
  // Based on the metadata update message in the request, update the contents of
  // metadataCache. This covers broker additions and removals, partition state
  // updates, and so on.
  replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
  val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
  requestChannel.sendResponse(new Response(request,
    new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))
}
Now look at how ReplicaManager handles the metadata update request:
In the replica manager component, the incoming message is handled directly by the updateCache function of MetadataCache, which updates the cached metadata on the current broker.
The flow for updating the cache:
1. Update the aliveBrokers set in the cache, which holds all live broker nodes.
2. Iterate over the partitions in the request whose state has changed:
2.1. If the partition's leader is marked as -2, this partition has been deleted. Find the sub-collection for this partition's topic in the cache and remove the partition from it; if the topic then no longer contains any partitions, remove the topic from the cache entirely.
2.2. Otherwise this is a partition state update, carrying the partition's replica information and its leader/ISR information; directly update the state of this partition in the topic's sub-collection in the cache.
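The steps above can be sketched with plain Scala maps (a simplified model of the flow, not the real MetadataCache):

```scala
import scala.collection.mutable

// Simplified model of the cache-update flow for partition state.
object CacheUpdateSketch {
  // topic -> (partitionId -> leader broker); leader == -2 marks a deleted partition.
  val cache = mutable.Map("t1" -> mutable.Map(0 -> 1, 1 -> 2))

  def applyPartitionState(topic: String, partitionId: Int, leader: Int): Unit = {
    if (leader == -2) {
      // Step 2.1: the partition was deleted; drop it, and drop the topic if it is now empty.
      cache.get(topic).foreach { partitions =>
        partitions.remove(partitionId)
        if (partitions.isEmpty) cache.remove(topic)
      }
    } else {
      // Step 2.2: a state update; overwrite the partition's entry in the topic's sub-map.
      cache.getOrElseUpdate(topic, mutable.Map.empty[Int, Int])(partitionId) = leader
    }
  }

  def main(args: Array[String]): Unit = {
    applyPartitionState("t1", 0, -2)  // delete partition 0
    applyPartitionState("t1", 1, 3)   // move partition 1's leader to broker 3
    println(cache)
  }
}
```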
def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest,
    metadataCache: MetadataCache) {
  replicaStateChangeLock synchronized {
    if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request " +
        "with correlation id %d from an old controller %d with epoch %d. " +
        "Latest known controller epoch is %d").format(localBrokerId,
        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId,
        updateMetadataRequest.controllerEpoch, controllerEpoch)
      stateChangeLogger.warn(stateControllerEpochErrorMessage)
      throw new ControllerMovedException(stateControllerEpochErrorMessage)
    } else {
      metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
      controllerEpoch = updateMetadataRequest.controllerEpoch
    }
  }
}
Handling LeaderAndIsr requests for partitions
This request is issued after a partition's leader or ISR has changed. A broker only receives this request if it holds a replica of the corresponding partition.
case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
Now let's look at the processing flow of handleLeaderAndIsrRequest:
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  // First get the request content. For a LeaderAndIsr request this is a
  // LeaderAndIsrRequest instance.
  val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
  // Check whether the requesting user has the ClusterAction permission.
  authorizeClusterAction(request)
  try {
    // After a partition's ISR has changed, this callback checks, for the replicas
    // that became leaders and those that became followers, whether the replica's
    // topic is the internal __consumer_offsets topic, and uses the corresponding
    // functions in GroupMetadataManager (via the coordinator) to bring the
    // internal topic's leader online or offline.
    def onLeadershipChange(updatedLeaders: Iterable[Partition],
        updatedFollowers: Iterable[Partition]) {
      updatedLeaders.foreach { partition =>
        if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
          coordinator.handleGroupImmigration(partition.partitionId)
      }
      updatedFollowers.foreach { partition =>
        if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
          coordinator.handleGroupEmigration(partition.partitionId)
      }
    }
    // Call the replica manager to handle updating the requested partitions to
    // become leader or follower.
    val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest,
      metadataCache, onLeadershipChange)
    val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId,
      result.responseMap, result.errorCode)
    // Build the response for the completed operation and send it back to the caller.
    requestChannel.sendResponse(new Response(request,
      new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error during leadership change.", e)
      Runtime.getRuntime.halt(1)
  }
}
The becomeLeaderOrFollower function in ReplicaManager:
This function decides whether each of the given partitions should become a leader or a follower on this broker.
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
    metadataCache: MetadataCache,
    onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit)
    : BecomeLeaderOrFollowerResult = {
  leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
    stateChangeLogger.trace("...")  // log text elided
  }
  replicaStateChangeLock synchronized {
    val responseMap = new mutable.HashMap[(String, Int), Short]
    // If the epoch in the request is smaller than the current controllerEpoch,
    // log a warning and return the StaleControllerEpochCode error code.
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
      leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
        stateChangeLogger.warn("...")  // log text elided
      }
      BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.StaleControllerEpochCode)
    } else {
      // Take the latest epoch from the request and set this broker's epoch to it.
      val controllerId = leaderAndISRRequest.controllerId
      val correlationId = leaderAndISRRequest.correlationId
      controllerEpoch = leaderAndISRRequest.controllerEpoch
      // Iterate over all requested partitions and check each partition's state.
      // First check partition's leader epoch.
      val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]()
      leaderAndISRRequest.partitionStateInfos.foreach {
        case ((topic, partitionId), partitionStateInfo) =>
          // getOrCreatePartition fetches this partition's instance from the
          // allPartitions collection, creating the instance if it does not exist yet.
          val partition = getOrCreatePartition(topic, partitionId)
          val partitionLeaderEpoch = partition.getLeaderEpoch()
          // If the partition's current leaderEpoch is smaller than the requested one
          // and this broker is among the partition's replicas, add the partition and
          // its state to the partitionState collection. Otherwise this broker holds
          // no replica of the partition: log a message and record
          // UnknownTopicOrPartitionCode for it in responseMap. If the partition's
          // leaderEpoch is greater than or equal to the requested epoch, log a
          // message and record StaleLeaderEpochCode for it in responseMap.
          if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
            if (partitionStateInfo.allReplicas.contains(config.brokerId))
              partitionState.put(partition, partitionStateInfo)
            else {
              stateChangeLogger.warn("...")  // log text elided
              responseMap.put((topic, partitionId), ErrorMapping.UnknownTopicOrPartitionCode)
            }
          } else {
            // Otherwise record the error code in the response.
            stateChangeLogger.warn("...")  // log text elided
            responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
          }
      }
      // From the partitions whose replicas include this broker, compute the set
      // whose requested leader is this broker, and the set of the remaining
      // partitions, whose leader is some other broker.
      val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) =>
        partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId
      }
      val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
      // If the set of partitions that must become leaders here is non-empty, run
      // makeLeaders on it; the result is the set of partitions for which this
      // broker actually became the leader.
      val partitionsBecomeLeader =
        if (!partitionsTobeLeader.isEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader,
            leaderAndISRRequest.correlationId, responseMap)
        else
          Set.empty[Partition]
      // Likewise, if the set of partitions that must become followers here is
      // non-empty, run makeFollowers on it; the result is the set of partitions
      // for which this broker became a follower.
      val partitionsBecomeFollower =
        if (!partitionsToBeFollower.isEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower,
            leaderAndISRRequest.correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]
      // Start the thread that checkpoints each partition's high watermark (last
      // offset) to the checkpoint file in the log directory, if not started yet.
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      // Shut down fetcher threads no longer referenced by any partition. These
      // threads replicate messages from the partition leader to the follower.
      replicaFetcherManager.shutdownIdleFetcherThreads()
      // For the partitions that became leaders and those that became followers on
      // this broker, check whether the topic is __consumer_offsets (which records
      // each consumer's committed offsets); if so, the GroupMetadataManager
      // instance handles bringing the partition's leader online or offline for
      // the two sets respectively.
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      BecomeLeaderOrFollowerResult(responseMap, ErrorMapping.NoError)
    }
  }
}
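The split into leader and follower sets above is a plain filter over the surviving partition state. A standalone sketch of that step (simplified types, not Kafka's):

```scala
// Simplified sketch of splitting partition state into leader/follower sets.
object LeaderFollowerSplitSketch {
  val brokerId = 1
  // partitionId -> requested leader broker, for partitions replicated on this broker.
  val partitionState = Map(0 -> 1, 1 -> 2, 2 -> 1)

  // Partitions whose requested leader is this broker ...
  val toBeLeader = partitionState.filter { case (_, leader) => leader == brokerId }
  // ... and the rest, which this broker follows.
  val toBeFollower = partitionState -- toBeLeader.keys

  def main(args: Array[String]): Unit =
    println((toBeLeader.keySet, toBeFollower.keySet))
}
```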
Setting the partition leader
When a LeaderAndIsr request arrives, if the leader of a partition in the request is the current broker, ReplicaManager.makeLeaders is executed on that set of partitions.
private def makeLeaders(controllerId: Int,
    epoch: Int,
    partitionState: Map[Partition, PartitionStateInfo],
    correlationId: Int,
    responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = {
  partitionState.foreach(state => stateChangeLogger.trace("..."))  // log text elided
  // Record the NoError code in the response for every partition whose leader is
  // to be set to this node; this indicates no error.
  for (partition <- partitionState.keys)
    responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
  try {
    // First stop fetchers for all the partitions: remove the partitions that are
    // to become leaders on this broker from the replication fetcher threads.
    replicaFetcherManager.removeFetcherForPartitions(
      partitionState.keySet.map(new TopicAndPartition(_)))
    // Update the partition information to be the leader: iterate over the
    // partitions to become leaders on this broker, set each partition's leader
    // via Partition.makeLeader, and collect the partitions whose leader was
    // actually changed (makeLeader returns false if the leader was already on
    // this broker). The function finally returns this collected set of partitions.
    partitionState.foreach { case (partition, partitionStateInfo) =>
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        partitionsToMakeLeaders += partition
      else
        stateChangeLogger.info("...")  // log text elided
    }
    partitionsToMakeLeaders.foreach { partition =>
      stateChangeLogger.trace("...")  // log text elided
    }
  } catch {
    case e: Throwable =>
      partitionState.foreach { state =>