深入理解Kafka必知必會(3)
Kafka中的事務是怎麼實現的?
Kafka中的事務可以使應用程式將消費訊息、生產訊息、提交消費位移當作原子操作來處理,同時成功或失敗,即使該生產或消費會跨多個分割槽。
生產者必須提供唯一的transactionalId,啟動後請求事務協調器獲取一個PID,transactionalId與PID一一對應。
每次傳送資料給<Topic, Partition>前,需要先向事務協調器傳送AddPartitionsToTxnRequest,事務協調器會將該<Transaction, Topic, Partition>存於__transaction_state內,並將其狀態置為BEGIN。
在處理完 AddOffsetsToTxnRequest 之後,生產者還會發送 TxnOffsetCommitRequest 請求給 GroupCoordinator,從而將本次事務中包含的消費位移資訊 offsets 儲存到主題 __consumer_offsets 中
一旦上述資料寫入操作完成,應用程式必須呼叫KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。無論呼叫 commitTransaction() 方法還是 abortTransaction() 方法,生產者都會向 TransactionCoordinator 傳送 EndTxnRequest 請求。
- 將 PREPARE_COMMIT 或 PREPARE_ABORT 訊息寫入主題 __transaction_state
- 通過 WriteTxnMarkersRequest 請求將 COMMIT 或 ABORT 資訊寫入使用者所使用的普通主題和 __consumer_offsets
- 將 COMPLETE_COMMIT 或 COMPLETE_ABORT 資訊寫入內部主題 __transaction_state標明該事務結束
在消費端有一個引數isolation.level,設定為“read_committed”,表示消費端應用不可以看到尚未提交的事務內的訊息。如果生產者開啟事務並向某個分割槽值傳送3條訊息 msg1、msg2 和 msg3,在執行 commitTransaction() 或 abortTransaction() 方法前,設定為“read_committed”的消費端應用是消費不到這些訊息的,不過在 KafkaConsumer 內部會快取這些訊息,直到生產者執行 commitTransaction() 方法之後它才能將這些訊息推送給消費端應用。反之,如果生產者執行了 abortTransaction() 方法,那麼 KafkaConsumer 會將這些快取的訊息丟棄而不推送給消費端應用。
失效副本是指什麼?有那些應對措施?
正常情況下,分割槽的所有副本都處於 ISR 集合中,但是難免會有異常情況發生,從而某些副本被剝離出 ISR 集合中。在 ISR 集合之外,也就是處於同步失效或功能失效(比如副本處於非存活狀態)的副本統稱為失效副本,失效副本對應的分割槽也就稱為同步失效分割槽,即 under-replicated 分割槽。
Kafka 從 0.9.x 版本開始就通過唯一的 broker 端引數 replica.lag.time.max.ms 來抉擇,當 ISR 集合中的一個 follower 副本滯後 leader 副本的時間超過此引數指定的值時則判定為同步失敗,需要將此 follower 副本剔除出 ISR 集合。replica.lag.time.max.ms 引數的預設值為10000。
在 0.9.x 版本之前,Kafka 中還有另一個引數 replica.lag.max.messages(預設值為4000),它也是用來判定失效副本的,當一個 follower 副本滯後 leader 副本的訊息數超過 replica.lag.max.messages 的大小時,則判定它處於同步失效的狀態。它與 replica.lag.time.max.ms 引數判定出的失效副本取並集組成一個失效副本的集合,從而進一步剝離出分割槽的 ISR 集合。
Kafka 原始碼註釋中說明了一般有這幾種情況會導致副本失效:
- follower 副本程序卡住,在一段時間內根本沒有向 leader 副本發起同步請求,比如頻繁的 Full GC。
- follower 副本程序同步過慢,在一段時間內都無法追趕上 leader 副本,比如 I/O 開銷過大。
- 如果通過工具增加了副本因子,那麼新增加的副本在趕上 leader 副本之前也都是處於失效狀態的。
- 如果一個 follower 副本由於某些原因(比如宕機)而下線,之後又上線,在追趕上 leader 副本之前也處於失效狀態。
應對措施
我們用UnderReplicatedPartitions代表leader副本在當前Broker上且具有失效副本的分割槽的個數。
如果叢集中有多個Broker的UnderReplicatedPartitions保持一個大於0的穩定值時,一般暗示著叢集中有Broker已經處於下線狀態。這種情況下,這個Broker中的分割槽個數與叢集中的所有UnderReplicatedPartitions(處於下線的Broker是不會上報任何指標值的)之和是相等的。通常這類問題是由於機器硬體原因引起的,但也有可能是由於作業系統或者JVM引起的 。
如果叢集中存在Broker的UnderReplicatedPartitions頻繁變動,或者處於一個穩定的大於0的值(這裡特指沒有Broker下線的情況)時,一般暗示著叢集出現了效能問題,通常這類問題很難診斷,不過我們可以一步一步的將問題的範圍縮小,比如先嚐試確定這個效能問題是否只存在於叢集的某個Broker中,還是整個叢集之上。如果確定叢集中所有的under-replicated分割槽都是在單個Broker上,那麼可以看出這個Broker出現了問題,進而可以針對這單一的Broker做專項調查,比如:作業系統、GC、網路狀態或者磁碟狀態(比如:iowait、ioutil等指標)。
多副本下,各個副本中的HW和LEO的演變過程
某個分割槽有3個副本分別位於 broker0、broker1 和 broker2 節點中,假設 broker0 上的副本1為當前分割槽的 leader 副本,那麼副本2和副本3就是 follower 副本,整個訊息追加的過程可以概括如下:
- 生產者客戶端傳送訊息至 leader 副本(副本1)中。
- 訊息被追加到 leader 副本的本地日誌,並且會更新日誌的偏移量。
- follower 副本(副本2和副本3)向 leader 副本請求同步資料。
- leader 副本所在的伺服器讀取本地日誌,並更新對應拉取的 follower 副本的資訊。
- leader 副本所在的伺服器將拉取結果返回給 follower 副本。
- follower 副本收到 leader 副本返回的拉取結果,將訊息追加到本地日誌中,並更新日誌的偏移量資訊。
某一時刻,leader 副本的 LEO 增加至5,並且所有副本的 HW 還都為0。
之後 follower 副本(不帶陰影的方框)向 leader 副本拉取訊息,在拉取的請求中會帶有自身的 LEO 資訊,這個 LEO 資訊對應的是 FetchRequest 請求中的 fetch_offset。leader 副本返回給 follower 副本相應的訊息,並且還帶有自身的 HW 資訊,如上圖(右)所示,這個 HW 資訊對應的是 FetchResponse 中的 high_watermark。
此時兩個 follower 副本各自拉取到了訊息,並更新各自的 LEO 為3和4。與此同時,follower 副本還會更新自己的 HW,更新 HW 的演算法是比較當前 LEO 和 leader 副本中傳送過來的HW的值,取較小值作為自己的 HW 值。當前兩個 follower 副本的 HW 都等於0(min(0,0) = 0)。
接下來 follower 副本再次請求拉取 leader 副本中的訊息,如下圖(左)所示。
此時 leader 副本收到來自 follower 副本的 FetchRequest 請求,其中帶有 LEO 的相關資訊,選取其中的最小值作為新的 HW,即 min(15,3,4)=3。然後連同訊息和 HW 一起返回 FetchResponse 給 follower 副本,如上圖(右)所示。注意 leader 副本的 HW 是一個很重要的東西,因為它直接影響了分割槽資料對消費者的可見性。
兩個 follower 副本在收到新的訊息之後更新 LEO 並且更新自己的 HW 為3(min(LEO,3)=3)。
Kafka在可靠性方面做了哪些改進?(HW, LeaderEpoch)
HW
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的訊息偏移量(offset),消費者只能拉取到這個 offset 之前的訊息。
分割槽 ISR 集合中的每個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即為分割槽的 HW,對消費者而言只能消費 HW 之前的訊息。
leader epoch
leader epoch 代表 leader 的紀元資訊(epoch),初始值為0。每當 leader 變更一次,leader epoch 的值就會加1,相當於為 leader 增設了一個版本號。
每個副本中還會增設一個向量 <LeaderEpoch => StartOffset>,其中 StartOffset 表示當前 LeaderEpoch 下寫入的第一條訊息的偏移量。
假設有兩個節點A和B,B是leader節點,裡面的資料如圖:
A發生重啟,之後A不是先忙著截斷日誌而是先發送OffsetsForLeaderEpochRequest請求給B,B作為目前的leader在收到請求之後會返回當前的LEO(LogEndOffset,注意圖中LE0和LEO的不同),與請求對應的響應為OffsetsForLeaderEpochResponse。如果 A 中的 LeaderEpoch(假設為 LE_A)和 B 中的不相同,那麼 B 此時會查詢 LeaderEpoch 為 LE_A+1 對應的 StartOffset 並返回給 A
如上圖所示,A 在收到2之後發現和目前的 LEO 相同,也就不需要截斷日誌了,以此來保護資料的完整性。
再如,之後 B 發生了宕機,A 成為新的 leader,那麼對應的 LE=0 也變成了 LE=1,對應的訊息 m2 此時就得到了保留。後續的訊息都可以以 LE1 為 LeaderEpoch 陸續追加到 A 中。這個時候A就會有兩個LE,第二LE所記錄的Offset從2開始。如果B恢復了,那麼就會從A中獲取到LE+1的Offset為2的值返回給B。
再來看看LE如何解決資料不一致的問題:
當前 A 為 leader,B 為 follower,A 中有2條訊息 m1 和 m2,而 B 中有1條訊息 m1。假設 A 和 B 同時“掛掉”,然後 B 第一個恢復過來併成為新的 leader。
之後 B 寫入訊息 m3,並將 LEO 和 HW 更新至2,如下圖所示。注意此時的 LeaderEpoch 已經從 LE0 增至 LE1 了。
緊接著 A 也恢復過來成為 follower 並向 B 傳送 OffsetsForLeaderEpochRequest 請求,此時 A 的 LeaderEpoch 為 LE0。B 根據 LE0 查詢到對應的 offset 為1並返回給 A,A 就截斷日誌並刪除了訊息 m2,如下圖所示。之後 A 傳送 FetchRequest 至 B 請求來同步資料,最終A和B中都有兩條訊息 m1 和 m3,HW 和 LEO都為2,並且 LeaderEpoch 都為 LE1,如此便解決了資料不一致的問題。
為什麼Kafka不支援讀寫分離?
因為這樣有兩個明顯的缺點:
- 資料一致性問題。資料從主節點轉到從節點必然會有一個延時的時間視窗,這個時間視窗會導致主從節點之間的資料不一致。
- 延時問題。資料從寫入主節點到同步至從節點中的過程需要經歷網路→主節點記憶體→主節點磁碟→網路→從節點記憶體→從節點磁碟這幾個階段。對延時敏感的應用而言,主寫從讀的功能並不太適用。
對於Kafka來說,必要性不是很高,因為在Kafka叢集中,如果存在多個副本,經過合理的配置,可以讓leader副本均勻的分佈在各個broker上面,使每個 broker 上的讀寫負載都是一樣的。
Kafka中的延遲佇列怎麼實現
在傳送延時訊息的時候並不是先投遞到要傳送的真實主題(real_topic)中,而是先投遞到一些 Kafka 內部的主題(delay_topic)中,這些內部主題對使用者不可見,然後通過一個自定義的服務拉取這些內部主題中的訊息,並將滿足條件的訊息再投遞到要傳送的真實的主題中,消費者所訂閱的還是真實的主題。
如果採用這種方案,那麼一般是按照不同的延時等級來劃分的,比如設定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour這些按延時時間遞增的延時等級,延時的訊息按照延時時間投遞到不同等級的主題中,投遞到同一主題中的訊息的延時時間會被強轉為與此主題延時等級一致的延時時間,這樣延時誤差控制在兩個延時等級的時間差範圍之內(比如延時時間為17s的訊息投遞到30s的延時主題中,之後按照延時時間為30s進行計算,延時誤差為13s)。雖然有一定的延時誤差,但是誤差可控,並且這樣只需增加少許的主題就能實現延時佇列的功能。
傳送到內部主題(delay_topic_*)中的訊息會被一個獨立的 DelayService 程序消費,這個 DelayService 程序和 Kafka broker 程序以一對一的配比進行同機部署(參考下圖),以保證服務的可用性。
針對不同延時級別的主題,在 DelayService 的內部都會有單獨的執行緒來進行訊息的拉取,以及單獨的 DelayQueue(這裡用的是 JUC 中 DelayQueue)進行訊息的暫存。與此同時,在 DelayService 內部還會有專門的訊息傳送執行緒來獲取 DelayQueue 的訊息並轉發到真實的主題中。從消費、暫存再到轉發,執行緒之間都是一一對應的關係。如下圖所示,DelayService 的設計應當儘量保持簡單,避免鎖機制產生的隱患。
為了保障內部 DelayQueue 不會因為未處理的訊息過多而導致記憶體的佔用過大,DelayService 會對主題中的每個分割槽進行計數,當達到一定的閾值之後,就會暫停拉取該分割槽中的訊息。
因為一個主題中一般不止一個分割槽,分割槽之間的訊息並不會按照投遞時間進行排序,DelayQueue的作用是將訊息按照再次投遞時間進行有序排序,這樣下游的訊息傳送執行緒就能夠按照先後順序獲取最先滿足投遞條件的訊息。
Kafka中怎麼實現死信佇列和重試佇列?
死信可以看作消費者不能處理收到的訊息,也可以看作消費者不想處理收到的訊息,還可以看作不符合處理要求的訊息。比如訊息內包含的訊息內容無法被消費者解析,為了確保訊息的可靠性而不被隨意丟棄,故將其投遞到死信佇列中,這裡的死信就可以看作消費者不能處理的訊息。再比如超過既定的重試次數之後將訊息投入死信佇列,這裡就可以將死信看作不符合處理要求的訊息。
重試佇列其實可以看作一種回退佇列,具體指消費端消費訊息失敗時,為了防止訊息無故丟失而重新將訊息回滾到 broker 中。與回退佇列不同的是,重試佇列一般分成多個重試等級,每個重試等級一般也會設定重新投遞延時,重試次數越多投遞延時就越大。
理解了他們的概念之後我們就可以為每個主題設定重試佇列,訊息第一次消費失敗入重試佇列 Q1,Q1 的重新投遞延時為5s,5s過後重新投遞該訊息;如果訊息再次消費失敗則入重試佇列 Q2,Q2 的重新投遞延時為10s,10s過後再次投遞該訊息。
然後再設定一個主題作為死信佇列,重試越多次重新投遞的時間就越久,並且需要設定一個上限,超過投遞次數就進入死信佇列。重試佇列與延時佇列有相同的地方,都需要設定延時級別。
Kafka中怎麼做訊息審計?
訊息審計是指在訊息生產、儲存和消費的整個過程之間對訊息個數及延遲的審計,以此來檢測是否有資料丟失、是否有資料重複、端到端的延遲又是多少等內容。
目前與訊息審計有關的產品也有多個,比如 Chaperone(Uber)、Confluent Control Center、Kafka Monitor(LinkedIn),它們主要通過在訊息體(value 欄位)或在訊息頭(headers 欄位)中內嵌訊息對應的時間戳 timestamp 或全域性的唯一標識 ID(或者是兩者兼備)來實現訊息的審計功能。
內嵌 timestamp 的方式主要是設定一個審計的時間間隔 time_bucket_interval(可以自定義設定幾秒或幾分鐘),根據這個 time_bucket_interval 和訊息所屬的 timestamp 來計算相應的時間桶(time_bucket)。
內嵌 ID 的方式就更加容易理解了,對於每一條訊息都會被分配一個全域性唯一標識 ID。如果主題和相應的分割槽固定,則可以為每個分割槽設定一個全域性的 ID。當有訊息傳送時,首先獲取對應的 ID,然後內嵌到訊息中,最後才將它傳送到 broker 中。消費者進行消費審計時,可以判斷出哪條訊息丟失、哪條訊息重複。
Kafka中怎麼做訊息軌跡?
訊息軌跡指的是一條訊息從生產者發出,經由 broker 儲存,再到消費者消費的整個過程中,各個相關節點的狀態、時間、地點等資料匯聚而成的完整鏈路資訊。生產者、broker、消費者這3個角色在處理訊息的過程中都會在鏈路中增加相應的資訊,將這些資訊匯聚、處理之後就可以查詢任意訊息的狀態,進而為生產環境中的故障排除提供強有力的資料支援。
對訊息軌跡而言,最常見的實現方式是封裝客戶端,在保證正常生產消費的同時新增相應的軌跡資訊埋點邏輯。無論生產,還是消費,在執行之後都會有相應的軌跡資訊,我們需要將這些資訊儲存起來。
我們同樣可以將軌跡資訊儲存到 Kafka 的某個主題中,比如下圖中的主題 trace_topic。
生產者在將訊息正常傳送到使用者主題 real_topic 之後(或者消費者在拉取到訊息消費之後)會將軌跡資訊傳送到主題 trace_topic 中。
怎麼計算Lag?(注意read_uncommitted和read_committed狀態下的不同)
如果消費者客戶端的 isolation.level 引數配置為“read_uncommitted”(預設),它對應的 Lag 等於HW – ConsumerOffset 的值,其中 ConsumerOffset 表示當前的消費位移。
如果這個引數配置為“read_committed”,那麼就要引入 LSO 來進行計算了。LSO 是 LastStableOffset 的縮寫,它對應的 Lag 等於 LSO – ConsumerOffset 的值。
- 首先通過 DescribeGroupsRequest 請求獲取當前消費組的元資料資訊,當然在這之前還會通過 FindCoordinatorRequest 請求查詢消費組對應的 GroupCoordinator。
- 接著通過 OffsetFetchRequest 請求獲取消費位移 ConsumerOffset。
- 然後通過 KafkaConsumer 的 endOffsets(Collection partitions)方法(對應於 ListOffsetRequest 請求)獲取 HW(LSO)的值。
- 最後通過 HW 與 ConsumerOffset 相減得到分割槽的 Lag,要獲得主題的總體 Lag 只需對旗下的各個分割槽累加即可。
Kafka有哪些指標需要著重關注?
比較重要的 Broker 端 JMX 指標:
- BytesIn/BytesOut:即 Broker 端每秒入站和出站位元組數。你要確保這組值不要接近你的網路頻寬,否則這通常都表示網絡卡已被“打滿”,很容易出現網路丟包的情形。
- NetworkProcessorAvgIdlePercent:即網路執行緒池執行緒平均的空閒比例。通常來說,你應該確保這個 JMX 值長期大於 30%。如果小於這個值,就表明你的網路執行緒池非常繁忙,你需要通過增加網路執行緒數或將負載轉移給其他伺服器的方式,來給該 Broker 減負。
- RequestHandlerAvgIdlePercent:即 I/O 執行緒池執行緒平均的空閒比例。同樣地,如果該值長期小於 30%,你需要調整 I/O 執行緒池的數量,或者減少 Broker 端的負載。
- UnderReplicatedPartitions:即未充分備份的分割槽數。所謂未充分備份,是指並非所有的 Follower 副本都和 Leader 副本保持同步。一旦出現了這種情況,通常都表明該分割槽有可能會出現資料丟失。因此,這是一個非常重要的 JMX 指標。
- ISRShrink/ISRExpand:即 ISR 收縮和擴容的頻次指標。如果你的環境中出現 ISR 中副本頻繁進出的情形,那麼這組值一定是很高的。這時,你要診斷下副本頻繁進出 ISR 的原因,並採取適當的措施。
- ActiveControllerCount:即當前處於啟用狀態的控制器的數量。正常情況下,Controller 所在 Broker 上的這個 JMX 指標值應該是 1,其他 Broker 上的這個值是 0。如果你發現存在多臺 Broker 上該值都是 1 的情況,一定要趕快處理,處理方式主要是檢視網路連通性。這種情況通常表明叢集出現了腦裂。腦裂問題是非常嚴重的分散式故障,Kafka 目前依託 ZooKeeper 來防止腦裂。但一旦出現腦裂,Kafka 是無法保證正常工作的。
Kafka的那些設計讓它有如此高的效能?
- 分割槽
kafka是個分散式叢集的系統,整個系統可以包含多個broker,也就是多個伺服器例項。每個主題topic會有多個分割槽,kafka將分割槽均勻地分配到整個叢集中,當生產者向對應主題傳遞訊息,訊息通過負載均衡機制傳遞到不同的分割槽以減輕單個伺服器例項的壓力。
一個Consumer Group中可以有多個consumer,多個consumer可以同時消費不同分割槽的訊息,大大的提高了消費者的並行消費能力。但是一個分割槽中的訊息只能被一個Consumer Group中的一個consumer消費。
網路傳輸上減少開銷
批量傳送:
在傳送訊息的時候,kafka不會直接將少量資料傳送出去,否則每次傳送少量的資料會增加網路傳輸頻率,降低網路傳輸效率。kafka會先將訊息快取在記憶體中,當超過一個的大小或者超過一定的時間,那麼會將這些訊息進行批量傳送。
端到端壓縮:
當然網路傳輸時資料量小也可以減小網路負載,kafaka會將這些批量的資料進行壓縮,將一批訊息打包後進行壓縮,傳送broker伺服器後,最終這些資料還是提供給消費者用,所以資料在伺服器上還是保持壓縮狀態,不會進行解壓,而且頻繁的壓縮和解壓也會降低效能,最終還是以壓縮的方式傳遞到消費者的手上。順序讀寫
kafka將訊息追加到日誌檔案中,利用了磁碟的順序讀寫,來提高讀寫效率。零拷貝技術
零拷貝將檔案內容從磁碟通過DMA引擎複製到核心緩衝區,而且沒有把資料複製到socket緩衝區,只是將資料位置和長度資訊的描述符複製到了socket快取區,然後直接將資料傳輸到網路介面,最後傳送。這樣大大減小了拷貝的次數,提高了效率。kafka正是呼叫linux系統給出的sendfile系統呼叫來使用零拷貝。Java中的系統呼叫給出的是FileChannel.transferTo介面。
- 優秀的檔案儲存機制
如果分割槽規則設定得合理,那麼所有的訊息可以均勻地分佈到不同的分割槽中,這樣就可以實現水平擴充套件。不考慮多副本的情況,一個分割槽對應一個日誌(Log)。為了防止 Log 過大,Kafka 又引入了日誌分段(LogSegment)的概念,將 Log 切分為多個 LogSegment,相當於一個巨型檔案被平均分配為多個相對較小的檔案,這樣也便於訊息的維護和清理。
Kafka 中的索引檔案以稀疏索引(sparse index)的方式構造訊息的索引,它並不保證每個訊息在索引檔案中都有對應的索引項。每當寫入一定量(由 broker 端引數 log.index.interval.bytes 指定,預設值為4096,即 4KB)的訊息時,偏移量索引檔案和時間戳索引檔案分別增加一個偏移量索引項和時間戳索引項,增大或減小 log.index.interval.bytes 的值,對應地可以增加或縮小索引項的密度