Kafka監控工具KafkaOffsetMonitor配置及使用
一、KafkaOffsetMonitor簡述
KafkaOffsetMonitor是Kafka的一款客戶端消費監控工具,用來實時監控Kafka服務的Consumer以及它們所在的Partition中的Offset,我們可以瀏覽當前的消費者組,並且每個Topic的所有Partition的消費情況都可以一目瞭然。
二、KafkaOffsetMonitor下載
KafkaOffsetMonitor託管在Github上,可以通過Github下載。 下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases
或者下載百度網盤:連結:https://pan.baidu.com/s/1geEBEvT 密碼:jaeu
三、KafkaOffsetMonitor啟動
將下載下來的KafkaOffsetMonitor jar包上傳到linux上,可以新建一個目錄KafkaMonitor,用於存放KafkaOffsetMonitor-assembly-0.2.0.jar進入到KafkaMonitor目錄下,通過java編譯命令來執行這個jar包:
[[email protected] KafkaMonitor]# java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 --port 8088 --refresh 5.seconds --retain 1.days 按回車後,可以看到控制檯輸出: serving resources from: jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 2018-01-05 21:17:36.267:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 2018-01-05 21:17:36.630:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/data/KafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp} 2018-01-05 21:17:36.662:INFO:oejs.AbstractConnector:Started[email protected]:8088
如果沒有指定埠,則預設會開啟一個隨機埠。
引數說明: zk :zookeeper主機地址,如果有多個,用逗號隔開 port :應用程式埠 refresh :應用程式在資料庫中重新整理和儲存點的頻率 retain :在db中保留多長時間 dbName :儲存的資料庫檔名,預設為offsetapp
為了更方便的啟動KafkaOffsetMonitor,可以寫一個啟動指令碼來直接執行,我這裡新建一個名為:kafka-monitor-start.sh的指令碼,然後編輯這個指令碼:
[[email protected] KafkaMonitor]# vim kafka-monitor-start.sh java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --port 8088 \ --zk 10.0.0.50:12181,10.0.0.60:12181,10.0.0.70:12181 \ --refresh 5.minutes \ --retain 1.day >/dev/null 2>&1;
然後退出儲存即可,接下來修改一下kafka-monitor-start.sh的許可權
[[email protected] KafkaMonitor]# chmod +x kafka-monitor-start.sh
啟動KafkaOffsetMonitor:
[[email protected] KafkaMonitor]# nohup /data/KafkaMonitor/kafka-monitor-start.sh & [1] 6551 [[email protected] KafkaMonitor]# lsof -i:8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 6552 root 16u IPv6 26047 0t0 TCP *:radan-http (LISTEN)
四、KafkaOffsetMonitor Web UI
在遊覽器中輸入:http://ip:port即可以檢視KafkaOffsetMonitor Web UI,如下圖:
在下圖中有一個Visualizations選項卡,點選其中的Cluster Overview可以檢視當前Kafka叢集的Broker情況
五、簡單的Producer
1、新建一個Topic
首先為本次試驗新建一個Topic,命令如下:
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-simpleproducer
2、新建SimpleProducer程式碼
在上一篇文章中提到的Producer封裝Github程式碼的基礎上,寫了一個往kafkamonitor-simpleproducer傳送message的java程式碼。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */ public class SimpleProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); int i = 0; String message = ""; while (true) { message = "test-simple-producer : " + i ++; kafkaProducerTool.publishMessage("kafkamonitor-simpleproducer", message); } } }
程式執行效果:
3、ConsoleConsumer消費該topic
用kafka自帶的ConsoleConsumer消費kafkamonitor-simpleproducer中的message。
bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic kafkamonitor-simpleproducer
消費截圖如下:
4、KafkaOffsetMonitor頁面
(1)在Topic List選項卡中,我們可以看到剛才新建的kafkamonitor-simpleproducer
(2)點開後,能看到有一個console-consumer正在消費該topic
(3)繼續進入該Consumer,可以檢視該Consumer當前的消費狀況
這張圖片的左上角顯示了當前Topic的生產速率,右上角顯示了當前Consumer的消費速率。
圖片中還有三種顏色的線條,藍色的表示當前Topic中的Message數目,灰色的表示當前Consumer消費的offset位置,紅色的表示藍色灰色的差值,即當前Consumer滯後於Producer的message數目。
(4)看一眼各partition中的message消費情況
從上圖可以看到,當前有3個Partition,每個Partition中的message數目分佈很不均勻。這裡可以與接下來的自定義Producer的情況進行一個對比。
六、自定義Partitioner的Producer
1、新建一個Topic
bin/kafka-topics.sh \ --create \ --zookeeper 10.0.0.50:12181 \ --replication-factor 3 \ --partition 3 \ --topic kafkamonitor-partitionedproducer
2、Partitioner程式碼
邏輯很簡單,迴圈依次往各Partition中傳送message。
import kafka.producer.Partitioner; /** * Created by ckm on 2018/1/8. */ public class TestPartitioner implements Partitioner { public TestPartitioner() { } @Override public int partition(Object key, int numPartitions) { int intKey = (int) key; return intKey % numPartitions; } }
3、Producer程式碼
將自定義的Partitioner設定到Producer,其他呼叫過程和二中類似。
import com.ckm.kafka.producer.impl.KafkaProducerToolImpl; import com.ckm.kafka.producer.inter.KafkaProducerTool; /** * Created by ckm on 2016/8/30. */ public class PartitionedProducer { public static void main(String[] args) { KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl(); kafkaProducerTool.getProducerProperties().put("partitioner.class", "TestPartitioner"); int i = 0; String message = ""; while (true) { message = "test-partitioner-producer : " + i; System.out.println(message); kafkaProducerTool.publishPartitionedMessage("kafkamonitor-partitionedproducer", i + "", message); i ++; } } }
程式碼執行效果如下圖:
4、ConsoleConsumer消費Message
bin/kafka-console-consumer.sh --zookeeper 10.0.0.50:12181 --from-beginning --topic kafkamonitor-partitionedproducer
消費效果如下圖:
5、KafkaOffsetMonitor頁面
其他頁面與上面的類似,這裡只觀察一下每個partition中的message數目與第二節中的對比。可以看到這裡每個Partition中message分別是很均勻的。
注意事項: 注意這裡有一個坑,預設情況下Producer往一個不存在的Topic傳送message時會自動建立這個Topic。由於在這個封裝中,有同時傳遞message和topic的情況,如果呼叫方法時傳入的引數反了,將會在Kafka叢集中自動建立Topic。在正常情況下,應該是先把Topic根據需要建立好,然後Producer往該Topic傳送Message,最好把Kafka這個預設自動建立Topic的功能關掉。 那麼,假設真的不小心建立了多餘的Topic,在刪除時,會出現“marked for deletion”提示,只是將該topic標記為刪除,使用list命令仍然能看到。如果需要調整這兩個功能的話,在server.properties中配置如下兩個引數:
引數 | 預設值 | 作用 |
---|---|---|
auto.create.topics.enable | true | Enable auto creation of topic on the server |
delete.topic.enable | false | Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off |
七,KafkaOffsetMonitor 總結
KafkaOffsetMonitor:程式一個jar包的形式執行,部署較為方便。只有監控功能,使用起來也較為安全。 除了KafkaOffsetMonitor,Kafka監控工具還有另外兩款: Kafka Web Console:監控功能較為全面,可以預覽訊息,監控Offset、Lag等資訊,但存在bug,不建議在生產環境中使用。 Kafka Manager:偏向Kafka叢集管理,若操作不當,容易導致叢集出現故障。對Kafka實時生產和消費訊息是通過JMX實現的。沒有記錄Offset、Lag等資訊。