1. 程式人生 > 其它 >kafka:(8) 事務、冪等

kafka:(8) 事務、冪等

一、訊息傳輸保障

  一般而言,訊息中介軟體的訊息傳輸保障有3個層級,分別如下。
(1) at most once:至多一次。訊息可能會丟失,但絕對不會重複傳輸。
(2) at least once : 最少一次。訊息絕不會丟失,但可能會重複傳輸。
(3) exactly once :恰好一次。每條訊息肯定會被傳輸一次且僅傳輸一次。

  Kafka 的訊息傳輸保障機制非常直觀。當生產者向Kafka傳送訊息時,一旦訊息被成功提交到日誌檔案,由於多副本機制的存在,這條訊息就不會丟失。如果生產者傳送訊息到Kafka之後,遇到了網路問題而造成通訊中斷,那麼生產者就無法判斷該訊息是否己經提交。雖然Kafka無法確定網路故障期間發生了什麼,但生產者可以進行多次重試來確保訊息已經寫入Kafka,這個重試的過程中有可能會造成訊息的重複寫入,所以這裡Kafka提供的訊息傳輸保障為at least once 。

  對消費者而言,消費者處理訊息和提交消費位移的順序在很大程度上決定了消費者提供哪一種訊息傳輸保障。如果消費者在拉取完訊息之後,應用邏輯先處理訊息後提交消費位移,那麼在訊息處理之後且在位移提交之前消費者宕機了,待它重新上線之後,會從上一次位移提交的位置拉取,這樣就出現了重複消費,因為有部分訊息已經處理過了只是還沒來得及提交消費位移,此時就對應at least once。如果消費者在拉完訊息之後,應用邏輯先提交消費位移後進行訊息處理,那麼在位移提交之後且在訊息處理完成之前消費者巖機了,待它重新上線之後,會從己經提交的位移處開始重新消費,但之前尚有部分訊息未進行消費,如此就會發生訊息丟失,此時就對應at most once 。

二、冪等

  所謂的幕等,簡單地說就是對介面的多次呼叫所產生的結果和呼叫一次是一致的。生產者在進行重試的時候有可能會重複寫入訊息,而使用Kafka的冪等性功能之後就可以避免這種情況。
  開啟幕等性功能的方式很簡單,只需要顯式地將生產者客戶端引數enab le.idempotence設定為true即可(預設值為false )

properties.put(ProducerConfig.ENABLE_IDEMPOTENCECONFIG, true);
#或者
properties.put (“enable.idempotence”, true);

  不過如果要確保冪等性功能正常,還需要確保生產者客戶端的 retries、acks、max.in.flight.requests.per.connection 這幾個引數不被配置錯。實際上在使用冪等性功能的時候,使用者完全可以不用配置(也不建議配置)這幾個引數。

  • 如果使用者顯式地指定了 retries 引數,那麼這個引數的值必須大於0,否則會報出 ConfigException。如果使用者沒有顯式地指定 retries 引數,那麼 KafkaProducer 會將它置為 Integer.MAX_VALUE。
  • 同時還需要保證 max.in.flight.requests.per.connection 引數的值不能大於5(這個引數的值預設為5),否則也會報出 ConfigException。
  • 如果使用者還顯式地指定了 acks 引數,那麼還需要保證這個引數的值為 -1(all),如果不為 -1(這個引數的值預設為1),那麼也會報出 ConfigException。

  為了實現生產者的冪等性,Kafka 為此引入了 producer id(以下簡稱 PID)和序列號(sequence number)這兩個概念。每個新的生產者例項在初始化的時候都會被分配一個 PID,這個 PID 對使用者而言是完全透明的。對於每個 PID,訊息傳送到的每一個分割槽都有對應的序列號,這些序列號從0開始單調遞增。生產者每傳送一條訊息就會將 <PID,分割槽> 對應的序列號的值加1。broker 端會在記憶體中為每一對 <PID,分割槽> 維護一個序列號。對於收到的每一條訊息,只有當它的序列號的值(SN_new)比 broker 端中維護的對應的序列號的值(SN_old)大1(即 SN_new = SN_old + 1)時,broker 才會接收它。如果 SN_new< SN_old + 1,那麼說明訊息被重複寫入,broker 可以直接將其丟棄。如果 SN_new> SN_old + 1,那麼說明中間有資料尚未寫入,出現了亂序,暗示可能有訊息丟失,對應的生產者會丟擲 OutOfOrderSequenceException,這個異常是一個嚴重的異常,後續的諸如 send()、beginTransaction()、commitTransaction() 等方法的呼叫都會丟擲 IllegalStateException 的異常。引入序列號來實現冪等也只是針對每一對 <PID,分割槽> 而言的,也就是說,Kafka 的冪等只能保證單個生產者會話(session)中單分割槽的冪等

三、事務

  冪等性並不能跨多個分割槽運作,而事務可以彌補這個缺陷。事務可以保證對多個分割槽寫入操作的原子性。操作的原子性是指多個操作要麼全部成功,要麼全部失敗,不存在部分成功、部分失敗的可能。

  Kafka中的事務可以使應用程式將消費訊息、生產訊息、提交消費位移當作原子操作來處理,同時成功或失敗,即使該生產或消費會跨多個分割槽。

  為了實現事務,應用程式必須提供唯一的 transactionalId,這個 transactionalId 通過客戶端引數 transactional.id 來顯式設定

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionId");
# 或者
properties.put("transactional.id", "transactionId");

  事務要求生產者開啟冪等特性,因此通過將 transactional.id 引數設定為非空從而開啟事務特性的同時需要將 enable.idempotence 設定為 true(如果未顯式設定,則 KafkaProducer 預設會將它的值設定為 true),如果使用者顯式地將 enable.idempotence 設定為 false,則會報出 ConfigException。

  transactionalId 與 PID 一一對應,兩者之間所不同的是 transactionalId 由使用者顯式設定,而 PID 是由 Kafka 內部分配的。另外,為了保證新的生產者啟動後具有相同 transactionalId 的舊生產者能夠立即失效,每個生產者通過 transactionalId 獲取 PID 的同時,還會獲取一個單調遞增的 producer epoch。如果使用同一個 transactionalId 開啟兩個生產者,那麼前一個開啟的生產者會報錯。

  從消費者的角度分析,事務能保證的語義相對偏弱。出於以下原因,Kafka 並不能保證已提交的事務中的所有訊息都能夠被消費:

  • 對採用日誌壓縮策略的主題而言,事務中的某些訊息有可能被清理(相同key的訊息,後寫入的訊息會覆蓋前面寫入的訊息)。
  • 事務中訊息可能分佈在同一個分割槽的多個日誌分段(LogSegment)中,當老的日誌分段被刪除時,對應的訊息可能會丟失。
  • 消費者可以通過 seek() 方法訪問任意 offset 的訊息,從而可能遺漏事務中的部分訊息。
  • 消費者在消費時可能沒有分配到事務內的所有分割槽,如此它也就不能讀取事務中的所有訊息。

  KafkaProducer 提供了5個與事務相關的方法:

void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;

  initTransactions() 方法用來初始化事務,這個方法能夠執行的前提是配置了 transactionalId,如果沒有則會報出 IllegalStateException;beginTransaction() 方法用來開啟事務;sendOffsetsToTransaction() 方法為消費者提供在事務內的位移提交的操作;commitTransaction() 方法用來提交事務;abortTransaction() 方法用來中止事務,類似於事務回滾。

  在消費端有一個引數 isolation.level,與事務有著莫大的關聯,這個引數的預設值為“read_uncommitted”,意思是說消費端應用可以看到(消費到)未提交的事務,當然對於已提交的事務也是可見的。這個引數還可以設定為“read_committed”,表示消費端應用不可以看到尚未提交的事務內的訊息。舉個例子,如果生產者開啟事務並向某個分割槽值傳送3條訊息 msg1、msg2 和 msg3,在執行 commitTransaction() 或 abortTransaction() 方法前,設定為“read_committed”的消費端應用是消費不到這些訊息的,不過在 KafkaConsumer 內部會快取這些訊息,直到生產者執行 commitTransaction() 方法之後它才能將這些訊息推送給消費端應用。反之,如果生產者執行了 abortTransaction() 方法,那麼 KafkaConsumer 會將這些快取的訊息丟棄而不推送給消費端應用。

  日誌檔案中除了普通的訊息,還有一種訊息專門用來標誌一個事務的結束,它就是控制訊息(ControlBatch)。控制訊息一共有兩種型別:COMMIT 和 ABORT,分別用來表徵事務已經成功提交或已經被成功中止。KafkaConsumer 可以通過這個控制訊息來判斷對應的事務是被提交了還是被中止了,然後結合引數 isolation.level 配置的隔離級別來決定是否將相應的訊息返回給消費端應用,如下圖所示, ControlBatch 對消費端應用不可見。

  

  為了實現事務的功能,Kafka 還引入了事務協調器(TransactionCoordinator)來負責處理事務,這一點可以類比一下組協調器(GroupCoordinator)。每一個生產者都會被指派一個特定的 TransactionCoordinator,所有的事務邏輯包括分派 PID 等都是由 TransactionCoordinator 來負責實施的。TransactionCoordinator 會將事務狀態持久化到內部主題 __transaction_state 中。