Kafka筆記—可靠性、冪等性和事務
這幾天很忙,但是我現在給我的要求是一週至少要出一篇文章,所以先拿這篇筆記來做開胃菜,原始碼分析估計明後兩天應該能寫一篇。給自己加油~,即使沒什麼人看。
可靠性
如何保證訊息不丟失
Kafka只對“已提交”的訊息(committed message)做有限度的持久化保證。
已提交的訊息
當Kafka的若干個Broker成功地接收到一條訊息並寫入到日誌檔案後,它們會告訴生產者程式這條訊息已成功提交。
有限度的持久化保證
假如一條訊息儲存在N個Kafka Broker上,那麼至少這N個Broker至少有一個存活,才能保證訊息不丟失。
丟失資料案例
生產者程式丟失資料
由於Kafka Producer是非同步傳送的,呼叫完producer.send(msg)並不能認為訊息已經發送成功。
所以,在Producer永遠要使用帶有回撥通知的傳送API,使用producer.send(msg,callback)。一旦出現訊息提交失敗的情況,可以由針對性地進行處理。
消費者端丟失資料
消費者是先更新offset,再消費訊息。如果這個時候消費者突然宕機了,那麼這條訊息就會丟失。
所以我們要先消費訊息,再更新offset位置。但是這樣會導致訊息重複消費。
還有一種情況就是consumer獲取到訊息後開啟了多個執行緒非同步處理訊息,而consumer自動地向前更新offset。假如其中某個執行緒執行失敗了,那麼訊息就丟失了。
遇到這樣的情況,consumer不要開啟自動提交位移,而是要應用程式手動提交位移。
最佳實現
- 使用producer.send(msg,callback)。
- 設定acks = all。acks是Producer的引數,代表了所有副本Broker都要接收到訊息,該訊息才算是“已提交”。
- 設定retries為一個較大的值。是Producer的引數,對應Producer自動重試。如果出現網路抖動,那麼可以自動重試訊息傳送,避免訊息丟失。
- unclean.leader.election.enable = false。控制有哪些Broker有資格競選分割槽的Leader。表示不允許落後太多的Broker競選Leader。
- 設定replication.factor>=3。Broker引數,冗餘Broker。
- 設定min.insync.replicas>1。Broker引數。控制訊息至少要被寫入到多少個副本才算是“已提交”。
- 確保replication.factor>min.insync.replicas。如果兩個相等,那麼只要有一個副本掛機,整個分割槽就無法正常工作了。推薦設定成replication.factor=min.insync.replicas+1.
- 確保訊息消費完成在提交。Consumer端引數enbale.auto.commit,設定成false,手動提交位移。
解釋第二條和第六條:
如果ISR中只有1個副本了,acks=all也就相當於acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能只滿足於ISR全部寫入,還要保證ISR中的寫入個數不少於min.insync.replicas。
冪等性
在0.11.0.0版本引入了建立冪等性Producer的功能。僅需要設定props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。
enable.idempotence設定成true後,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多儲存一些欄位。當Producer傳送了相同欄位值的訊息後,Broker能夠自動知曉這些訊息已經重複了。
作用範圍:
- 只能保證單分割槽上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分割槽上不出現重複訊息。
- 只能實現單回話上的冪等性,這裡的會話指的是Producer程序的一次執行。當重啟了Producer程序之後,冪等性不保證。
事務
Kafka在0.11版本開始提供對事務的支援,提供是read committed隔離級別的事務。保證多條訊息原子性地寫入到目標分割槽,同時也能保證Consumer只能看到事務成功提交的訊息。
事務性Producer
保證多條訊息原子性地寫入到多個分割槽中。這批訊息要麼全部成功,要不全部失敗。事務性Producer也不懼程序重啟。
Producer端的設定:
- 開啟
enable.idempotence = true
- 設定Producer端引數
transactional.id
除此之外,還要加上呼叫事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應對事務的初始化、事務開始、事務提交以及事務終止。
如下:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
這段程式碼能保證record1和record2被當做一個事務同一提交到Kafka,要麼全部成功,要麼全部寫入失敗。
Consumer端的設定:
設定isolation.level引數,目前有兩個取值:
- read_uncommitted:預設值表明Consumer端無論事務型Producer提交事務還是終止事務,其寫入的訊息都可以讀取。
- read_committed:表明Consumer只會讀取事務型Producer成功提交事務寫入的訊息。注意,非事務型Producer寫入的所有訊息都能看到。