三、消息處理過程與集群維護
一、Kafka消息組織原理
1.磁盤重認識
當需要從磁盤讀取數據時,要確定讀的數據在哪個磁道,哪個扇區:
首先必須找到柱面,即磁頭需要移動對準相應磁道,這個過程叫做尋道,所耗費時間叫做尋道時間;
然後目標扇區旋轉到磁頭下,這個過程耗費的時間叫做旋轉時間;
一次訪盤請求(讀/寫)完成過程由三個動作組成
尋道(時間):磁頭移動定位到指定磁道;
旋轉延遲(時間):等待指定扇區從磁頭下旋轉經過;
數據傳輸(時間):數據在磁盤、內存與網絡之間的實際傳輸
由於存儲介質的特性,磁盤本身存取就比主存慢,再加上機械運動耗費,磁盤的存取速度往往是主存的幾百分之一甚至幾千分支一
根據數據的局部性原理 ,有以下兩種方法
•預讀或者提前讀;
•合並寫——多個邏輯上的寫操作合並成一個大的物理寫操作中;
即采用磁盤順序讀寫(不需要尋道時間,只需很少的旋轉時間)。實驗結果:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是300M/秒,但是隨機寫的速度只有50K/秒,兩者相差將近10000倍。
2.Kafka消息的寫入原理
一般的將數據從文件傳到套接字的路徑:
1.操作系統將數據從磁盤讀到內核空間的頁緩存中;
2.應用將數據從內核空間讀到用戶空間的緩存中;
3.應用將數據寫回內存空間的套接字緩存中
4.操作系統將數據從套接字緩存寫到網卡緩存中,以便將數據經網絡發出;
這樣做明顯是低效的,這裏有四次拷貝,兩次系統調用。如果使用sendfile(Java 為: FileChannel.transferTo api),兩次拷貝可以被避免:允許操作系統將數據直接從頁緩存發送到網絡上。優化後,只有最後一步將數據拷貝到網卡緩存中是需要的。
Kafka topic信息
Kafka 消息文件存儲 (tree)3.Kafka消息的刪除原理
從最久的日誌段開始刪除(按日誌段為單位進行刪除),然後逐步向前推進,直到某個日誌段不滿足條件為止,刪除條件:
不能是當前激活日誌段;
大小不能小於日誌段的最小大小(配置項log.segment.bytes配置)
要刪除的是否是所有日誌段,如果是的話直接調用roll方法進行切分,因為Kafka至少要保留一個日誌段;
二、Kafka消息檢索原理
1.Kafka消息的segment file的組成和物理結構
2.Kafka消息的index file的組成和物理結構
3.Kafka消息檢索過程
以讀取offset=368776的message為例,需要通過下面2個步驟查找:
第一步查找segment file;
以上圖為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1。只要根據offset二分查找文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index|log
第二步通過segment file查找message;
算出368776-368770=6,取00000000000000368769.index文件第三項(6,1407),得出從00000000000000368769.log文件頭偏移1407字節讀取一條消息即可
三、Kafka集群維護
1.Kafka集群基本信息實時查看和修改
集群信息實時查看(topic工具):
列出集群當前所有可用的topic:
bin/kafka-topics.sh --list –zookeeper zookeeper_address
查看集群特定topic 信息:
bin/kafka-topics.sh --describe --zookeeper zookeeper_address
--topic topic_name
集群信息實時修改(topic工具):
創建topic:
bin/kafka-topics.sh --create --zookeeper zookeeper_address --replication-factor 1 --partitions 1 --topic topic_name
增加(不能減少) partition(最後的4是增加後的值):
bin/kafka-topics.sh --zookeeper zookeeper_address --alter –topic topic_name --partitions 4
Topic-level configuration 配置都能修改
2.Kafka集群leader平衡機制
每個partitiion的所有replicas叫做“assigned replicas”,“assigned replicas”中的第一個replicas叫“preferred replica”,剛創建的topic一般“preferred replica”是leader。下圖中Partition 0的broker 2就是preferred replica”,默認會成為該分區的leader。
集群leader平衡:
bin/kafka-preferred-replica-election.sh –zookeeper zookeeper_address
auto.leader.rebalance.enable=true
3.Kafka集群分區日誌遷移
遷移topic數據到其他broker,請遵循下面四步:
寫json文件,文件格式如下:
cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
使用–generate生成遷移計劃(下面的操作是將topic: foo1和foo2移動到broker 5,6):
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" –generate
這一步只是生成計劃,並沒有執行數據遷移;
使用–execute執行計劃:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –execute
執行前最好保存當前的分配情況,以防出錯回滾
•使用–verify驗證是否已經遷移完成 遷移某個topic的某些特定的partition數據到其他broker,步驟與上面一樣,但是json文件如下面所示: cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]} 可以指定到topic的分區編號 kafka-reassign-partitions.sh工具會復制磁盤上的日誌文件,只有當完全復制完成,才會刪除遷移前磁盤上的日誌文件。執行分區日誌遷移需要註意: •kafka-reassign-partitions.sh 工具的粒度只能到broker,不能到broker的目錄(如果broker上面配置了多個目錄,是按照磁盤上面已駐留的分區數來均勻分配的),所以,如果topic之間的數據,或者topic的partition之間的數據本身就不均勻,很有可能造成磁盤數據的不均勻: •對於分區數據較多的分區遷移數據會花大量的時間,所以建議在topic數據量較少或磁盤有效數據較少的情況下執行數據遷移操作;
進行分區遷移時最好先保留一個分區在原來的磁盤,這樣不會影響正常的消費和生產,如果目的是將分區5(brober1,5)遷移到borker2,3。可以先將5遷移到2,1,最後再遷移到2,3。而不是一次將1,5遷移到2,3。因為一次遷移所有的副本,無法正常消費和生產,部分遷移則可以正常消費和生產
四、Kafka集群監控
1.Kafka Offset Monitor介紹
在生產環境需要集群高可用,所以需要對Kafka集群進行監控。Kafka Offset Monitor可以監控Kafka集群以下幾項:
Kafka集群當前存活的broker集合;
Kafka集群當前活動topic集合;
消費者組列表
Kafka集群當前consumer按組消費的offset lag數(即當前topic當前分區目前有多少消息積壓而沒有及時消費)
部署Kafka Offset Minotor: •github下載jar包KafkaOffsetMonitor-assembly-0.2.0.jar : https://github.com/quantifind/KafkaOffsetMonitor/releases •啟動Kafka Offset Minotor : java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk zk-01,zk-02 --refresh 5.minutes --retain 1.day &
2.Kafka Offset Monitor使用
3.Kafka Manager介紹
Kafka Manager由雅虎開源,提供以下功能:
管理幾個不同的集群;
容易地檢查集群的狀態(topics, brokers, 副本的分布, 分區的分布) ;
選擇副本
基於集群的當前狀態產生分區分配
重新分配分區
Kafka Manager的安裝,方法二:
下載打包好的Kafka manager(也可以在課程源代碼處下載):
https://github.com/scootli/kafka-manager-1.0-SNAPSHOT/tree/master/kafka-manager-1.0-SNAPSHOT
下載後解壓
修改conf/application.conf,把Kafka-manager.zkhosts改為自己的zookeeper服務器地址
bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 &
4.Kafka Manager使用
三、消息處理過程與集群維護