kafka原始碼分析
原文地址:http://www.aboutyun.com/thread-9938-1-1.html
問題導讀
1.Kafka提供了Producer類作為java producer的api,此類有幾種傳送方式?
2.總結呼叫producer.send方法包含哪些流程?
3.Producer難以理解的在什麼地方?
producer的傳送方式剖析
Kafka提供了Producer類作為java producer的api,該類有sync和async兩種傳送方式。
sync架構圖
async架構圖
呼叫流程如下:
程式碼流程如下:
Producer:當new Producer(new ProducerConfig()),其底層實現,實際會產生兩個核心類的例項:Producer、DefaultEventHandler。在建立的同時,會預設new一個ProducerPool,即我們每new一個
呼叫producer.send方法流程:
當應用程式呼叫producer.send方法時,其內部其實調的是eventhandler.handle(message)方法,eventHandler會首先序列化該訊息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
呼叫邏輯解釋:當
Producer的sync與async傳送訊息處理,大家看以上
partitionAndCollate方法詳細作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分佈在哪個broker上),
建立一個HashMap>>>,把messages按照brokerId分組組裝資料,然後為SyncProducer分別傳送訊息作準備工作。
名稱解釋:partKey:分割槽關鍵字,當客戶端應用程式實現Partitioner介面時,傳入引數key為分割槽關鍵字,根據key和numPartitions,返回分割槽(partitions)索引。記住partitions分割槽索引是從0開始的。
Producer平滑擴容機制
如果開發過producer客戶端程式碼,會知道metadata.broker.list引數,它的含義是kafak
broker的ip和port列表,producer初始化時,就連線這幾個broker,這時大家會有疑問,producer支援kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker資訊,答案是肯定的,producer可以支援平滑擴容broker,他是通過定時與現有的metadata.broker.list通訊,獲取新增broker資訊,然後把新建的SyncProducer放入ProducerPool中。等待後續應用程式呼叫。
DefaultEventHandler類中初始化例項化BrokerPartitionInfo類,然後定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分程式碼如下: def handle(events: Seq[KeyedMessage[K,V]]) { ...... while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) if (topicMetadataRefreshInterval >= 0 && SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() topicMetadataToRefresh.clear lastTopicMetadataRefreshTime = SystemTime.milliseconds } outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) if (outstandingProduceRequests.size > 0) { info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1)) //休眠時間,多長時間重新整理一次 Thread.sleep(config.retryBackoffMs) // 生產者定期請求重新整理最新topics的broker元資料資訊 Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) ..... } } }
BrokerPartitionInfo的updateInfo方法程式碼如下:
def updateInfo(topics: Set[String], correlationId: Int) { var topicsMetadata: Seq[TopicMetadata] = Nil //根據topics列表,meta.broker.list,其他配置引數,correlationId表示請求次數,一個計數器引數而已 //建立一個topicMetadataRequest,並隨機的選取傳入的broker資訊中任何一個去取metadata,直到取到為止 val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) topicsMetadata = topicMetadataResponse.topicsMetadata // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) if(tmd.errorCode == ErrorMapping.NoError) { topicPartitionInfo.put(tmd.topic, tmd) } else warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, ErrorMapping.exceptionFor(pmd.errorCode).getClass)) } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) producerPool.updateProducer(topicsMetadata) }
ClientUtils.fetchTopicMetadata方法程式碼:
def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq) var topicMetadataResponse: TopicMetadataResponse = null var t: Throwable = null val shuffledBrokers = Random.shuffle(brokers) //生成隨機數 while(i ProducerPool的updateProducer def updateProducer(topicMetadata: Seq[TopicMetadata]) { val newBrokers = new collection.mutable.HashSet[Broker] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) newBrokers+=(pmd.leader.get) }) }) lock synchronized { newBrokers.foreach(b => { if(syncProducers.contains(b.id)){ syncProducers(b.id).close() syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) } else syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) }) } }
當我們啟動kafka broker後,並且大量producer和consumer時,經常會報如下異常資訊。
- [email protected]:/opt/soft$ Closing socket connection to 192.168.11.166
筆者也是經常很長時間看原始碼分析,才明白了為什麼ProducerConfig配置資訊裡面並不要求使用者提供完整的kafka叢集的broker資訊,而是任選一個或幾個即可。因為他會通過您選擇的broker和topics資訊而獲取最新的所有的broker資訊。
值得了解的是用於傳送TopicMetadataRequest的SyncProducer雖然是用ProducerPool.createSyncProducer方法建出來的,但用完並不還回ProducerPool,而是直接Close.
重難點理解:
重新整理metadata並不僅在第一次初始化時做。為了能適應kafka broker執行中因為各種原因掛掉、paritition改變等變化,
eventHandler會定期的再去重新整理一次該metadata,重新整理的間隔用引數topic.metadata.refresh.interval.ms定義,預設值是10分鐘。
這裡有三點需要強調:
客戶端呼叫send,
才會新建SyncProducer,只有呼叫send才會去定期重新整理metadata在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完後再close。根據當前SyncProducer(一個Broker的連線)取得的最新的完整的metadata,重新整理ProducerPool中到broker的連線.每10分鐘的重新整理會直接重新把到每個broker的socket連線重建,意味著在這之後的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把topic.metadata.refresh.interval.ms值改為-1,這樣只有在傳送失敗時,才會重新重新整理。Kafka的叢集中如果某個partition所在的broker掛了,可以檢查錯誤後重啟重新加入叢集,手動做rebalance,producer的連線會再次斷掉,直到rebalance完成,那麼重新整理後取到的連線著中就會有這個新加入的broker。
說明:每個SyncProducer例項化物件會建立一個socket連線
特別注意:
在ClientUtils.fetchTopicMetadata呼叫完成後,回到BrokerPartitionInfo.updateInfo繼續執行,在其末尾,pool會根據上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的,即每一個SyncProducer對應一個broker,內部封了一個到該broker的socket連線。每次重新整理時,會把已存在SyncProducer給close掉,即關閉socket連線,然後新建SyncProducer,即新建socket連線,去覆蓋老的。
如果不存在,則直接建立新的。