1. 程式人生 > >kafka數據可靠傳輸

kafka數據可靠傳輸

發送數據 efault 復制 最好 取出 永遠 正在 個數 實時

再說復制
Kafka 的復制機制和分區的多副本架構是Kafka 可靠性保證的核心。把消息寫入多個副本可以使Kafka 在發生崩憤時仍能保證消息的持久性。

Kafka 的主題被分為多個分區,分區是基本的數據塊。分區存儲在單個磁盤上,Kafka 可以保證分區裏的事件是有序的,分區可以在線(可用),也可以離線(不可用) 。每個分區可以有多個副本,其中一個副本是首領。所有的事件都直接發送給首領副本,或者直接從首領副本讀取事件。其他副本只需要與首領保持同步,並及時復制最新的事件。當首領副本不可用時,其中一個同步副本將成為新首領。

分區首領是同步副本,而對於跟隨者副本來說,它需要滿足以下條件才能被認為是同步的。

  • 與Zookeeper 之間有一個活躍的會話,也就是說,它在過去的6s(可配置)內向Zookeeper 發送過心跳。
  • 在過去的10s 內(可配置)從首領那裏獲取過消息。
  • 在過去的10s 內從首領那裏獲取過最新的消息。光從首領那裏獲取消息是不夠的,它還必須是兒乎零延遲的。


如果跟隨者副本不能滿足以上任何一點,比如與Zookeeper 斷開連接,或者不再獲取新消息,或者獲取消息滯後了10s 以上,那麽它就被認為是不同步的。一個不同步的副本通過與Zookeeper 重新建立連接,並從首領那裏獲取最新消息,可以重新變成同步的。這個過程在網絡出現臨時問題並很快得到修復的情況下會很快完成,但如果broker 發生崩憤就需要較長的時間。

broker的配置

broker 有3 個配置參數會影響Kafka 消息存儲的可靠性。與其他配置參數一樣,它們可以應用在broker 級別,用於控制所有主題的行為,也可以應用在主題級別,用於控制個別主題的行為。在主題級別控制可靠性,意味著Kafka 集群可以同時擁有可靠的主題和非可靠的主題。例如,在銀行裏,管理員可能把整個集群設置為可靠的,但把其中的一個主題設置為非可靠的,用於保存來自客戶的投訴,因為這些消息是允許丟失的。

復制系數
主題級別的配置參數是replication.factor,而在broker 級別則可以通過default.replication.factor來配置自動創建的主題。

如果復制系數為N,那麽在凡l 個broker 失效的情況下,仍然能夠從主題讀取數據或向主題寫入數據。所以,更高的復制系數會帶來更高的可用性、可靠性和更少的故障。另一方面,復制系數N 需要至少N 個broker ,而且會有N 個數據副本,也就是說它們會占用N倍的磁盤空間。我們一般會在可用性和存儲硬件之間作出權衡。

建議在可用性的場景下,把復制系數設置為3.

不完全的首領選舉

之前提到過,當分區首領不可用時,一個同步副本會被選舉為新首領。如果在選舉過程中沒有丟失數據,也就是說提交的數據同時存在於所有的同步副本上,那麽這個選舉就是“完全的”

但如果在首領不可用時其他副本都是不同步的,我們該怎麽辦呢?

#以下兩種情況:

第一種: 分區有3 個副本,其中的兩個跟隨者副本不可用(比如有兩個broker 發生崩憤)。這個時候,如果生產者繼續往首領寫入數據,所有消息都會得到確認井被提交(因為此時首 領是唯一的同步副本)。現在我們假設首領也不可用了(又一個broker 發生崩憤),這個時候,如果之前的一個跟隨者重新啟動,它就成為了分區的唯一不同步副本。 第二種: 分區有3 個副本,因為網絡問題導致兩個跟隨者副本復制消息滯後,所以盡管它復制消息,但已經不同步了。首領作為唯一的同步副本繼續接收消息。這個時候,如果 首領變為不可用,另外兩個副本就再也無法變成同步的了。

