Kafka 生產者Producer
Kafka 生產者Producer
原理
producer和consumer過去直接與Zookeeper連線,以獲得這些資訊。
現在Kafka已經脫離了這種耦合,從0.8版和0.9版開始,客戶端直接從Kafka brokers那裡獲取元資料資訊,叢集中的每個 broker 都會快取所有主題的分割槽副本資訊,元資料同步可以通過配置metadata.max.age.ms引數(預設五分鐘)定時重新整理元資料(注:如果在定時請求的時間間隔內發生的分割槽副本的選舉,則意味著原來快取的資訊可能已經過時了,此時還有可能會收到 Not a Leader for Partition 的錯誤響應,這種情況下客戶端會再次向Controller【與Zookeeper資料一致性由Controller完成】發出元資料請求,然後重新整理本地快取),有了元資料資訊後,客戶端就知道了leader副本所在的 broker,之後直接將讀寫請求傳送給對應的 broker 即可。
如下圖
傳送訊息流程:
- 傳送到kafka的資料會封裝為ProducerRecord物件,包含topic、partition、key、value資訊;
- 呼叫send()方法後,將資料序列化為位元組陣列,分割槽器Partitioner會根據ProducerRecord 物件的鍵來計算一個分割槽
- 當訊息達到一個批次設定的量(訊息放在緩衝區中),通過網路傳送到不同的主題,不同的分割槽;
- 如果訊息成功寫入 Kafka,就返回一 個
RecordMetaData
物件,它包含了主題和分割槽資訊,以及記錄在分割槽裡的偏移量。如果寫入失敗, 則告知生產者嘗試重新發送訊息,達到最大重試次數就丟擲異常,其中重試次數可以在配置message.send.max.retries中指定。
下面看一下詳細流程圖:
整個生產者客戶端由兩個執行緒協調執行,這兩個執行緒分別為主執行緒和 Sender 執行緒(傳送執行緒)。
-
主執行緒中由 KafkaProducer 建立訊息,然後通過攔截器、序列化器和分割槽器的作用之後快取到訊息累加器(RecordAccumulator,也稱為訊息收集器)中。
-
Sender 執行緒負責從 RecordAccumulator 中獲取訊息並將其傳送到 Kafka 中。
攔截器
生產者攔截器既可以用來在訊息傳送前做一些準備工作,比如按照某個規則過濾不符合要求的訊息、修改訊息的內容等,也可以用來在傳送回撥邏輯前做一些定製化的需求,比如統計類工作。
生產者攔截器的使用也很方便,主要是自定義實現 org.apache.kafka.clients.producer. ProducerInterceptor
ProducerInterceptor
介面中包含3個方法:
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
KafkaProducer 在將訊息序列化和計算分割槽之前會呼叫生產者攔截器的 onSend() 方法來對訊息進行相應的定製化操作。一般來說最好不要修改訊息 ProducerRecord 的 topic、key 和 partition 等資訊。
KafkaProducer 會在訊息被應答(Acknowledgement)之前或訊息傳送失敗時呼叫生產者攔截器的 onAcknowledgement() 方法,優先於使用者設定的 Callback 之前執行。這個方法執行在 Producer 的I/O執行緒中,所以這個方法中實現的程式碼邏輯越簡單越好,否則會影響訊息的傳送速度。
close() 方法主要用於在關閉攔截器時執行一些資源的清理工作。
序列化器
生產者需要用序列化器(Serializer)把物件轉換成位元組陣列才能通過網路傳送給 Kafka。而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的位元組陣列轉換成相應的物件。
生產者使用的序列化器和消費者使用的反序列化器是需要一一對應的,如果生產者使用了某種序列化器,比如 StringSerializer,而消費者使用了另一種序列化器,比如 IntegerSerializer,那麼是無法解析出想要的資料的。
序列化器都需要實現org.apache.kafka.common.serialization.Serializer
介面,此介面有3個方法:
public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()
configure() 方法用來配置當前類,在建立 KafkaProducer 例項的時候呼叫的,主要用來確定編碼型別。serialize() 方法用來執行序列化操作。而 close() 方法用來關閉當前的序列化器。
分割槽器
訊息經過序列化之後就需要確定它發往的分割槽,如果訊息 ProducerRecord 中指定了 partition 欄位,那麼就不需要分割槽器的作用,因為 partition 代表的就是所要發往的分割槽號。
如果訊息 ProducerRecord 中沒有指定 partition 欄位,那麼就需要依賴分割槽器,根據 key 這個欄位來計算 partition 的值。分割槽器的作用就是為訊息分配分割槽。
Kafka 中提供的預設分割槽器是 org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它實現了 org.apache.kafka.clients.producer.Partitioner
介面,這個介面中定義了2個方法,具體如下所示。
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
-
partition() 方法用來計算分割槽號,返回值為 int 型別。partition() 方法中的引數分別表示主題、鍵、序列化後的鍵、值、序列化後的值,以及叢集的元資料資訊,通過這些資訊可以實現功能豐富的分割槽器。
-
close() 方法在關閉分割槽器的時候用來回收一些資源。
自定義的分割槽器,只需同 DefaultPartitioner 一樣實現 Partitioner 介面即可。由於每個分割槽下的訊息處理都是有順序的,我們可以利用自定義分割槽器實現在某一系列的key都發送到一個分割槽中,從而實現有序消費。
生產者在向broker傳送訊息時是怎麼確定向哪一個broker傳送訊息?
- 生產者客戶端會向任一個broker傳送一個元資料請求(MetadataRequest),獲取到每一個分割槽對應的leader資訊,並快取到本地。
- 生產者在傳送訊息時,會指定partion或者通過key得到到一個partion,然後根據partion從快取中獲取相應的leader資訊。
RecordAccumulator 訊息累加器
RecordAccumulator 主要用來快取訊息以便 Sender 執行緒可以批量傳送,進而減少網路傳輸的資源消耗以提升效能。
主執行緒中傳送過來的訊息都會被追加到 RecordAccumulator 的某個雙端佇列(Deque)中,在 RecordAccumulator 的內部為每個分割槽都維護了一個雙端佇列。
訊息寫入快取時,追加到雙端佇列的尾部;Sender 讀取訊息時,從雙端佇列的頭部讀取。
InFlightRequests
請求在從 Sender 執行緒發往 Kafka 之前還會儲存到 InFlightRequests 中,InFlightRequests 儲存物件的具體形式為 Map<NodeId, Deque>,它的主要作用是快取了已經發出去但還沒有收到響應的請求(NodeId 是一個 String 型別,表示節點的 id 編號)。
Sender 從 RecordAccumulator 中獲取快取的訊息之後,會進一步將原本<分割槽, Deque< ProducerBatch>> 的儲存形式轉變成 <Node, List< ProducerBatch> 的形式,其中 Node 表示 Kafka 叢集的 broker 節點。
KafkaProducer 要將此訊息追加到指定主題的某個分割槽所對應的 leader 副本之前,首先需要知道主題的分割槽數量,然後經過計算得出(或者直接指定)目標分割槽,之後 KafkaProducer 需要知道目標分割槽的 leader 副本所在的 broker 節點的地址、埠等資訊才能建立連線,最終才能將訊息傳送到 Kafka。
所以這裡需要一個轉換,對於網路連線來說,生產者客戶端是與具體的 broker 節點建立的連線,也就是向具體的 broker 節點發送訊息,而並不關心訊息屬於哪一個分割槽。
客戶端快取池技術
當我們應用程式呼叫kafka客戶端 producer傳送訊息的時候,在kafka客戶端內部,會把屬於同一個topic分割槽的訊息先彙總起來,形成一個batch。真正發往kafka伺服器的訊息都是以batch為單位的。
kafka是用java語言編寫的(新版本大部分都是用java實現的了),如果是使用new一個空間然後賦值給一個引用,釋放的時候把引用置為null等JVM GC處理就可以了。在併發量比較高的時候就會頻繁的進行GC。我們都知道GC的時候有個stop the world,儘管最新的GC技術這個時間已經非常短,依然有可能成為生產環境的效能瓶頸。
針對上述容易出現GC的問題,Kafka客戶端內部實現了一個非常優秀的機制,就是 緩衝池的機制(類似於資料庫連線池,執行緒池等的池化技術)。
即首先開闢初始化一些記憶體塊做為緩衝池,每個batch其實都對應了緩衝池中的一個記憶體空間,傳送完訊息之後,batch不再使用了,此時這個batch底層的記憶體空間不是交給JVM去垃圾回收,而是把記憶體塊歸還給緩衝池。
如果一個緩衝池裡的記憶體資源都佔滿了,暫時沒有記憶體塊了,怎麼辦呢?很簡單,阻塞寫入,不停的等待,直到有記憶體塊釋放出來,然後再繼續寫入訊息。
緩衝池的機制原理如下圖: