1. 程式人生 > >二、kafka訊息與同步機制

二、kafka訊息與同步機制

如上圖所示:Producer根據指定的partition方法(預設round-robin、hash等),將訊息釋出到指定topic的partition裡面;kafka叢集接收到Producer發過來的訊息後,將其持久化到硬碟,並保留訊息指定時長(可配置),而不關注訊息是否被消費;Consumer從kafka叢集pull資料,並控制獲取訊息的offset。

下面討論以下Kafka如何確保訊息在producer和consumer之間的傳輸。producer與consumer有可能的delivery guarantee:

 

  • At most once 訊息可能會丟,但絕不會重複傳輸
  • At least one 訊息絕不會丟,但可能會重複傳輸
  • Exactly once 每條訊息肯定會被傳輸一次且僅傳輸一次,很多時候這是使用者所想要的

Producer

producer 的deliver guarantee 可以通過request.required.acks引數的設定來進行調整:

  • 0 ,相當於非同步傳送,訊息傳送完畢即offset增加,繼續生產;相當於At most once
  • 1,leader收到leader replica 對一個訊息的接受ack才增加offset,然後繼續生產;
  • -1,leader收到所有replica 對一個訊息的接受ack才增加offset,然後繼續生產

當producer向broker傳送訊息時,一旦這條訊息被commit,因數replication的存在,它就不會丟。但是如果producer傳送資料給broker後,遇到的網路問題而造成通訊中斷,那producer就無法判斷該條訊息是否已經commit。這一點有點像向一個自動生成primary key的資料庫表中插入資料。雖然Kafka無法確定網路故障期間發生了什麼,但是producer可以生成一種類似於primary key的東西,發生故障時冪等性的retry多次,這樣就做到了Exactly one。截止到目前(Kafka 0.8.2版本,2015-01-25),這一feature還並未實現,有希望在Kafka未來的版本中實現。(所以目前預設

情況下一條訊息從producer和broker是確保了At least once,但可通過設定producer非同步傳送實現At most once)。

Consumer

  consumer在從broker讀取訊息後,可以選擇commit,該操作會在Zookeeper中存下該consumer在該partition下讀取的訊息的offset。該consumer下一次再讀該partition時會從下一條開始讀取。如未commit,下一次讀取的開始位置會跟上一次commit之後的開始位置相同。當然可以將consumer設定為autocommit,即consumer一旦讀到資料立即自動commit。如果只討論這一讀取訊息的過程,那Kafka是確保了Exactly once。但實際上實際使用中consumer並非讀取完資料就結束了,而是要進行進一步處理,而資料處理與commit的順序在很大程度上決定了訊息從broker和consumer的delivery guarantee semantic。

  • 讀完訊息先commit再處理訊息。這種模式下,如果consumer在commit後還沒來得及處理訊息就crash了,下次重新開始工作後就無法讀到剛剛已提交而未處理的訊息,這就對應於At most once
  • 讀完訊息先處理再commit。這種模式下,如果處理完了訊息在commit之前consumer crash了,下次重新開始工作時還會處理剛剛未commit的訊息,實際上該訊息已經被處理過了。這就對應於At least once(預設)
  • 如果一定要做到Exactly once,就需要協調offset和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓offset和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支援兩階段提交。比如,consumer拿到資料後可能把資料放到HDFS,如果把最新的offset和資料本身一起寫到HDFS,那就可以保證資料的輸出和offset的更新要麼都完成,要麼都不完成,間接實現Exactly once。(目前就high level API而言,offset是存於Zookeeper中的,無法存於HDFS,而low level API的offset是由自己去維護的,可以將之存於HDFS中)

訊息傳遞過程

  Producer在釋出訊息到某個Partition時,先通過Zookeeper找到該Partition的Leader,然後無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該訊息傳送到該Partition的Leader。Leader會將該訊息寫入其本地Log。每個Follower都從Leader pull資料。這種方式上,Follower儲存的資料順序與Leader保持一致。Follower在收到該訊息並寫入其Log後,向Leader傳送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該訊息就被認為已經commit了,Leader將增加HW(即offset)並且向Producer傳送ACK。

為了提高效能,每個Follower在接收到資料後就立馬向Leader傳送ACK,而非等到資料寫入Log中。因此,對於已經commit的訊息,Kafka只能保證它被存於多個Replica的記憶體中,而不能保證它們被持久化到磁碟中,也就不能完全保證異常發生後該條訊息一定能被Consumer消費。但考慮到這種場景非常少見,可以認為這種方式在效能和資料持久化上做了一個比較好的平衡。在將來的版本中,Kafka會考慮提供更高的永續性。

Consumer讀訊息也是從Leader讀取,只有被commit過的訊息(offset低於HW的訊息)才會暴露給Consumer。

Kafka Replication的資料流如下圖所示:

 

 

 producer 寫入訊息序列圖如下所示

 

具體步驟總結下來如下:

 

1. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader

2. producer 將訊息傳送給該 leader

3. leader 將訊息寫入本地 log

4. followers 從 leader pull 訊息,寫入本地 log 後 leader 傳送 ACK

5. leader 收到所有 ISR 中的 replica 的 ACK 後,增加 HW(high watermark,最後 commit 的 offset) 並向 producer 傳送 ACK