1. 程式人生 > 實用技巧 >Kafka Producer

Kafka Producer

文章目錄

1. Kafka Producer

0.9.x版本中,社群正式使用Java版本的producer替換了原Scala版本的producer。新版本的producer的主要入口類是org.apache.kafka.clients.producer.KafkaProducer,而非原來的kafka.producer.Producer

新版的producer不再依賴zookeeper,甚至不需要和zookeeper叢集進行直接互動,降低了維護成本,也簡化了部署producer應用的開銷成本。

1.1. Producer概覽

Producer的首要功能就是向某個topic的某個分割槽傳送一條訊息,所以首先需要確認到底要向topic的哪個分割槽寫入訊息,這就是分割槽器(partitioner)要做的事情。

Kafka Producer提供了一個預設的分割槽器。對於每條待發送的訊息而言,如果該訊息指定了key,那麼該partitioner會根據Key的雜湊值來選擇目標分割槽;若這條訊息沒有指定key,則partitioner使用輪詢的方式確認目標分割槽,這樣可以最大限度地確保訊息在所有分割槽上的均勻性。
當然producer的API賦予了使用者自行指定目標分割槽的權利,即使用者可以在訊息傳送時跳過partitioner直接指定要傳送到的分割槽。另外,producer也允許使用者實現自定義的分割槽策略而非使用預設的partitioner,這樣使用者可以靈活的根據自身業務需求確定不同的分割槽策略。

1.2. ProducerRecord

一個ProducerRecord物件封裝了一條待發送的訊息。ProducerRecord由5個欄位構成,分別如下(0.10.2.0):

  • String topic:該訊息所屬的topic
  • Integer partition:該訊息所屬的partition
  • K key:訊息key,同一個key的訊息會被劃分到同一個paritition中
  • V value:訊息體,一般不為空,如果為空則表示特定的訊息(墓碑訊息)
  • Long timestamp:訊息時間戳

0.11.x版本中引入了Headers headers屬性(訊息頭部),用來設定一些與應用無關的資訊。

ProducerRecord允許使用者在建立訊息物件的時候直接指定要傳送的分割槽,這樣producer後續傳送該訊息時可以直接傳送到指定分割槽,而不用先通過Partitioner計算目標分割槽。另外,還可以直接指定訊息的時間戳,但是一定要慎重使用這個功能,因為它很可能會令時間戳索引機制失效。

1.3. RecordMetadata

RecordMetadata類表示kafka伺服器端返回給客戶端的訊息的元資料資訊,包含如下內容:

  • long offset:訊息在分割槽日誌中的位移資訊。
  • long timestamp:訊息時間戳
  • TopicPartition topicPartition:所屬topic的partition。
  • long checksum:訊息CRC32碼
  • int serializedKeySize:序列化後的訊息key位元組數
  • int serializedValueSize:序列化後的訊息value位元組數

1.4. 傳送訊息

