1. 程式人生 > 其它 >Kafka面試準備,收藏這一篇就夠了!!!

Kafka面試準備,收藏這一篇就夠了!!!

  • 為什麼要用訊息佇列?為什麼選擇了kafka?
  • kafka的元件與作用(架構)?
  • kafka為什麼要分割槽?
  • Kafka生產者分割槽策略?
  • kafka的資料可靠性怎麼保證?(丟,重)
  • kafka的副本機制?
  • kafka的消費分割槽分配策略?
  • kafka的offset怎麼維護?
  • kafka為什麼這麼快?(高效讀寫資料)
  • Kafka訊息資料積壓,Kafka消費能力不足怎麼處理?
  • kafka事務是怎麼實現的?
  • Kafka中的資料是有序的嗎?
  • Kafka可以按照時間消費資料?
  • Kafka單條日誌傳輸大小?
  • Kafka引數優化?
  • Kafka適合以下應用場景?
  • Exactly Once語義?在流式計算中怎麼保持?

解析參考

為什麼要用訊息佇列

  1. 解耦

允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束。

  1. 可恢復性

系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理。

  1. 緩衝

有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況。

  1. 靈活性與峰值處理能力

在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。

  1. 非同步通訊

很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們。

為什麼選擇了kafka

  1. 高吞吐量、低延遲:kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒。

  2. 可擴充套件性:kafka叢集支援熱擴充套件。

  3. 永續性、可靠性:訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失。

  4. 容錯性:允許叢集中節點故障(若副本數量為n,則允許n-1個節點故障)。

  5. 高併發:支援數千個客戶端同時讀寫。

kafka的元件與作用(架構)

  • Producer :訊息生產者,就是向kafka broker發訊息的客戶端。
  • Consumer :訊息消費者,向kafka broker取訊息的客戶端。
  • Consumer Group (CG):消費者組,由多個consumer組成。消費者組內每個消費者負責消費不同分割槽的資料,一個分割槽只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。
  • Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。
  • Topic :可以理解為一個佇列,生產者和消費者面向的都是一個topic。
  • Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列。
  • Replica:副本,為保證叢集中的某個節點發生故障時,該節點上的partition資料不丟失,且kafka仍然能夠繼續工作,kafka提供了副本機制,一個topic的每個分割槽都有若干個副本,一個leader和若干個follower。
  • leader:每個分割槽多個副本的“主”,生產者傳送資料的物件,以及消費者消費資料的物件都是leader。
  • follower:每個分割槽多個副本中的“從”,實時從leader中同步資料,保持和leader資料的同步。leader發生故障時,某個follower會成為新的follower。

kafka為什麼要分割槽

  1. 方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了。
  2. 可以提高併發,因為可以以Partition為單位讀寫。

Kafka生產者分割槽策略

  1. 指明 partition 的情況下,直接將指明的值直接作為partiton值。
  2. 沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取餘得到partition值。
  3. 既沒有partition值又沒有key值的情況下,第一次呼叫時隨機生成一個整數(後面每次呼叫在這個整數上自增),將這個值與topic可用的partition總數取餘得到partition值,也就是常說的round-robin演算法。

kafka的資料可靠性怎麼保證

為保證producer傳送的資料,能可靠的傳送到指定的topic,topic的每個partition收到producer傳送的資料後,都需要向producer傳送ack(acknowledgement確認收到),如果producer收到ack,就會進行下一輪的傳送,否則重新發送資料。所以引出ack機制。

ack應答機制(可問:造成資料重複和丟失的相關問題)

Kafka為使用者提供了三種可靠性級別,使用者根據對可靠性和延遲的要求進行權衡,選擇以下的配置。acks引數配置:

  • 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁碟就已經返回,當broker故障時有可能丟失資料。
  • 1:producer等待broker的ack,partition的leader落盤成功後返回ack,如果在follower同步成功之前leader故障,那麼將會丟失資料。
  • -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功後才返回ack。但是如果在follower同步完成後,broker傳送ack之前,leader發生故障,那麼會造成資料重複。
副本資料同步策略
方案優點缺點
半數以上完成同步,就傳送ack 延遲低 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本
全部完成同步,才傳送ack 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 延遲高

