1. 程式人生 > 其它 >kafka學習筆記02-深入學習

kafka學習筆記02-深入學習

基礎架構

1)Producer :訊息生產者,就是向 kafka broker 發訊息的客戶端;

2)Consumer :訊息消費者,向 kafka broker 取訊息的客戶端;

3)Consumer Group (CG):消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分割槽的資料,一個分割槽只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。

4)Broker :一臺 kafka 伺服器就是一個 broker。一個叢集由多個 broker 組成。一個 broker 可以容納多個 topic。

5)Topic :訊息主題,生產者和消費者面向的都是一個 topic;

6)Partition:為了實現擴充套件性,一個非常大的 topic 可以分佈到多個 broker(即伺服器)上, 一個 topic 可以分為多個 partition,每個 partition 是一個有序的佇列;

7)Replica:副本,為保證叢集中的某個節點發生故障時,該節點上的 partition 資料不丟失,且 kafka 仍然能夠繼續工作,kafka 提供了副本機制,一個 topic 的每個分割槽都有若干個副本,副本數一共包括了一個 leader 和若干個 follower。

8)leader:每個分割槽多個副本的“主”,生產者傳送資料的物件,以及消費者消費資料的物件都是 leader。

9)follower:每個分割槽多個副本中的“從”,實時從 leader 中同步資料,保持和 leader 資料 的同步。leader 發生故障時,某個 follower 會成為新的 follower。

Brocker

訊息儲存機制

Kafka 中訊息是以 topic 進行分類的,生產者生產訊息,消費者消費訊息,都是面向 topic 的。 topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應於一個 log 文 件,該 log 檔案中儲存的就是 producer 生產的資料。Producer 生產的資料會被不斷追加到該 log 檔案末端,且每條資料都有自己的 offset。消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,從上次的位置繼續消費。

由於生產者生產的訊息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導致資料定位效率低下,Kafka 採取了分片和索引機制,將每個 partition 分為多個 segment。每個 segment 對應兩個檔案——“.index”檔案和“.log”檔案。這些檔案位於一個資料夾下,該資料夾的命名 規則為:topic 名稱+分割槽序號。

log檔案儲存具體的訊息內容, index檔案存取offset和訊息內容的地址

生產者(Producer)

分割槽策略

topic可以分為多個partition, 生產者傳送訊息時, 該條訊息具體分配到哪一個partition, 就是分割槽策略決定的.

生產者在傳送訊息時可以自己指定分割槽號, 若不指定, 則會通過預設的分割槽器進行分割槽, 不同的kafka API客戶端對分割槽策略的支援可能不一樣, 比如在pykafka這個包中, producer呼叫produce方法時, 不能手動指定具體的分割槽號, 只能通過分割槽器進行分割槽. 而在kafka-python中, 呼叫send方法時, 可以指定具體的分割槽號, 若未指定則會通過分割槽器進行分割槽

自定義分割槽器

分割槽器可以進行自定義, 以pykafka為例, 在partitioner檔案中預定義了一些分割槽器, 如RandomPartitioner, HashingPartitioner, GroupHashingPartitioner等, 自定義的分割槽器實現__call__方法即可, 其接收兩個引數, partitions(分割槽數)key(分割槽依據), pykafka中預設的分割槽器為RandomPartitioner, 它並沒有使用到key這個引數, 只是初始化一個0, 然後一直對partitions進行取模, 而HashingPartitioner就是通過對key進行雜湊, 再對partitions進行取模得到最後的分割槽號

ACK應答機制

為了保證kafka的broker確實收到並儲存了生產者傳送的訊息, kafka有一個ack應答機制. 該機制有三種應答方式可以選擇

  1. (acks=0): 生產者傳送訊息後, 就認為是傳送成功了, 不等待broker的返回. 這樣傳送的速度最快, 但是無法保證broker準確接收並儲存了訊息, 可能出現數據丟失.
  2. (acks=1): 生產者傳送訊息後, 只等待leader的返回結果, 不管follower是否同步完成, 只要leader返回成功, 那麼就認為訊息傳送成功了. 這樣如果leader接收到訊息後, 還沒有同步至follower就掛掉了, 那麼這條訊息也就丟失了.
  3. (acks=-1): 生產者傳送訊息後, 等leader和follower都同步完成了, 才認為是訊息傳送成功了. 這種情況速度最慢, 但是訊息丟失的概率最小. 但也有可能發生資料重複, 當follower同步完成後, leader傳送ack前, leader掛掉了, 那麼生產者長時間為接收到ack, 可能進行重發, 此時傳送到了新選舉出來的leader上, 該leader上次本身就已經同步了一次資料了, 現在又接受一次, 那麼就造成了資料重複.

