Kafka Producer
文章目錄
1. Kafka Producer
在0.9.x
版本中,社群正式使用Java版本的producer替換了原Scala版本的producer。新版本的producer的主要入口類是org.apache.kafka.clients.producer.KafkaProducer
,而非原來的kafka.producer.Producer
。
新版的producer不再依賴zookeeper,甚至不需要和zookeeper叢集進行直接互動,降低了維護成本,也簡化了部署producer應用的開銷成本。
1.1. Producer概覽
Producer的首要功能就是向某個topic的某個分割槽傳送一條訊息,所以首先需要確認到底要向topic的哪個分割槽寫入訊息,這就是分割槽器(partitioner)
要做的事情。
Kafka Producer提供了一個預設的分割槽器。對於每條待發送的訊息而言,如果該訊息指定了key,那麼該partitioner會根據Key的雜湊值來選擇目標分割槽;若這條訊息沒有指定key,則partitioner使用輪詢的方式確認目標分割槽,這樣可以最大限度地確保訊息在所有分割槽上的均勻性。
當然producer的API賦予了使用者自行指定目標分割槽的權利,即使用者可以在訊息傳送時跳過partitioner直接指定要傳送到的分割槽。另外,producer也允許使用者實現自定義的分割槽策略而非使用預設的partitioner,這樣使用者可以靈活的根據自身業務需求確定不同的分割槽策略。
1.2. ProducerRecord
一個ProducerRecord
物件封裝了一條待發送的訊息。ProducerRecord由5個欄位構成,分別如下(0.10.2.0):
- String topic:該訊息所屬的topic
- Integer partition:該訊息所屬的partition
- K key:訊息key,同一個key的訊息會被劃分到同一個paritition中
- V value:訊息體,一般不為空,如果為空則表示特定的訊息(墓碑訊息)
- Long timestamp:訊息時間戳
在0.11.x
版本中引入了Headers headers屬性(訊息頭部),用來設定一些與應用無關的資訊。
ProducerRecord允許使用者在建立訊息物件的時候直接指定要傳送的分割槽,這樣producer後續傳送該訊息時可以直接傳送到指定分割槽,而不用先通過Partitioner計算目標分割槽。另外,還可以直接指定訊息的時間戳,但是一定要慎重使用這個功能,因為它很可能會令時間戳索引機制失效。
1.3. RecordMetadata
RecordMetadata
類表示kafka伺服器端返回給客戶端的訊息的元資料資訊,包含如下內容:
- long offset:訊息在分割槽日誌中的位移資訊。
- long timestamp:訊息時間戳
- TopicPartition topicPartition:所屬topic的partition。
- long checksum:訊息CRC32碼
- int serializedKeySize:序列化後的訊息key位元組數
- int serializedValueSize:序列化後的訊息value位元組數
1.4. 傳送訊息
建立生產者例項和構建訊息例項後,就可以傳送(KafkaProducer#send
)訊息了。
傳送訊息主要有三種模式:
- 發後即忘(fire-and-forget)
- 同步(sync)
- 非同步(async)。
KafkaProducer#send
方法本身就是非同步的,send方法返回Future型別。
同步傳送和非同步傳送其實就是通過Java的Feture來區分的,呼叫Futrue#get
方法無限等待結果返回,即實現同步傳送的效果。
非同步傳送的方式,一般是在send方法裡指定一個CallBack的回撥類,實現方法是onCompletion,Kafka在返回響應時呼叫該回調函式來實現非同步的傳送確認。使用CallBack的方式非常簡潔,kafka有響應時就會回撥,要麼傳送成功,要麼丟擲異常。
對於同一個分割槽而言,如果訊息record1於record2之前傳送,那麼kafkaProducer就可以保證對應的callback1在callback2之前呼叫。也就是說,回撥函式的呼叫可以保證分割槽有序。
疑問:send方法的返回型別就是Future,而Future本身就可以用作非同步的邏輯處理。這樣做不是不行,只不過Future裡的get方法在何時呼叫,以及怎麼呼叫都是需要面對的問題,訊息不停的傳送,那麼諸多訊息對應的Future物件的處理難免會引起程式碼處理邏輯的混亂。
KafkaProducer中一般會發生兩種型別的異常,可重試異常和不可重試異常。
所有的可重試異常都繼承自org.apache.kafka.common.errors.RetriableException
抽象類。理論上未繼承自RetriableException類的其他異常都屬於不可重試異常。
常見的可重試異常有:NetworkException
(網路異常)、LeaderNotAvailableException
(分割槽的leader副本不可用,這個異常發發生在leader副本下線而新的leader副本選舉完成之前)、NotControException
(controller當前不可用)、UnknownTopicOrPartitionException
、NotEnoughReplicasException
、NotCoordinatorException
等。
對於可重試異常,如果配置了 retries
引數,預設為0,那麼只要在規定的重試次數內自行恢復了,就不會拋異常,否則拋異常。
1.4.1. 工作流程
使用者首先構建待發送的訊息物件ProducerRecord
,然後呼叫KafkaProducer#send
方法進行傳送。
整個生產者客戶端有兩個執行緒協調執行,這兩個執行緒分別主執行緒(即啟動KafkaProducer的執行緒)和Sender執行緒(傳送執行緒,自KafkaProducer建立後就一直都在執行著)。
在主執行緒中由KafkaProducer建立訊息,然後通過攔截器(Interceptor)
、序列化器(Serializer)
和分割槽器(Partitioner)
的作用之後,將訊息追加寫入記憶體中到訊息累加器(RecordAccumulator,也稱為訊息收集器)
中。此時KafkaProduer#send方法返回,即send方法僅僅把訊息放入訊息累加器中。
Sender執行緒負責從RecordAccumulator中獲取訊息並將訊息傳送到對應的broker中,並等待broker傳送response回來,完成真正的訊息傳送邏輯。
訊息累加器(RecordAccumulator)主要用來緩衝訊息以便Sender執行緒可以批量傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator空間的大小可以通過生產者客戶端引數buffer.memory
配置,預設值為33554432,即32M。如果生產者傳送訊息的速度超過傳送到伺服器的速度,則會導致生產者空間不足,這個時候KafkaProducer#send方法呼叫要麼被堵塞,要麼拋異常,這個取決於生產者客戶端引數max.block.ms
的配置,預設值為60000,即60秒。
訊息累加器(RecordAccumulator)的內部為每個分割槽維護了一個雙端對列(Deque),佇列中的內容就是ProducerBatch
。主執行緒中發過來的訊息都會被追加到訊息累加器(RecordAccumulator)的某個對列。訊息寫入RecordAccumulator時,追加到雙端對列的尾部,Sender讀取訊息時,從雙端對列的頭部讀取。
注意:ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一個至多個ProducerRecord。即ProducerRecord是生產者中建立的訊息,而ProducerBatch是指一個訊息批次,這樣可以使位元組的使用更加緊湊。
ProducerBatch的大小和生產者引數buffer.size
有著密切的關係,預設值是16384,即16KB。
當ProducerBatch滿了的時候,Sender會發送該ProducerBatch中的訊息。不過Sender並不總是等待ProducerBatch滿了才發訊息,很有可能當ProducerBatch還有很多空閒空間時就傳送該ProducerBatch。這實際上是吞吐量與延時之間的權衡。生產者引數linger.ms
及時控制訊息傳送延時行為的,預設為0,即表示訊息需要被立即傳送,無須關心ProducerBatch是否已被填滿。
從上述過程可以看出新版本producer傳送事件完全是非同步過程。
2. 引數配置
每個引數都在org.apache.kafka.clients.producer.ProducerConfig
中有對應的名稱
2.1. 必填引數
bootstrap.servers
key.serializer
和value.serializer
:broker端接收的訊息必須以位元組陣列(byte[])的形式存在。
2.2. 其他引數
acks
ack引數有3種類型的值,都是字串型別。
在生產者客戶端傳送訊息的時候將acks引數設定為-1,那麼就意味著需要等待ISR集合中的所有副本都確認收到消之後才能正確的收到響應的結果,或者捕獲超時異常。
如果在一定的時間內,follower副本沒有能夠完全拉取到leader的訊息,那麼就需要返回超時異常給客戶端。生產者請求的超時時間由引數request.timeout.ms
配置,預設值為30000ms。