1. 程式人生 > >Kafka生產者producer簡要總結

Kafka生產者producer簡要總結

Kafka producer在設計上要比consumer簡單,不涉及複雜的組管理操作,每個producer都是獨立進行工作的,與其他producer例項之間沒有關聯。Producer的主要功能就是向某個topic的某個分割槽傳送訊息,所以首先要確認向topic的哪個分割槽寫入訊息——即分割槽器(partitioner)的功能。Kafka producer提供了一個預設的分割槽器。對於每條待發送的訊息而言,如果該訊息指定了key,則partitioner會根據key的雜湊值來選擇目標分割槽,將具有相同key的所有訊息都路由到相同的分割槽中;若該訊息未指定key,則partitioner使用輪詢的方式確認目標分割槽。另外producer的API賦予了使用者自行指定目標分割槽的權力,即使用者可以在訊息傳送時跳過partitioner直接指定到要傳送到的分割槽。

另外producer也允許使用者實現自定義的分割槽策略而非使用預設的partitioner。

確認分割槽後,producer要做的第二件事是尋找該分割槽對應的leader,也就是該分割槽leader副本所在的Kafka broker。每個topic分割槽都由若干個副本組成,其中一個副本充當leader角色,只有leader能響應clients傳送的請求,剩下的副本中有一部分副本會與leader副本保持同步,即所謂的ISR(In-Sync Replicas, 副本同步佇列)。因此,在傳送訊息時,producer可以有多種選擇來實現訊息傳送,例如不等待任何副本的響應便返回成功,或者只是等待leader副本響應寫入操作後再返回成功。

補充副本概念:

副本(Replication)

同一個partition可能會有多個replication(對應server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的資料都不可被消費,同時producer也不能再將資料存於其上的patition。引入replication之後,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader互動,其它replication作為follower從leader 中複製資料。

Producer寫入流程

1)producer先從zookeeper的 “/brokers/…/state”節點找到該partition的leader

2)producer將訊息傳送給該leader

3)leader將訊息寫入本地log

4)followers從leader pull訊息,寫入本地log後向leader傳送ACK

5)leader收到所有ISR中的replication的ACK後,增加HW(high watermark,最後commit 的offset)並向producer傳送ACK

Producer傳送訊息

在Java版程式碼中,producer首先使用一個執行緒(使用者主執行緒,也就是使用者啟動producer的執行緒)將待發送的訊息封裝進一個ProducerRecord類例項,然後將其序列化之後傳送給partitioner,再由後者確定了目標分割槽後一同傳送到位於producer程式中的一塊記憶體緩衝池中。而producer的另一個工作執行緒(I/O傳送執行緒,也稱Sender執行緒)則負責實時地從該緩衝區中提取準備就緒的訊息封裝進一個批次(batch),統一發送給對應的broker。

Kafka producer傳送訊息的主方法是send方法,producer在底層完全實現了非同步化傳送,並且通過Java提供的Future同時實現了同步傳送和非同步傳送+回撥(Callback)(預設非同步)兩種傳送方式。最後producer程式結束時需要關閉producer。

acks引數(用來平衡吞吐量與可靠性):

producers可以一步的並行向kafka傳送訊息,但是通常producer在傳送完訊息之後會得到一個響應,返回的是offset值(metadata)或者傳送過程中遇到的錯誤(exception)。這其中有個非常重要的引數“request.required.acks”,這個引數決定了producer要求leader partition收到確認的副本個數:

如果acks設定為0,表示producer不會等待broker的響應,所以,producer無法知道訊息是否發生成功,這樣有可能導致資料丟失,但同時,acks值為0會得到最大的系統吞吐量。一般這種情況下允許訊息丟失,如統計伺服器日誌等;

若acks設定為1,表示producer會在leader partition收到訊息時得到broker的一個確認,這樣會有更好的可靠性,因為客戶端會等待知道broker確認收到訊息。此時,當傳送訊息時,leader broker僅將該訊息寫入本地日誌,然後便傳送響應結果給producer,而無需等待ISR中其他副本寫入該訊息;

若設定為-1或者all,producer會在所有備份的partition收到訊息時得到broker的確認,這個設定可以得到最高的可靠性保證。此時,當傳送訊息時,leader broker不僅會將訊息寫入本地日誌,同時還會等待ISR中所有其他副本都成功寫入它們各自的本地日誌後,才傳送響應結果給producer。

buffer.memory引數:

指定producer端用於快取訊息的緩衝區的大小,單位是位元組,預設32M,採用非同步傳送訊息的架構,Java版Producer啟動時會首先建立一塊記憶體緩衝區用於儲存待發送訊息,然後由另一個專屬執行緒負責從緩衝區中讀取訊息執行真正的傳送,這部分記憶體空間的大小就是由buffer.memory引數指定。該引數指定的記憶體大小几乎可以認為是producer程式使用的記憶體大小,若producer程式要給很多分割槽傳送訊息,那麼就需要仔細設定該引數防止過小的記憶體緩衝區降低了producer程式整體的吞吐量。

batch.size引數:

batch.size是producer調優吞吐量與延時效能最重要的引數之一,producer會將發往同一分割槽的多條訊息封裝進一個batch中,當batch滿了或者batch沒滿(linger.ms引數=0,預設訊息立即傳送)producer會發送該batch。

Batch小,吞吐量低,延時低,Batch大,吞吐量高,延時高

Batch.size預設為16KB。

request.timeout.ms引數:

當producer傳送請求給broker後,broker需要在規定時間範圍內將處理結果返回給producer。超時時間預設30s。

自定義分割槽機制

  1. 在producer程式中建立一個類,實現org.apache.kafka.clients.producer.Partitioner介面,主要分割槽邏輯在Partitioner.partition中實現。入參(topic, key, keyBytes, value, valueBytes, Cluster);
  2. 在用於構造KafkaProducer的Properties物件中設定partitioner.class引數。

序列化

網路上傳送資料都是以位元組的形式,kafka也不例外,Apache Kafka支援使用者給broker傳送各種型別訊息,如字串、整數、陣列或任意物件型別,序列化器serializer負責在producer傳送前將訊息轉換成位元組陣列;解序列化器deserializer則用於將consumer接收到的位元組陣列轉換成相應的物件。