選擇最後一個的原因:

  1. 同樣為了容忍n臺節點的故障,第一種方案需要2n+1個副本,而第二種方案只需要n+1個副本,而Kafka的每個分割槽都有大量的資料,第一種方案會造成大量資料的冗餘。
  2. 雖然第二種方案的網路延遲會比較高,但網路延遲對Kafka的影響較小。
ISR

如果採用全部完成同步,才傳送ack的副本的同步策略的話:提出問題:leader收到資料,所有follower都開始同步資料,但有一個follower,因為某種故障,遲遲不能與leader進行同步,那leader就要一直等下去,直到它完成同步,才能傳送ack。這個問題怎麼解決呢?

Leader維護了一個動態的in-sync replica set (ISR),意為和leader保持同步的follower集合。當ISR中的follower完成資料的同步之後,leader就會給follower傳送ack。如果follower長時間未向leader同步資料,則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms引數設定。Leader發生故障之後,就會從ISR中選舉新的leader。

故障處理(LEO與HW)

LEO:指的是每個副本最大的offset。

HW:指的是消費者能見到的最大的offset,ISR佇列中最小的LEO。

follower故障

follower發生故障後會被臨時踢出ISR,待該follower恢復後,follower會讀取本地磁碟記錄的上次的HW,並將log檔案高於HW的部分擷取掉,從HW開始向leader進行同步。等該follower的LEO大於等於該Partition的HW,即follower追上leader之後,就可以重新加入ISR了。

leader故障

leader發生故障之後,會從ISR中選出一個新的leader,之後,為保證多個副本之間的資料一致性,其餘的follower會先將各自的log檔案高於HW的部分截掉,然後從新的leader同步資料。

注意:這隻能保證副本之間的資料一致性,並不能保證資料不丟失或者不重複。

kafka的副本機制

參考上一個問題(副本資料同步策略)。

kafka的消費分割槽分配策略

一個consumer group中有多個consumer,一個topic有多個partition,所以必然會涉及到partition的分配問題,即確定那個partition由哪個consumer來消費 Kafka有三種分配策略,一是RoundRobin,一是Range。高版本還有一個StickyAssignor策略 將分割槽的所有權從一個消費者移到另一個消費者稱為重新平衡(rebalance)。當以下事件發生時,Kafka 將會進行一次分割槽分配:

同一個 Consumer Group 內新增消費者。

消費者離開當前所屬的Consumer Group,包括shuts down或crashes。

Range分割槽分配策略

Range是對每個Topic而言的(即一個Topic一個Topic分),首先對同一個Topic裡面的分割槽按照序號進行排序,並對消費者按照字母順序進行排序。然後用Partitions分割槽的個數除以消費者執行緒的總數來決定每個消費者執行緒消費幾個分割槽。如果除不盡,那麼前面幾個消費者執行緒將會多消費一個分割槽。假設n=分割槽數/消費者數量,m=分割槽數%消費者數量,那麼前m個消費者每個分配n+1個分割槽,後面的(消費者數量-m)個消費者每個分配n個分割槽。假如有10個分割槽,3個消費者執行緒,把分割槽按照序號排列

0,1,2,3,4,5,6,7,8,9

消費者執行緒為

C1-0,C2-0,C2-1

那麼用partition數除以消費者執行緒的總數來決定每個消費者執行緒消費幾個partition,如果除不盡,前面幾個消費者將會多消費一個分割槽。在我們的例子裡面,我們有10個分割槽,3個消費者執行緒,10/3 = 3,而且除除不盡,那麼消費者執行緒C1-0將會多消費一個分割槽,所以最後分割槽分配的結果看起來是這樣的:

C1-0:0,1,2,3

C2-0:4,5,6

C2-1:7,8,9

如果有11個分割槽將會是:

C1-0:0,1,2,3

C2-0:4,5,6,7

C2-1:8,9,10

假如我們有兩個主題T1,T2,分別有10個分割槽,最後的分配結果將會是這樣:

C1-0:T1(0,1,2,3) T2(0,1,2,3)

C2-0:T1(4,5,6) T2(4,5,6)

C2-1:T1(7,8,9) T2(7,8,9)

RoundRobinAssignor分割槽分配策略

RoundRobinAssignor策略的原理是將消費組內所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然後通過輪詢方式逐個將分割槽以此分配給每個消費者. 使用RoundRobin策略有兩個前提條件必須滿足:

同一個消費者組裡面的所有消費者的num.streams(消費者消費執行緒數)必須相等;每個消費者訂閱的主題必須相同。加入按照 hashCode 排序完的topic-partitions組依次為

T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9

我們的消費者執行緒排序為

C1-0, C1-1, C2-0, C2-1

最後分割槽分配的結果為:

C1-0 將消費 T1-5, T1-2, T1-6 分割槽

C1-1 將消費 T1-3, T1-1, T1-9 分割槽

C2-0 將消費 T1-0, T1-4 分割槽

C2-1 將消費 T1-8, T1-7 分割槽

StickyAssignor分割槽分配策略

Kafka從0.11.x版本開始引入這種分配策略,它主要有兩個目的:

分割槽的分配要儘可能的均勻,分配給消費者者的主題分割槽數最多相差一個 分割槽的分配儘可能的與上次分配的保持相同。當兩者發生衝突時,第一個目標優先於第二個目標。鑑於這兩個目的,StickyAssignor策略的具體實現要比RangeAssignor和RoundRobinAssignor這兩種分配策略要複雜很多。

假設消費組內有3個消費者

C0、C1、C2

它們都訂閱了4個主題:

t0、t1、t2、t3

並且每個主題有2個分割槽,也就是說整個消費組訂閱了

t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1這8個分割槽

最終的分配結果如下:

消費者C0:t0p0、t1p1、t3p0

消費者C1:t0p1、t2p0、t3p1

消費者C2:t1p0、t2p1

這樣初看上去似乎與採用RoundRobinAssignor策略所分配的結果相同

此時假設消費者C1脫離了消費組,那麼消費組就會執行再平衡操作,進而消費分割槽會重新分配。如果採用RoundRobinAssignor策略,那麼此時的分配結果如下:

消費者C0:t0p0、t1p0、t2p0、t3p0

消費者C2:t0p1、t1p1、t2p1、t3p1

如分配結果所示,RoundRobinAssignor策略會按照消費者C0和C2進行重新輪詢分配。而如果此時使用的是StickyAssignor策略,那麼分配結果為:

消費者C0:t0p0、t1p1、t3p0、t2p0

消費者C2:t1p0、t2p1、t0p1、t3p1

可以看到分配結果中保留了上一次分配中對於消費者C0和C2的所有分配結果,並將原來消費者C1的“負擔”分配給了剩餘的兩個消費者C0和C2,最終C0和C2的分配還保持了均衡。

如果發生分割槽重分配,那麼對於同一個分割槽而言有可能之前的消費者和新指派的消費者不是同一個,對於之前消費者進行到一半的處理還要在新指派的消費者中再次復現一遍,這顯然很浪費系統資源。StickyAssignor策略如同其名稱中的“sticky”一樣,讓分配策略具備一定的“粘性”,儘可能地讓前後兩次分配相同,進而減少系統資源的損耗以及其它異常情況的發生。

到目前為止所分析的都是消費者的訂閱資訊都是相同的情況,我們來看一下訂閱資訊不同的情況下的處理。

舉例,同樣消費組內有3個消費者:

C0、C1、C2

叢集中有3個主題:

t0、t1、t2

這3個主題分別有

1、2、3個分割槽

也就是說叢集中有

t0p0、t1p0、t1p1、t2p0、t2p1、t2p2這6個分割槽

消費者C0訂閱了主題t0

消費者C1訂閱了主題t0和t1

消費者C2訂閱了主題t0、t1和t2

如果此時採用RoundRobinAssignor策略:

消費者C0:t0p0

消費者C1:t1p0

消費者C2:t1p1、t2p0、t2p1、t2p2

如果此時採用的是StickyAssignor策略:

消費者C0:t0p0

消費者C1:t1p0、t1p1

消費者C2:t2p0、t2p1、t2p2

此時消費者C0脫離了消費組,那麼RoundRobinAssignor策略的分配結果為:

消費者C1:t0p0、t1p1

消費者C2:t1p0、t2p0、t2p1、t2p2

StickyAssignor策略,那麼分配結果為:

消費者C1:t1p0、t1p1、t0p0

消費者C2:t2p0、t2p1、t2p2

可以看到StickyAssignor策略保留了消費者C1和C2中原有的5個分割槽的分配:

t1p0、t1p1、t2p0、t2p1、t2p2。

