kafka 分割槽重新分配指令碼
阿新 • • 發佈:2020-06-28
注:
適用於 Kafka 叢集線上擴容或縮容時的分割槽重新分配。
cat kafka_partition_reassignment.sh
#!/bin/bash
#
# Reassign the partitions of Kafka topics onto a fixed list of brokers,
# one topic at a time. For every topic the script:
#   1. asks kafka-reassign-partitions.sh --generate for a proposed plan,
#   2. starts the plan with --execute,
#   3. polls --verify until no partition is reported as "in progress".
#
# Usage:
#   kafka_partition_reassignment.sh            # all topics except __consumer_offsets
#   kafka_partition_reassignment.sh TOPIC...   # only the named topic(s)
#
# The plan of the topic currently being processed is written to
# $reassignment_json_file in the working directory, so it can be re-verified
# by hand after the script ends (see the command printed at the end).

set -euo pipefail

readonly KAFKA_BIN="/app/kafka_2.12-2.1.0/bin"
readonly zk_server="127.0.0.1:2181"
readonly reassignment_json_file="part.json"
# Fill in the broker IDs of the cluster: add IDs when scaling out,
# remove IDs when scaling in.
readonly reassignment_broker_ids="200,201,202,203,204,205,206,207,208"

# Seconds to wait between two --verify polls, and the maximum number of polls
# per topic before giving up.
readonly poll_interval_secs=10
readonly max_polls=10000

# Scratch file for the --topics-to-move-json-file input; created in main and
# removed on exit.
move_json_file=""

# Print a message to stderr and abort the script.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# Remove the scratch file, if one was created.
cleanup() {
  if [[ -n "$move_json_file" ]]; then
    rm -f -- "$move_json_file"
  fi
}

#######################################
# List the topics to reassign when none are given on the command line.
# Outputs: one topic name per line, without __consumer_offsets.
# Returns: non-zero if kafka-topics.sh fails.
#######################################
list_topics() {
  # Kafka topic names cannot contain whitespace, so a line with more than one
  # field is an annotated entry rather than a bare topic name and is skipped.
  # The comparison is exact, so a topic that merely contains
  # "__consumer_offsets" in its name is still processed.
  "$KAFKA_BIN/kafka-topics.sh" --list --zookeeper "$zk_server" \
    | awk 'NF == 1 && $1 != "__consumer_offsets"'
}

#######################################
# Poll --verify until no partition of the current plan is "in progress".
# Arguments: $1 - topic name (used in messages only)
# Outputs:   the --verify report on stdout for every poll that is not done
# Returns:   0 when finished; aborts the script if max_polls is exhausted.
#######################################
wait_for_reassignment() {
  local topic=$1
  local attempt report pending

  for ((attempt = 1; attempt <= max_polls; attempt++)); do
    # Run --verify once per poll and reuse the captured report for both the
    # count and the progress output.
    if report=$("$KAFKA_BIN/kafka-reassign-partitions.sh" \
      --zookeeper "$zk_server" \
      --reassignment-json-file "$reassignment_json_file" --verify); then
      # A partition is pending when the last word of its line contains
      # "progress".
      pending=$(awk '$NF ~ /progress/ { c++ } END { print c + 0 }' <<<"$report")
      if ((pending == 0)); then
        return 0
      fi
      printf '%s\n' "$report"
    else
      # A failed poll says nothing about completion; retry instead of
      # treating it as "done".
      printf 'warning: --verify failed for topic %s, retrying\n' "$topic" >&2
    fi
    sleep "$poll_interval_secs"
  done

  # NOTE(review): presumably a new --execute is rejected while a reassignment
  # is still running, so stop here instead of moving on - confirm.
  die "reassignment of topic $topic still in progress after $max_polls polls"
}

#######################################
# Generate, execute and await the reassignment of a single topic.
# Arguments: $1 - topic name
# Globals:   move_json_file, reassignment_json_file (both written)
#######################################
reassign_topic() {
  local topic=$1
  local plan

  # The name is interpolated into JSON below; restrict it to the characters
  # Kafka permits in topic names so that no escaping is needed.
  [[ "$topic" =~ ^[A-Za-z0-9._-]+$ ]] || die "invalid topic name: $topic"

  printf '{"topics": [{"topic": "%s"}],\n"version":1\n}\n' "$topic" \
    >"$move_json_file"

  # The last line of the --generate output is taken as the proposed plan.
  plan=$("$KAFKA_BIN/kafka-reassign-partitions.sh" \
    --zookeeper "$zk_server" \
    --topics-to-move-json-file "$move_json_file" \
    --broker-list "$reassignment_broker_ids" --generate | tail -n 1) \
    || die "failed to generate a reassignment plan for topic $topic"

  # Never hand --execute anything that is not at least JSON-shaped.
  [[ "$plan" == "{"* ]] \
    || die "unexpected --generate output for topic $topic: $plan"
  printf '%s\n' "$plan" >"$reassignment_json_file"

  "$KAFKA_BIN/kafka-reassign-partitions.sh" \
    --zookeeper "$zk_server" \
    --reassignment-json-file "$reassignment_json_file" --execute \
    || die "failed to start the reassignment of topic $topic"

  sleep "$poll_interval_secs"
  wait_for_reassignment "$topic"
}

main() {
  local -a topics=()
  local listing topic

  if (($# > 0)); then
    # Explicit topic(s) given: reassign only those.
    topics=("$@")
  else
    listing=$(list_topics) || die "failed to list topics via $zk_server"
    if [[ -n "$listing" ]]; then
      mapfile -t topics <<<"$listing"
    fi
  fi

  if ((${#topics[@]} == 0)); then
    printf 'no topics to reassign\n' >&2
    return 0
  fi

  move_json_file=$(mktemp) || die "mktemp failed"
  trap cleanup EXIT

  for topic in "${topics[@]}"; do
    reassign_topic "$topic"
  done

  printf '%s\n' "檢視進度:$KAFKA_BIN/kafka-reassign-partitions.sh --zookeeper $zk_server --reassignment-json-file $reassignment_json_file --verify"
}

main "$@"