1. 程式人生 > >Kafka 資料遷移(增加節點和減少節點均適用)

Kafka 資料遷移(增加節點和減少節點均適用)

當Kafka 減少Broker節點後,需要把資料分割槽遷移到其他節點上,以下將介紹我的一次遷移驗證過程。

前3步為環境準備,實際資料操作看第4步即可

增加Broker節點,也可以採用步驟4相同的方法進行重新分割槽

方案思想:使用kafka-reassign-partitions命令,把partition重新分配到指定的Broker上

 

1、建立測試topic,具有3個分割槽,2個副本

kafka-topics --create --topic test-topic \
--zookeeper cdh-002/kafka \
--replication-factor 2 --partitions 3

2、檢視test-topic

kafka-topics --describe --zookeeper cdh-002/kafka --topic test-topic

3、產生若干條資料

kafka-console-producer --topic test-topic \
--broker-list cdh-004:9092

 

4、kafka-reassign-partitions重分割槽(假設需要減少節點broker 75,本測試通過關閉對應kafka broker節點模擬)

 

(1) 新建檔案topic-to-move.json ,比加入如下內容

{"topics": [{"topic":"test-topic"}], "version": 1}

(2) 使用--generate生成遷移計劃,broker-list根據自己環境設定,我的環境由於broker 75掛掉了,只剩下76和77

kafka-reassign-partitions --zookeeper cdh-002/kafka \
--topics-to-move-json-file /opt/lb/topic-to-move.json \
--broker-list "76,77" --generate

 

輸出日誌:(從日誌可知各個分割槽副本所在的Broker節點,以及建議的副本分佈)

Current partition replica assignment  (當前分割槽副本分佈)

{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[76,77]},{"topic":"test-topic","partition":2,"replicas":[75,76]},{"topic":"test-topic","partition":1,"replicas":[77,75]}]}

 

Proposed partition reassignment configuration  (建議分割槽副本分佈)

{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[76,77]},{"topic":"test-topic","partition":2,"replicas":[76,77]},{"topic":"test-topic","partition":1,"replicas":[77,76]}]}

 

(3) 新建檔案kafka-reassign-execute.json,並把建議的分割槽副本分佈配置拷貝到新建檔案中

(4) 使用--execute執行遷移計劃  (有資料移動,broker 75上的資料會移到broker 76和77上,如果資料量大,執行的時間會比較久,耐心等待即可)

kafka-reassign-partitions --zookeeper cdh-002/kafka \
--reassignment-json-file /opt/lb/kafka-reassign-execute.json \
--execute

(5)使用-verify檢視遷移進度

kafka-reassign-partitions --zookeeper cdh-002/kafka \
--reassignment-json-file /opt/lb/kafka-reassign-execute.json \
--verify

(6)通過消費者驗證,可知,並未丟失資料。注意需要加--from-beginning。(此時broker 75和77同時宕機,也不會丟失資料,因為76上有了所有分割槽的副本)

kafka-console-consumer --topic test-topic --from-beginning --zookeeper cdh-002/kafka

另外一種驗證方法是:  (生產上一般都用這個)

另外一種驗證方法就是通過檢視Kafka儲存路徑來確認,是否有遷移資料

[[email protected] ~]# cd /var/local/kafka/data/
[[email protected] data]# ll
rwxr-xr-x 2 kafka kafka  110 Oct 23 14:21 test-topic-0
drwxr-xr-x 2 kafka kafka  110 Oct 23 14:52 test-topic-1
drwxr-xr-x 2 kafka kafka  110 Oct 23 14:21 test-topic-2