1. 程式人生 > 其它 >Kafka從入門到放棄(三) —— 詳說生產者

Kafka從入門到放棄(三) —— 詳說生產者

上一篇對Kafka做了簡單介紹,還沒看的朋友可以點選下方連結。

Kafka從入門到放棄(一) —— 初識別Kafka

訊息中介軟體必須與生產者和消費者一起存在才有意義,這次先來聊聊Kafka的生產者。

在開始之前,先了解一下訊息在Kafka中是如何儲存的,如下圖所示,一般我們稱那些數字為offset(偏移量)一般來說,訊息在持久化後應該是有序的,這裡的有序是針對分割槽的,而不是針對 Topic 的。

而且,生產者寫入訊息時,是往 Leader 寫入,Follower 從 Leader 進行復制。

別看生產者只是發訊息,呼叫 API 也是幾行程式碼,但它的學問多著呢。為更好地理解後面的內容,請帶著以下問題閱讀:

  • 生產者傳送訊息前會做什麼準備?
  • 生產者傳送訊息怎麼保證資料不丟失?
  • 生產者傳送訊息如何保證訊息有序性?
  • 生產者傳送的訊息是怎麼分割槽的?

生產者設計了一個緩衝池,可以通過修改 buffer.memory 引數設定其大小;緩衝池內又有多個 Batch,當有多個訊息需要寫入同一個分割槽時,訊息會先往 Batch 裡面寫入,等訊息達到 batch.size 的時候開始傳送,如果 batch.size 設定太小,生產者會頻繁傳送訊息,帶來更多的網路開銷;

有些讀者可能有這個疑問,如果有時候生產者生產的訊息很少很小,一直達不到批次的大小,而消費者對時效性要求比較高,這種情況怎麼辦?其實,預設情況下,只要有執行緒,即使批次裡只有一條訊息,也會直接傳送出去。但是,可以設定引數 linger.ms 來指定等待訊息加入批次的時間,只要當批次訊息達到 batch.size 或者等待時間達到 linger.ms 的時候,訊息就會發送。

除此之外,生產者可以對訊息進行壓縮,以降低網路開銷以及儲存開銷,通過設定引數 compression.type 設定相應的壓縮演算法。

先拋開 Kafka 現有確認機制,假如一條訊息發到對應分割槽後,沒有任何確認就緊接著傳送第二條,很難不造成資料丟失。

於是我們讓分割槽在收到訊息後返回確認訊息給生產者,生產者收到後傳送下一條。

就這樣,訊息很順利地發著,正好在 Leader 拿到最新的訊息並返回確認給生產者的時候,Leader 掛了,此時,Follower 還沒同步最新的訊息,而生產者已經接收到了分割槽返回的確認,這時候還是丟了資料。

因此我們讓 Leader 以及參與複製的 Follower 都收到訊息後返回確認,這樣就能最大程度保證訊息不丟失,不過延遲較高。

針對上述的情況,Kafka 設定了一個 acks 引數,指定了必須有幾個副本收到訊息生產者才認為是寫入成功了。

  • acks=0,生產者只管寫入,不會等待 Broker 返回響應,預設成功。這種情況最容易造成資料丟失,不過吞吐量最高;
  • acks=1,Leader 收到訊息後響應,生產者才認為寫成功,這種也會造成丟失;
  • acks=all,Kafka 叢集內部會維護一個副本清單 ISR(後續會寫,再此不做描述),當 ISR 裡的所有副本都收到訊息,才認為寫入成功,最大程度保證訊息不丟失,不過可能會造成延遲較高。

另外,Kafka 還有一個引數 retries,表示當訊息傳送失敗後,生產者重試的次數,預設為0,如果對丟失訊息零容忍,那就不能設定為0.

事實上,生產者在收到分割槽返回的確認訊息前,還是可以持續傳送訊息的,這個可以通過設定 max.in.flight.requests.per.connection 引數進行修改,這個引數指定了生產者在收到響應前可以傳送多少個訊息。

這裡需要注意的是,如果這個引數不為1,而 retries 引數也不為 0 的時候,當發生重試的時候,有可能造成分割槽資料順序錯亂。在有些場景下,順序是很重要的,比如分析交易流水的過程,某個第一次存款的客戶先存1塊錢再取1塊錢是正常的,但反過來可能就有點奇怪了。

所以,如果要保證資料不丟失,同時要保證資料有序性,就需要將 retries 設定為非 0 整數,max.in.flight.requests.per.connection 設定為 1,注意不是 0.

生產者可以指定鍵作為分割槽鍵,如果不指定,生產者會使用輪詢演算法將訊息均勻的發到各個分割槽上

但如果指定了分割槽鍵,Kafka 會使用自己的 hash 演算法獲得 hash 值,然後根據 hash 值發到相應的分割槽。

到這,回顧一下前面的幾個問題,是不是有點豁然開朗了?

如果覺得寫得不錯,對你有幫助,麻煩點個小小的贊,謝謝!

轉載請註明出處:工眾號“大資料的奇妙冒險”,部落格園 Lyu_zt