從結果上看StickyAssignor策略比另外兩者分配策略而言顯得更加的優異,這個策略的程式碼實現也是異常複雜。

kafka的offset怎麼維護

Kafka 0.9版本之前,consumer預設將offset儲存在Zookeeper中。

從0.9版本開始,consumer預設將offset儲存在Kafka一個內建的topic中,該topic為__consumer_offsets。

額外補充:實際開發場景中在Spark和Flink中,可以自己手動提交kafka的offset,或者是flink兩階段提交自動提交offset。

kafka為什麼這麼快

  1. Kafka本身是分散式叢集,同時採用分割槽技術,併發度高。
  2. 順序寫磁碟

Kafka的producer生產資料,要寫入到log檔案中,寫的過程是一直追加到檔案末端,為順序寫。官網有資料表明,同樣的磁碟,順序寫能到600M/s,而隨機寫只有100K/s。

  1. 零拷貝技術

零拷貝並不是不需要拷貝,而是減少不必要的拷貝次數。通常是說在IO讀寫過程中。

傳統IO流程:

第一次:將磁碟檔案,讀取到作業系統核心緩衝區。

第二次:將核心緩衝區的資料,copy到application應用程式的buffer。

第三步:將application應用程式buffer中的資料,copy到socket網路傳送緩衝區(屬於作業系統核心的緩衝區)

第四次:將socket buffer的資料,copy到網絡卡,由網絡卡進行網路傳輸。

傳統方式,讀取磁碟檔案並進行網路傳送,經過的四次資料copy是非常繁瑣的。實際IO讀寫,需要進行IO中斷,需要CPU響應中斷(帶來上下文切換),儘管後來引入DMA來接管CPU的中斷請求,但四次copy是存在“不必要的拷貝”的。

重新思考傳統IO方式,會注意到實際上並不需要第二個和第三個資料副本。應用程式除了快取資料並將其傳輸回套接字緩衝區之外什麼都不做。相反,資料可以直接從讀緩衝區傳輸到套接字緩衝區。

顯然,第二次和第三次資料copy 其實在這種場景下沒有什麼幫助反而帶來開銷,這也正是零拷貝出現的意義。

所以零拷貝是指讀取磁碟檔案後,不需要做其他處理,直接用網路傳送出去。

Kafka消費能力不足怎麼處理

  1. 如果是Kafka消費能力不足,則可以考慮增加Topic的分割槽數,並且同時提升消費組的消費者數量,消費者數=分割槽數。(兩者缺一不可)
  2. 如果是下游的資料處理不及時:提高每批次拉取的數量。批次拉取資料過少(拉取資料/處理時間<生產速度),使處理的資料小於生產的資料,也會造成資料積壓。

kafka事務是怎麼實現的

Kafka從0.11版本開始引入了事務支援。事務可以保證Kafka在Exactly Once語義的基礎上,生產和消費可以跨分割槽和會話,要麼全部成功,要麼全部失敗。

Producer事務

為了實現跨分割槽跨會話的事務,需要引入一個全域性唯一的Transaction ID,並將Producer獲得的PID和Transaction ID繫結。這樣當Producer重啟後就可以通過正在進行的Transaction ID獲得原來的PID。為了管理Transaction,Kafka引入了一個新的元件Transaction Coordinator。Producer就是通過和Transaction Coordinator互動獲得Transaction ID對應的任務狀態。Transaction Coordinator還負責將事務所有寫入Kafka的一個內部Topic,這樣即使整個服務重啟,由於事務狀態得到儲存,進行中的事務狀態可以得到恢復,從而繼續進行。

Consumer事務

對於Consumer而言,事務的保證就會相對較弱,尤其時無法保證Commit的資訊被精確消費。這是由於Consumer可以通過offset訪問任意資訊,而且不同的Segment File生命週期不同,同一事務的訊息可能會出現重啟後被刪除的情況。

Kafka中的資料是有序的嗎

單分割槽內有序。

多分割槽,分割槽與分割槽間無序。

Kafka可以按照時間消費資料嗎

可以,提供的API方法:

KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp)

Kafka單條日誌傳輸大小

kafka對於訊息體的大小預設為單條最大值是1M但是在我們應用場景中, 常常會出現一條訊息大於1M,如果不對kafka進行配置。則會出現生產者無法將訊息推送到kafka或消費者無法去消費kafka裡面的資料, 這時我們就要對kafka進行以下配置:server.properties