#上面這兩種情況該如何解決?
第一種:要麽等待原首領復活,但是等待過程中服務是宕的,有可能這是一個很長的時間段;要麽使用新的首領,但是肯定丟失了數據。
第二種:因為兩個副本已經滯後了,所以若首領不可用,那麽滯後的同步副本被選為新首領,就會造成數據丟失的問題(數據不一致)。

簡而言之,如果我們允許不同步的副本成為首領,那麽就要承擔丟失數據和出現數據不一致的風險。如果不允許它們成為首領,那麽就要接受較低的可用性,因為我們必須等待原先的首領恢復到可用狀態。
如果把unclean.leader.election.enable 設為true ,就是允許不同步的副本成為首領(也就是“ 不完全的選舉"),那麽我們將面臨丟失消息的風險。如果把這個參數設為false ,
就要等待原先的首領重新上線,從而降低了可用性。我們經常看到一些對數據質量和數據一致性要求較高的系統會禁用這種不完全的首領選舉( 把這個參數設為false ) 。銀行系統是這方面最好的例子,大部分銀行系統寧願選擇在幾分鐘甚至幾個小時內不處理信用卡支付事務,也不會冒險處理錯誤的消息。不過在對可用性要求較高的系統裏,比如實時點擊流分析系統, 一般會啟用不完全的首領選舉。

最少同步副本

在主題級別和broker 級別上,這個參數都叫min.insync.replicas 。我們知道,盡管為一個主題配置了3 個副本,還是會出現只有一個同步副本的情況。如果這個同步副本變為不可用,我們必須在可用性和一致性之間作出選擇一一這是一個兩難的選擇。根據Kafka 對可靠性保證的定義,消息只有在被寫入到所有同步副本之後才被認為
是已提交的。但如果這裏的“所有副本”只包含一個同步副本,那麽在這個副本變為不可用時,數據就會丟失。


如果要確保已提交的數據被寫入不止一個副本,就需要把最少同步副本數量設置為大一點的值。對於一個包含3 個副本的主題,如果min.insync.replicas被設為2 ,那麽至少要存在兩個同步副本才能向分區寫入數據。


如果3 個副本都是同步的,或者其中一個副本變為不可用,都不會有什麽問題。不過,如果有兩個副本變為不可用,那麽broker 就會停止接受生產者的請求。嘗試發送數據的生產者會收到NotEnoughReplicasException 異常。消費者仍然可以繼續讀取已有的數據。實際上,如果使用這樣的配置,那麽當只剩下一個同步副本時,它就變成只讀了,這是為了避免在發生不完全選舉時數據的寫入和讀取出現非預期的行為。為了從只讀狀態中恢復,必須讓兩個不可用分區中的一個重新變為可用的(比如重啟broker),並等待它變為同步的。

配置重試參數

如下的一個實例:

為broker 配置了3 個副本,並且禁用了不完全首領選舉。 把生產者的acks 設為all 。假設現在往Kafka 發送消息,分區的首領剛好崩憤,新的首領正在選舉當中, Kafka 會向生產者
返回“首領不可用”的響應。在這個時候,如果生產者沒能正確處理這個錯誤,也沒有重試發送消息直到發送成功,那麽消息也有可能丟失。這算不上是broker 的可靠性問題,因為broker
並沒有收到這個消息。這也不是一致性問題,因為消費者井沒有讀到這個消息。問題在於如果生產者沒能正確處理這些錯誤,弄丟消息的是它們自己。

#解決這個問題,生產者再發一次消息就可以了!

生產者需要處理的錯誤包括兩部分: 一部分是生產者可以自動處理的錯誤,還有一部分是需要開發者手動處理的錯誤。


如果broker 返回的錯誤可以通過重試來解決,那麽生產者會自動處理這些錯誤。生產者向broker 發送消息時, broker 可以返回一個成功晌應碼或者一個錯誤響應碼。錯民晌應碼可以分為兩種, 一種是在重試之後可以解決的,還有一種是無法通過重試解決的。例如,如果broker 返回的是LEADER_NOT_AVAILABLE 錯誤,生產者可以嘗試重新發送消息。也許在這個時候一個新的首領被選舉出來了,那麽這次發送就會成功。也就是說, LEADER_NOT_AVAILABLE是一個可重試錯誤。另一方面,如果broker 返回的是INVALID_CONFIG 錯誤,即使通過重試也無能改變配置選項,所以這樣的重試是沒有意義的。這種錯誤是不可重試錯誤。

上面提到的是生成者的一些配置,下面我們來說明消費者的一些配置。

消費者讀取數據

下面這段著重理解一下:

只有那些被提交到Kafka 的數據,也就是那些已經被寫入所有同步副本的數據,對消費者是可用的,這意味著消費者得到的消息已經具備了一致性。消費者唯一要做的是跟蹤哪些消息是已經讀取過的,哪些是還沒有讀取過的。這是在讀取消息時不丟失悄息的關鍵。


在從分區讀取數據時,消費者會獲取一批事件,檢查這批事件裏最大的偏移量,然後從這個偏移量開始讀取另外一批事件。這樣可以保證消費者總能以正確的順序獲取新數據, 不會錯過任何事件。


如果一個悄費者退出,另一個消費者需要知道從什麽地方開始繼續處理,它需要知道前一個消費者在退出前處理的最後一個偏移量是多少。所謂的“另一個”消費者,也可能就是它自己重啟之後重新回來工作。這也就是為什麽消費者要“提交”它們的偏移量。它們把當前讀取的偏移量保存起來,在退出之後,同一個群組裏的其他消費者就可以接孚它們的工作。如果消費者提交了偏移量卻未能處理完消息,那麽就有可能造成消息丟失,這也是消費者丟失消息的主要原因。在這種情況下,如果其他消費者接手了工作,那些沒有被處理完的消息就會被忽略,永遠得不到處理。這就是為什麽我們為什麽非常重視偏移量提交的時間點和提交的方式。

#已提交的消息與已提交的偏移量
此處的己提交消息與之前討論過的已提交消息是不一樣的,它是指已經被寫入所有同步副本並且對消費者可見的消息,而己提交偏移量是指消費者發送給Kafka 的偏移量,
用於確認它已經收到並處理好的消息位置。

消費者的配置

這裏僅說明四個比較重要參數的配置

  • group.id:如果兩個消費者具有相同的group.id,井且訂閱了同一個主題,那麽每個消費者會分到主題分區的一個子集, 也就是說它們只能讀到所有消息的一個子集(不過群組會讀取主題所有的消息)。如果你希望消費者可以看到主題的所有消息,那麽需要為它們設置唯一的group.id 。
  • auto.offset.reset:這個參數指定了在沒有偏移量可提交時(比如消費者第l 次啟動時)或者請求的偏移量在broker 上不存在時),消費者會做些什麽。這個參數有兩種配置。一種是earliest ,如果選擇了這種配置,消費者會從分區的開始位置讀取數據,不管偏移量是否有效,這樣會導致消費者讀取大量的重復數據,但可以保證最少的數據丟失。一種是latest,如果選擇了這種配置, 消費者會從分區的末尾開始讀取數據,這樣可以減少重復處理消息,但很有可能會錯過一些消息。
  • enable.auto.commit:這是一個非常重要的配置參數,你可以讓消費者基於任務調度自動提交偏移量,也可以在代碼裏手動提交偏移量。自動提交的一個最大好處是,在實現消費者邏輯時可以少考慮一些問題。如果你在消費者輪詢操作裏處理所有的數據,那麽自動提交可以保證只提交已經處理過的偏移量。自動提交的主要缺點是,無法控制重復處理消息(比如消費者在自動提交偏移量之前停止處理悄息),而且如果把消息交給另外一個後臺線程去處理,自動提交機制可能會在消息還沒有處理完畢就提交偏移量。
  • auto.commit.interval.ms: 與第3個參數直接聯系。如果選擇了自動提交偏移量,可以通過該參數配置提交的頻度,默認值是每5s提交一次。一般來說,頻繁提交會增加額外的開銷,但也會降低重復處理消息的頻率。

kafka數據可靠傳輸