1. 程式人生 > >更加深入理解Kafka--Producer篇(下)

更加深入理解Kafka--Producer篇(下)

批次

積累器在建立批次之前,就在堆上為它預分配一段空間,這段空間用於裝載訊息。訊息最終會順序落到記憶體塊中形成訊息集。批次的邏輯結構如下:
批次邏輯結構.jpg

5.0.1

* MemoryRecords即訊息集的抽象,它容納0到多條Record。
* Record則代表訊息在記憶體中的狀態,即按二進位制協議格式化之後的訊息結構,它是訊息集的元素。
* 使用者可通過compression.type配置壓縮方式,開啟壓縮可顯著增大記憶體使用率、同時減少網路開銷。Compressor負責壓縮訊息,它的屬性appendStream是個包裝流,其結構是DataOutputStream—>壓縮處理流—>ByteBufferStream。
壓縮器.png

5.0.2

批次失效會關閉訊息集使其變為只讀狀態,並引起Compressor關閉:釋放全部I/O資源並在開啟壓縮時在緩衝頭部位置填充協議元資料。關閉後緩衝將不再有訊息寫入,它被回給訊息集並flip後等待發送。

資料協議

批次是訊息儲存的最小物理單元,讀取時就只能按批次整塊讀取,因此如果沒有標準資料協議就無法對資料塊做反序列化。

Kafka把訊息分割成寫前日誌、協議頭和協議體三部分,協議頭和協議體合成協議正文。日誌標識訊息在批次中的相對順序和原始正文大小;訊息頭宣告CRC、魔數和屬性;最後訊息體記錄追加時間以及key和value值。

訊息物理結構.png

5.1.1

CRC即checkSum值,用於校驗訊息是否完整;魔數用於宣告所用協議版本;屬性佔1個位元組即8位,目前只使用了前三位,每一位代表一種壓縮協議,為0即不壓縮;key和value幾乎一致,前4個位元組標識內容長度,如果內容為-1,則表示無內容填入。

當開啟壓縮時,Compressor會對訊息集偏移在起始位置預留出報文頭長度的位置,在批次關閉後再將報文頭相關資料寫入,因為正文長度、payload長度以及訊息數量都只能在訊息只讀後確定。報文頭加上訊息集才是完整的壓縮報文。壓縮報文結構和訊息幾乎一致,也分日誌和正文兩個部分,但是在個別屬性上會有細微差異:1)offset分別被用於標識訊息數量;2)沒有key值,所有key長度都是-1;3)value長度是訊息集(壓縮後)的長度,payload就是訊息集本身。報文頭並不會被壓縮,因此可以很容易被讀取,程式識別報文的長度、壓縮協議、版本號以及CRC等屬性之後就可以選用合適的方式讀取一定長度的訊息以及校驗批次的完整性。

批次管理

批次建立後會逗留linger.ms時間,它集聚該段時間內歸屬該分組(區)的訊息。如果生產速率特別高又或者有超大訊息流入很快將分割槽打滿,則實際逗留時間會低於linger.ms。想象一下極端場景,批次大小預設16k,如果訊息以5k、12k間隔發,則記憶體實際利用率只有(5+12)/(2*16)。

另一方面,積累器擠出前先要做就緒節點檢查,擠出動作也只針對leader在這些節點上的分割槽批次,但節點ready to drain後,可能因為連線或者inflightRequests超限等問題,被從傳送就緒列表移除,從而導致這些節點的可傳送批次不會被擠出。它們始終佔據分組佇列的最高擠出優先順序,這會導致:1)後追加的訊息被積壓,即使連線恢復後新入的訊息也只能等待順序處理,整體投遞延時猛增。2)批次佔據的記憶體得不到釋放,有可能發生雪崩:因為只有追加沒有擠出,問題節點的批次有可能佔滿全部記憶體空間導致其他正常節點分割槽無法為新批次申請空間。Kafka提供請求超時timeout.ms解決這個問題,從逗留截止開始計算批次超時則被廢棄–釋放記憶體空間並從分組佇列移除。

理想狀況下,單位時間內追入和擠出應該恰好相等且記憶體被充分使用。長期觀察下調好linger.ms、batch.size、timeout.ms以及batch.size和buffer.memory這幾個引數將有助於達到這個目標。

記憶體管理

訊息集記憶體直接分配在堆上,如果對它不加以限制在訊息生產速率足夠高時很可能頻繁出現fgc乃至oom,另一方面頻繁的記憶體申請和釋放操作也很吃系統資源,因此Kafka自建了記憶體池BufferPool管理記憶體。

記憶體池有四個關鍵屬性:totalMemory代表記憶體池上限,由buffer.memory決定;poolableSize指池化記憶體塊大小,由batch.size設定;free和availableMemory則分別代表池化記憶體和閒置記憶體大小。注意free和available的區別,前者是已申請但未使用,後者是未申請未使用,它們之間關係:totalMemory= 可使用空間+已使用空間,可使用空間=availableMemory+free.size()*poolableSize代表。

只有固定大小的記憶體塊被釋放後才會進入池化列表,非常規釋放後只會增加可用記憶體大小,而釋放記憶體則由虛擬機器回收。因此如果超大訊息比較多,依然有可能會引起fgc乃至oom。

積累器通過記憶體池預分配訊息集記憶體,如果沒有足夠記憶體則使用者主執行緒被放入有序佇列並進入等待。批在批次done時釋放出部分空間,同時喚醒隊首執行緒,如果沒有釋放出足夠的空間則繼續進入等待,如果已經釋放出足夠空間,分配空間且執行緒出隊。
記憶體池.png

5.3.1