1. 程式人生 > >kafka命令(整理)

kafka命令(整理)

1、啟動Kafka:       bin/kafka-server-start.sh config/server.properties
2、建立topic:       bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 24 --topic topic_test 
3、傳送訊息:       bin/kafka-console-producer.sh --broker-list localhost:9092 --topic page_visits

4、消費訊息:        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning(這個會從頭開始消費)         bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits (消費從未消費過的訊息)
5、刪除topic:
       bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name
6、檢視topic:       ./kafka-topics.sh --zookeeper  zk_host:port --list
7、增加kafka分割槽:        ./kafka-topics.sh --zookeeper 
  zk_host:port --alter  --topic us_forward  --partitions 24
8、檢視topic的分割槽:       ./kafka-topics.sh --zookeeper  zk_host:port --describe --topic us_general
9、檢視各個消費組的偏移量:       ./kafka-run-class.sh  kafka.tools.ConsumerOffsetChecker --zookeeper  zk_host:port -group test_groupid
10、重新平衡分割槽和領導者:         ./kafka-preferred-replica-election.sh --zookeeper  zk_host:port
11、描述kafka的topic        ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test-topic 
11、擴充套件叢集:         

1)、先把要轉移的topic寫成json格式topics-to-move.json

{"topics": [

{"topic": "SYNC_DATABASE_UPDATE"},

{"topic": "SYNC_REALTIME_ALARM"},

{"topic": "SYNC_STATION_INFO"},

{"topic": "SYNC_VEHICLE_REG"},

{"topic": "ds_ctrlreq"},

{"topic": "us_ctrlrsp"},

{"topic": "us_forward"},

{"topic": "us_general"},

{"topic": "us_packet"}

],

 "version":1

}

2)、用指令碼生成一個自動分割槽的計劃

./kafka-reassign-partitions.sh --zookeeper 192.168.2.70 --generate --topics-to-move-json-file topics-to-move.json  --broker-list 1,2,3 

3)、把生成的建議分割槽儲存成json  topics-to-move-execute.json

4)、執行分割槽計劃

/kafka-reassign-partitions.sh --zookeeper 192.168.2.70 --reassignment-json-file topics-to-move-execute.json --execute

5)、檢查叢集擴充套件是否完成

/kafka-reassign-partitions.sh --zookeeper 192.168.2.70 --reassignment-json-file topics-to-move-execute.json --verify

12、增加副本數

1)、編寫擴充套件檔案,可以利用擴充套件叢集的方式匯出現在的分佈然後做修改

{ "version":1,

"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]

}

2)、執行分割槽計劃

/kafka-reassign-partitions.sh --zookeeper 192.168.2.70 --reassignment-json-file topics-to-move-execute.json --execute

3)、檢查叢集擴充套件是否完成

/kafka-reassign-partitions.sh --zookeeper 192.168.2.70 --reassignment-json-file topics-to-move-execute.json --verify