ISR(in-sync replica)

對於上述的第三中情況, 如果副本非常多, 那麼等每個副本都同步完成, 必然需要一定的時間, 因此kafka在此基礎上, 提出了ISR的概念, 意為和 leader 保持同步的 follower 集合(並不一定是所有的follower)。當 ISR 中的 follower 完成資料的同步之後,leader 就會給 follower 傳送 ack。如果 follower 長時間未向 leader 同步資料 ,則該 follower 將被踢出 ISR. 該時間閾值由replica.lag.time.max.ms引數設定。Leader 發生故障之後,就會從 ISR 中選舉新的 leader。

注:在0.9版本前, 除了通過時間來選擇follower是否進入ISR外, 還可以通過資料條數來選擇follower, 比如設定一個數據量閾值(10), 如果follower中的資料條數與leader的條數差小於10, 那麼就把該follower加入ISR中, 否則踢出ISR. 但如果根據條數來選擇的話, 因為生產者生產訊息可能是按批次持續傳送的, 那麼follower的同步也可能是一直在進行的, 也就意味著follower與leader的條數差可能一直在變動, 比如上一時刻follower的條數差小於閾值, 被加出了ISR, 下一時刻條數差又大於閾值, 又被踢出了ISR, 後面同步過來了後又加入了ISR, 這樣容易造成頻繁更新ISR, 並且ISR的作用只是一個leader的預備佇列, 是以防萬一的策略, 正常情況leader一直是正常工作的, 所以就沒有必要花費太多的資源去頻繁操作ISR.

因此kafka中對於第三種情況, 並不是會等所有的follower都同步完成才返回ack, 而只需要等ISR中的follower同步完成即可傳送ack

pykafka中, 建立Producer物件時, 可以傳入required_acks引數, 該引數預設為1, 即只等待leader接收到資料就返回ack, 也可以傳入0或者-1, 此外還可以傳入>=2的數, 即自己設定等待同步的副本數

副本同步故障細節

例如有1個leader, 2個follower共三個副本, 生產者第一批資料傳送了10條訊息, offset為0到9, 三個副本都同步完成了, offset都同步到了9

此時生產者傳送了第二批共10條訊息, 首先是leader接受訊息, offset從9變成了19. 然後與兩個follower進行同步. 兩個follower的同步速率總會有一個快一個慢. 假如某一時刻, 第一個follower的offset同步到了12, 第二個follower的offset同步到了15.

那麼對於這個時刻來說, 19/12/15這三個offset就稱為每個副本的LEO(Log End Offset), offset12就稱為當前的高水位(High Watermark), 即:

LEO:指的是每個副本最大的 offset;

HW:指的是ISR 佇列中最小的 LEO。也是消費者能見到的最大的 offset;

此時:

  1. 若leader掛掉了, 那麼需要從兩個follower中選舉一個來當新的leader, 此時為了保證兩個follower的資料統一, 不管選擇哪一個follower, 都會將高於HW的offset全部捨棄. 即使老的leader(LEO為19的leader)又啟動了, 那麼老的leader此時只能變成follower, 捨棄高於HW的offset, 從新leader中重新同步資料

  2. 若follower掛掉了, 那麼就會將該follower踢出ISR, 如果不久後該follower又活了, 那麼也會捨棄大於上一次儲存的HW後的offset, 重新從HW開始同步leader的資料. 若符合加入ISR的條件, 那麼可以繼續加入ISR.

    例: 若第二個follower(LEO為15的follower)掛掉了, 則踢出ISR, 恢復後. 就會捨棄offset13到15的資料, 重新從HW12開始與leader進行同步

精準一次性消費(Exactly Once)

kafka在0.11版本引入了冪等性, 即Producer不管向server傳送了多少次重複資料, Server端只會儲存一條.

要啟用冪等性,只需要將 Producer 的引數中enable.idompotence設定為true即可。開啟冪等性的 Producer 在 初始化的時候會被分配一個 PID,發往同一 Partition 的訊息會附帶 Sequence Number。而 Broker 端會對做快取,當具有相同主鍵的訊息提交時,Broker 只 會持久化一條。但是Producer重啟後 PID 就會變化,並且不同的 Partition 也具有不同主鍵id,所以冪等性無法保證跨分割槽跨會話的 Exactly Once。