replica.fetch.max.bytes: 1048576  broker可複製的訊息的最大位元組數, 預設為1M
message.max.bytes: 1000012   kafka 會接收單個訊息size的最大限制, 預設為1M左右

message.max.bytes必須小於等於replica.fetch.max.bytes,否則就會導致replica之間資料同步失敗

Kafka引數優化

Broker引數配置(server.properties)
1、日誌保留策略配置
# 保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72

2、Replica相關配置
default.replication.factor:1 預設副本1個

3、網路通訊延時
replica.socket.timeout.ms:30000 #當叢集之間網路不穩定時,調大該引數
replica.lag.time.max.ms= 600000# 如果網路不好,或者kafka叢集壓力較大,會出現副本丟失,然後會頻繁複制副本,導致叢集壓力更大,此時可以調大該引數。
Producer優化(producer.properties)
compression.type:none                 gzip  snappy  lz4  
#預設傳送不進行壓縮,推薦配置一種適合的壓縮演算法,可以大幅度的減緩網路壓力和Broker的儲存壓力。
Kafka記憶體調整(kafka-server-start.sh)
預設記憶體1個G,生產環境儘量不要超過6個G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

Kafka適合以下應用場景

  1. 日誌收集:一個公司可以用Kafka可以收集各種服務的log,通過kafka以統一介面服務的方式開放給各種consumer。
  2. 訊息系統:解耦生產者和消費者、快取訊息等。
  3. 使用者活動跟蹤:kafka經常被用來記錄web使用者或者app使用者的各種活動,如瀏覽網頁、搜尋、點選等活動,這些活動資訊被各個伺服器釋出到kafka的topic中,然後消費者通過訂閱這些topic來做實時的監控分析,亦可儲存到資料庫。
  4. 運營指標:kafka也經常用來記錄運營監控資料。包括收集各種分散式應用的資料,生產各種操作的集中反饋,比如報警和報告;
  5. 流式處理:比如spark和flink。


Exactly Once語義

將伺服器的ACK級別設定為-1,可以保證Producer到Server之間不會丟失資料,即At Least Once語義。相對的,將伺服器ACK級別設定為0,可以保證生產者每條訊息只會被髮送一次,即At Most Once語義。

At Least Once可以保證資料不丟失,但是不能保證資料不重複;

相對的,At Least Once可以保證資料不重複,但是不能保證資料不丟失。

但是,對於一些非常重要的資訊,比如說交易資料,下游資料消費者要求資料既不重複也不丟失,即Exactly Once語義。在0.11版本以前的Kafka,對此是無能為力的,只能保證資料不丟失,再在下游消費者對資料做全域性去重。對於多個下游應用的情況,每個都需要單獨做全域性去重,這就對效能造成了很大影響。

0.11版本的Kafka,引入了一項重大特性:冪等性。

開啟冪等性enable.idempotence=true。

所謂的冪等性就是指Producer不論向Server傳送多少次重複資料,Server端都只會持久化一條。冪等性結合At Least Once語義,就構成了Kafka的Exactly Once語義。即:

At Least Once + 冪等性 = Exactly Once

Kafka的冪等性實現其實就是將原來下游需要做的去重放在了資料上游。開啟冪等性的Producer在初始化的時候會被分配一個PID,發往同一Partition的訊息會附帶Sequence Number。而Broker端會對<PID, Partition, SeqNumber>做快取,當具有相同主鍵的訊息提交時,Broker只會持久化一條。

但是PID重啟就會變化,同時不同的Partition也具有不同主鍵,所以冪等性無法保證跨分割槽跨會話的Exactly Once。

補充,在流式計算中怎麼Exactly Once語義?以flink為例

  1. souce:使用執行ExactlyOnce的資料來源,比如kafka等

內部使用FlinkKafakConsumer,並開啟CheckPoint,偏移量會儲存到StateBackend中,並且預設會將偏移量寫入到topic中去,即_consumer_offsets Flink設定CheckepointingModel.EXACTLY_ONCE

  1. sink

儲存系統支援覆蓋也即冪等性:如Redis,Hbase,ES等 儲存系統不支援覆:需要支援事務(預寫式日誌或者兩階段提交),兩階段提交可參考Flink整合的kafka sink的實現。