Kafka Producer: Source Code Analysis of Message Sending in Sync Mode
Let's first list the more important methods:
// Entry point: handles a single message or a batch of messages
def handle(events: Seq[KeyedMessage[K,V]])
// Dispatches the serialized KeyedMessage data
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]]
// Assigns partitions to the messages
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]
/**
* Constructs and sends the produce request based on a map from (topic, partition) -> messages
*
* @param brokerId the broker that will receive the request
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
When Kafka sends messages in sync mode, it hands them to the broker through DefaultEventHandler's handle method; the input can be a single message or a List of several messages.
Taking a List as the example:
After DefaultEventHandler receives the messages (which may span multiple topics), it calls dispatchSerializedData, which walks through the messages and assigns each one to a partition of its topic. If a message's key is not null, the partitioning class configured via partitioner.class is invoked to compute the partitionIndex; otherwise the partitionId cached for that topic in sendPartitionPerTopicCache is used. The leaderBrokerId of that partitionIndex is then looked up, so each message ends up assigned to the leader broker of exactly one partition of one topic.
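To put this flow in context, here is a minimal usage sketch based on the old Scala producer API that DefaultEventHandler belongs to; the broker address, topic name, key and value are assumptions chosen purely for illustration:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SyncSendExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumed broker address; adjust to your environment.
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // producer.type=sync routes send() straight through DefaultEventHandler.handle (isSync == true below).
    props.put("producer.type", "sync")
    props.put("request.required.acks", "1")
    // handle() tries at most message.send.max.retries + 1 times, sleeping retry.backoff.ms between attempts.
    props.put("message.send.max.retries", "3")
    props.put("retry.backoff.ms", "100")

    val producer = new Producer[String, String](new ProducerConfig(props))
    // With a key, getPartition() delegates to partitioner.partition(key, numPartitions);
    // without a key, it falls back to sendPartitionPerTopicCache.
    producer.send(new KeyedMessage[String, String]("test-topic", "some-key", "hello kafka"))
    producer.close()
  }
}

In sync mode the send call only returns after handle() has either succeeded or exhausted its retries and thrown FailedToSendMessageException. The full source of DefaultEventHandler follows: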
package kafka.producer.async
import kafka.common._
import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.producer._
import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging, SystemTime}
import scala.util.Random
import scala.collection.{Seq, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
import java.util.concurrent.atomic._
import kafka.api.{TopicMetadata, ProducerRequest}
class DefaultEventHandler[K,V](config: ProducerConfig,
private val partitioner: Partitioner[K],
private val encoder: Encoder[V],
private val keyEncoder: Encoder[K],
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
val isSync = ("sync" == config.producerType)
val correlationId = new AtomicInteger(0)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
private var lastTopicMetadataRefreshTime = 0L
private val topicMetadataToRefresh = Set.empty[String]
private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
// Entry point: handles a single message or a batch of messages
def handle(events: Seq[KeyedMessage[K,V]]) {
val serializedData = serialize(events)
serializedData.foreach {
keyed =>
val dataSize = keyed.message.payloadSize
producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
}
var outstandingProduceRequests = serializedData
var remainingRetries = config.messageSendMaxRetries + 1
val correlationIdStart = correlationId.get()
debug("Handling %d events".format(events.size))
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
if (topicMetadataRefreshInterval >= 0 &&
SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
topicMetadataToRefresh.clear
lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
// Dispatch the serialized data
outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
if (outstandingProduceRequests.size > 0) {
info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.retryBackoffMs)
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
sendPartitionPerTopicCache.clear()
remainingRetries -= 1
producerStats.resendRate.mark()
}
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
.format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
correlationIdStart, correlationIdEnd-1))
throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
}
// Dispatches the serialized data
private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
// Assign partitions to the messages according to the partitioning rules
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
try {
// Iterate over every leaderBrokerId and send its messages
for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
if (logger.isTraceEnabled)
messagesPerBrokerMap.foreach(partitionAndEvent =>
trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
// Send the messages to brokerid
val failedTopicPartitions = send(brokerid, messageSetPerBroker)
failedTopicPartitions.foreach(topicPartition => {
messagesPerBrokerMap.get(topicPartition) match {
case Some(data) => failedProduceRequests.appendAll(data)
case None => // nothing
}
})
}
} catch {
case t: Throwable => error("Failed to send messages", t)
}
failedProduceRequests
case None => // all produce requests failed
messages
}
}
def serialize(events: Seq[KeyedMessage[K,V]]): Seq[KeyedMessage[K,Message]] = {
val serializedMessages = new ArrayBuffer[KeyedMessage[K,Message]](events.size)
events.map{e =>
try {
if(e.hasKey)
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = e.key, message = new Message(key = keyEncoder.toBytes(e.key), bytes = encoder.toBytes(e.message)))
else
serializedMessages += KeyedMessage[K,Message](topic = e.topic, key = null.asInstanceOf[K], message = new Message(bytes = encoder.toBytes(e.message)))
} catch {
case t: Throwable =>
producerStats.serializationErrorRate.mark()
if (isSync) {
throw t
} else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
error("Error serializing message for topic %s".format(e.topic), t)
}
}
}
serializedMessages
}
// Assigns partitions to the messages
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
// A map of leaderBrokerId ==> (TopicAndPartition -> messages)
val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
try {
for (message <- messages) {
// Get the list of partitions for the topic
val topicPartitionsList = getPartitionListForTopic(message)
// Assign a partition index to the message; if the message key is not null, the configured partitioner.class is invoked
val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
// Look up the partition/broker info corresponding to partitionIndex
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
// Get the leaderBrokerId of this partition
val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)
// The data set <TopicAndPartition, messages> to be sent to this leaderBrokerId
// Messages are ultimately sent grouped by leaderBrokerId, then topic, then partitionId
var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
ret.get(leaderBrokerId) match {
case Some(element) =>
dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
case None =>
dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
ret.put(leaderBrokerId, dataPerBroker)
}
val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
// The messages to be sent for each partition of the topic
var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
dataPerBroker.get(topicAndPartition) match {
case Some(element) =>
dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
case None =>
dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
}
// Append the message to be sent for this broker's topic partition
dataPerTopicPartition.append(message)
}//end for
Some(ret)
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
}
}
private def getPartitionListForTopic(m: KeyedMessage[K,Message]): Seq[PartitionAndLeader] = {
val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(m.topic, correlationId.getAndIncrement)
debug("Broker partitions registered for topic: %s are %s"
.format(m.topic, topicPartitionsList.map(p => p.partitionId).mkString(",")))
val totalNumPartitions = topicPartitionsList.length
if(totalNumPartitions == 0)
throw new NoBrokersForPartitionException("Partition key = " + m.key)
topicPartitionsList
}
/**
* Retrieves the partition id and throws an UnknownTopicOrPartitionException if
* the value of partition is not between 0 and numPartitions-1
* @param key the partition key
* @param topicPartitionList the list of available partitions
* @return the partition id
*/
private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner
// So we look up in the send partition cache for the topic to decide the target partition
val id = sendPartitionPerTopicCache.get(topic)
id match {
case Some(partitionId) =>
// directly return the partitionId without checking availability of the leader,
// since we want to postpone the failure until the send operation anyways
partitionId
case None =>
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId)
partitionId
}
} else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
"; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
partition
}
/**
* Constructs and sends the produce request based on a map from (topic, partition) -> messages
*
* @param brokerId the broker that will receive the request
* @param messagesPerTopic the messages as a map from (topic, partition) -> messages
* @return the set (topic, partitions) messages which incurred an error sending or processing
*/
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {
val syncProducer = producerPool.getProducer(brokerId)
debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
val response = syncProducer.send(producerRequest)
debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
if(response != null) {
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
if (logger.isTraceEnabled) {
val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
}
val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
if(failedTopicPartitions.size > 0) {
val errorString = failedPartitionsAndStatus
.sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
(p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
.map{
case(topicAndPartition, status) =>
topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
}.mkString(",")
warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
}
failedTopicPartitions
} else
Seq.empty[TopicAndPartition]
} catch {
case t: Throwable =>
warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
.format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
messagesPerTopic.keys.toSeq
}
} else {
List.empty
}
}
private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
val rawMessages = messages.map(_.message)
( topicAndPartition,
config.compressionCodec match {
case NoCompressionCodec =>
debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
case _ =>
config.compressedTopics.size match {
case 0 =>
debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
case _ =>
if(config.compressedTopics.contains(topicAndPartition.topic)) {
debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
}
else {
debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
.format(messages.size, topicAndPartition, config.compressedTopics.toString))
new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
}
}
}
)
}
messagesPerTopicPartition
}
def close() {
if (producerPool != null)
producerPool.close
}
}
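For completeness, here is a minimal sketch of a custom partitioner compatible with the Partitioner[K] trait used above (getPartition calls partitioner.partition(key, numPartitions) whenever the message key is not null). The class name and the hash-modulo rule are illustrative assumptions, and the VerifiableProperties constructor parameter is included only because some 0.8.x releases instantiate the partitioner with the producer's properties:

import kafka.producer.Partitioner
import kafka.utils.{Utils, VerifiableProperties}

// Hypothetical example class; it would be registered through the partitioner.class
// producer property using its fully qualified class name.
class HashKeyPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
  // Must return a value in [0, numPartitions - 1]; getPartition() rejects anything outside that range.
  override def partition(key: String, numPartitions: Int): Int =
    Utils.abs(key.hashCode) % numPartitions
}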