建立生產者例項和構建訊息例項後,就可以傳送(KafkaProducer#send)訊息了。
傳送訊息主要有三種模式:

  • 發後即忘(fire-and-forget)
  • 同步(sync)
  • 非同步(async)。

KafkaProducer#send方法本身就是非同步的,send方法返回Future型別。

同步傳送和非同步傳送其實就是通過Java的Feture來區分的,呼叫Futrue#get方法無限等待結果返回,即實現同步傳送的效果。
非同步傳送的方式,一般是在send方法裡指定一個CallBack的回撥類,實現方法是onCompletion,Kafka在返回響應時呼叫該回調函式來實現非同步的傳送確認。使用CallBack的方式非常簡潔,kafka有響應時就會回撥,要麼傳送成功,要麼丟擲異常。
對於同一個分割槽而言,如果訊息record1於record2之前傳送,那麼kafkaProducer就可以保證對應的callback1在callback2之前呼叫。也就是說,回撥函式的呼叫可以保證分割槽有序。

疑問:send方法的返回型別就是Future,而Future本身就可以用作非同步的邏輯處理。這樣做不是不行,只不過Future裡的get方法在何時呼叫,以及怎麼呼叫都是需要面對的問題,訊息不停的傳送,那麼諸多訊息對應的Future物件的處理難免會引起程式碼處理邏輯的混亂。

KafkaProducer中一般會發生兩種型別的異常,可重試異常不可重試異常
所有的可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類。理論上未繼承自RetriableException類的其他異常都屬於不可重試異常。

常見的可重試異常有:NetworkException(網路異常)、LeaderNotAvailableException(分割槽的leader副本不可用,這個異常發發生在leader副本下線而新的leader副本選舉完成之前)、NotControException(controller當前不可用)、UnknownTopicOrPartitionExceptionNotEnoughReplicasExceptionNotCoordinatorException等。
對於可重試異常,如果配置了 retries 引數,預設為0,那麼只要在規定的重試次數內自行恢復了,就不會拋異常,否則拋異常。

1.4.1. 工作流程

kafka生產者客戶端的整體架構使用者首先構建待發送的訊息物件ProducerRecord,然後呼叫KafkaProducer#send方法進行傳送。

整個生產者客戶端有兩個執行緒協調執行,這兩個執行緒分別主執行緒(即啟動KafkaProducer的執行緒)和Sender執行緒(傳送執行緒,自KafkaProducer建立後就一直都在執行著)。

在主執行緒中由KafkaProducer建立訊息,然後通過攔截器(Interceptor)序列化器(Serializer)分割槽器(Partitioner)的作用之後,將訊息追加寫入記憶體中到訊息累加器(RecordAccumulator,也稱為訊息收集器)中。此時KafkaProduer#send方法返回,即send方法僅僅把訊息放入訊息累加器中。
Sender執行緒負責從RecordAccumulator中獲取訊息並將訊息傳送到對應的broker中,並等待broker傳送response回來,完成真正的訊息傳送邏輯。

訊息累加器(RecordAccumulator)主要用來緩衝訊息以便Sender執行緒可以批量傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator空間的大小可以通過生產者客戶端引數buffer.memory配置,預設值為33554432,即32M。如果生產者傳送訊息的速度超過傳送到伺服器的速度,則會導致生產者空間不足,這個時候KafkaProducer#send方法呼叫要麼被堵塞,要麼拋異常,這個取決於生產者客戶端引數max.block.ms的配置,預設值為60000,即60秒。

訊息累加器(RecordAccumulator)的內部為每個分割槽維護了一個雙端對列(Deque),佇列中的內容就是ProducerBatch。主執行緒中發過來的訊息都會被追加到訊息累加器(RecordAccumulator)的某個對列。訊息寫入RecordAccumulator時,追加到雙端對列的尾部,Sender讀取訊息時,從雙端對列的頭部讀取。
注意:ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一個至多個ProducerRecord。即ProducerRecord是生產者中建立的訊息,而ProducerBatch是指一個訊息批次,這樣可以使位元組的使用更加緊湊。

ProducerBatch的大小和生產者引數buffer.size有著密切的關係,預設值是16384,即16KB。
當ProducerBatch滿了的時候,Sender會發送該ProducerBatch中的訊息。不過Sender並不總是等待ProducerBatch滿了才發訊息,很有可能當ProducerBatch還有很多空閒空間時就傳送該ProducerBatch。這實際上是吞吐量與延時之間的權衡。生產者引數linger.ms及時控制訊息傳送延時行為的,預設為0,即表示訊息需要被立即傳送,無須關心ProducerBatch是否已被填滿。

從上述過程可以看出新版本producer傳送事件完全是非同步過程。

2. 引數配置

每個引數都在org.apache.kafka.clients.producer.ProducerConfig中有對應的名稱

2.1. 必填引數

bootstrap.servers

key.serializervalue.serializer:broker端接收的訊息必須以位元組陣列(byte[])的形式存在。

2.2. 其他引數

acks
ack引數有3種類型的值,都是字串型別。

在生產者客戶端傳送訊息的時候將acks引數設定為-1,那麼就意味著需要等待ISR集合中的所有副本都確認收到消之後才能正確的收到響應的結果,或者捕獲超時異常。
如果在一定的時間內,follower副本沒有能夠完全拉取到leader的訊息,那麼就需要返回超時異常給客戶端。生產者請求的超時時間由引數request.timeout.ms配置,預設值為30000ms。