1. 程式人生 > >Kafka筆記—可靠性、冪等性和事務

Kafka筆記—可靠性、冪等性和事務

這幾天很忙,但是我現在給我的要求是一週至少要出一篇文章,所以先拿這篇筆記來做開胃菜,原始碼分析估計明後兩天應該能寫一篇。給自己加油~,即使沒什麼人看。

可靠性

如何保證訊息不丟失

Kafka只對“已提交”的訊息(committed message)做有限度的持久化保證。

已提交的訊息
當Kafka的若干個Broker成功地接收到一條訊息並寫入到日誌檔案後,它們會告訴生產者程式這條訊息已成功提交。

有限度的持久化保證
假如一條訊息儲存在N個Kafka Broker上,那麼至少這N個Broker至少有一個存活,才能保證訊息不丟失。

丟失資料案例

生產者程式丟失資料

由於Kafka Producer是非同步傳送的,呼叫完producer.send(msg)並不能認為訊息已經發送成功。

所以,在Producer永遠要使用帶有回撥通知的傳送API,使用producer.send(msg,callback)。一旦出現訊息提交失敗的情況,可以由針對性地進行處理。

消費者端丟失資料

消費者是先更新offset,再消費訊息。如果這個時候消費者突然宕機了,那麼這條訊息就會丟失。

所以我們要先消費訊息,再更新offset位置。但是這樣會導致訊息重複消費。

還有一種情況就是consumer獲取到訊息後開啟了多個執行緒非同步處理訊息,而consumer自動地向前更新offset。假如其中某個執行緒執行失敗了,那麼訊息就丟失了。

遇到這樣的情況,consumer不要開啟自動提交位移,而是要應用程式手動提交位移。

最佳實現

  1. 使用producer.send(msg,callback)。
  2. 設定acks = all。acks是Producer的引數,代表了所有副本Broker都要接收到訊息,該訊息才算是“已提交”。
  3. 設定retries為一個較大的值。是Producer的引數,對應Producer自動重試。如果出現網路抖動,那麼可以自動重試訊息傳送,避免訊息丟失。
  4. unclean.leader.election.enable = false。控制有哪些Broker有資格競選分割槽的Leader。表示不允許落後太多的Broker競選Leader。
  5. 設定replication.factor>=3。Broker引數,冗餘Broker。
  6. 設定min.insync.replicas>1。Broker引數。控制訊息至少要被寫入到多少個副本才算是“已提交”。
  7. 確保replication.factor>min.insync.replicas。如果兩個相等,那麼只要有一個副本掛機,整個分割槽就無法正常工作了。推薦設定成replication.factor=min.insync.replicas+1.
  8. 確保訊息消費完成在提交。Consumer端引數enbale.auto.commit,設定成false,手動提交位移。

解釋第二條和第六條:
如果ISR中只有1個副本了,acks=all也就相當於acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能只滿足於ISR全部寫入,還要保證ISR中的寫入個數不少於min.insync.replicas。

冪等性

在0.11.0.0版本引入了建立冪等性Producer的功能。僅需要設定props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。

enable.idempotence設定成true後,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多儲存一些欄位。當Producer傳送了相同欄位值的訊息後,Broker能夠自動知曉這些訊息已經重複了。

作用範圍:

  1. 只能保證單分割槽上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分割槽上不出現重複訊息。
  2. 只能實現單回話上的冪等性,這裡的會話指的是Producer程序的一次執行。當重啟了Producer程序之後,冪等性不保證。

事務

Kafka在0.11版本開始提供對事務的支援,提供是read committed隔離級別的事務。保證多條訊息原子性地寫入到目標分割槽,同時也能保證Consumer只能看到事務成功提交的訊息。

事務性Producer

保證多條訊息原子性地寫入到多個分割槽中。這批訊息要麼全部成功,要不全部失敗。事務性Producer也不懼程序重啟。

Producer端的設定:

  1. 開啟enable.idempotence = true
  2. 設定Producer端引數 transactional.id

除此之外,還要加上呼叫事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應對事務的初始化、事務開始、事務提交以及事務終止。
如下:

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

這段程式碼能保證record1和record2被當做一個事務同一提交到Kafka,要麼全部成功,要麼全部寫入失敗。

Consumer端的設定:
設定isolation.level引數,目前有兩個取值:

  1. read_uncommitted:預設值表明Consumer端無論事務型Producer提交事務還是終止事務,其寫入的訊息都可以讀取。
  2. read_committed:表明Consumer只會讀取事務型Producer成功提交事務寫入的訊息。注意,非事務型Producer寫入的所有訊息都能看到。