消費者(consumer)

消費方式

consumer 採用pull(拉)模式從 broker 中讀取資料。因為push(推)模式很難適應消費速率不同的消費者, 容易造成 consumer 來不及處理訊息, 導致服務拒絕或者網路阻塞.

pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,Kafka 的消費者在消費資料時會傳入一個時長引數 timeout,如果當前沒有資料可供消費,consumer 會等待一段時間之後再返回,這段時長即為 timeout

分割槽分配策略

消費者消費時是以組的形式進行消費的, 即消費者組. 即使只啟動了一個消費者, 也會預設為其建立消費者組.

一個分割槽Partition, 只能被消費者組中的某一個消費者消費. 即一個消費者組中, 不可能有兩個及以上消費者消費同一個Partition, 因為一個消費者組從邏輯上來說就是一個大的消費者, 那麼組裡的兩個消費者消費同一個分割槽, 就是重複消費了.

如果組內有多個消費者, 他們都消費了同一個主題, 並且這個主題存在多個分割槽, 那麼就會存在分割槽的消費分配問題, 即哪一個分割槽分配給哪一個消費者消費. 當然如果組內只有一個消費者, 那麼就不存在分割槽分配問題

kafka有三種分割槽分配策略, RangeAssignor/RoundRobin(輪詢)/StickyAssignor

RangeAssignor

單主題

![image-20211007212506955](file:///Users/alex/Library/Application%20Support/typora-user-images/image-20211007212506955.png?lastModify=1633618070)

所有消費者共同訂閱多主題

![image-20211007224639031](file:///Users/alex/Library/Application%20Support/typora-user-images/image-20211007224639031.png?lastModify=1633618070)

這樣會導致分配不均勻, 即Consumer1消費了四個分割槽, Consumer2只消費了2個分割槽, 如果主題數更多, 那麼偏差會更大

RoundRobin

RoundRobin: 即輪詢方式分配, 把主題中的所有分割槽和所有消費者排序後進行輪詢, 第一個partition分配給第一個消費者, 第二個partition分配給第二個消費者, 依次迴圈類推

單主題

所有消費者共同訂閱多主題

這種方式解決的RangeAssignor的問題, 但是下面的情況下又會出現不均勻

每個消費者分別訂閱不同主題

注:紅線是訂閱,其他顏色的線是分配分割槽

StickyAssignor

從0.11.x版本開始引入這種分配策略,它主要有兩個目的:

① 分割槽的分配要儘可能的均勻;
② 分割槽的分配儘可能的與上次分配的保持相同。

當兩者發生衝突時,第一個目標優先於第二個目標。鑑於這兩個目標,StickyAssignor策略的具體實現要比RangeAssignor和RoundRobinAssignor這兩種分配策略要複雜很多。

所有消費者共同訂閱多主題

這樣初看上去似乎與採用RoundRobinAssignor策略所分配的結果相同,但事實是否真的如此呢?再假設此時消費者C1脫離了消費組,那麼消費組就會執行再平衡操作,進而消費分割槽會重新分配。如果採用RoundRobinAssignor策略,那麼此時的分配結果如下:

如分配結果所示,RoundRobinAssignor策略會按照消費者C0和C2進行重新輪詢分配。而如果此時使用的是StickyAssignor策略,那麼分配結果為:

可以看到分配結果中保留了上一次分配中對於消費者C0和C2的所有分配結果,並將原來消費者C1的“負擔”分配給了剩餘的兩個消費者C0和C2,最終C0和C2的分配還保持了均衡。

如果發生分割槽重分配,那麼對於同一個分割槽而言有可能之前的消費者和新指派的消費者不是同一個,對於之前消費者進行到一半的處理還要在新指派的消費者中再次復現一遍,這顯然很浪費系統資源。StickyAssignor策略如同其名稱中的“sticky”一樣,讓分配策略具備一定的“粘性”,儘可能地讓前後兩次分配相同,進而減少系統資源的損耗以及其它異常情況的發生。

每個消費者分別訂閱不同主題

從結果上看StickyAssignor策略比另外兩者分配策略而言顯得更加的優異,這個策略的程式碼實現也是異常複雜,如果大家在一個 group 裡面,不同的 Consumer 訂閱不同的 topic, 那麼設定Sticky 分配策略還是很有必要的.