1. 程式人生 > >Kafka數據遷移

Kafka數據遷移

sql語句 follow 同學 提高 http 正常的 lis 允許 多個實例

1.概述

Kafka的使用場景非常廣泛,一些實時流數據業務場景,均依賴Kafka來做數據分流。而在分布式應用場景中,數據遷移是一個比較常見的問題。關於Kafka集群數據如何遷移,今天筆者將為大家詳細介紹。

2.內容

本篇博客為大家介紹兩種遷移場景,分別是同集群數據遷移、跨集群數據遷移。如下圖所示:

技術分享圖片

2.1 同集群遷移

同集群之間數據遷移,比如在已有的集群中新增了一個Broker節點,此時需要將原來集群中已有的Topic的數據遷移部分到新的集群中,緩解集群壓力。

將新的節點添加到Kafka集群很簡單,只需為它們分配一個唯一的Broker ID,並在新服務器上啟動Kafka。但是,這些新服務器節點不會自動分配任何數據分區,因此除非將分區移動到新增的節點,否則在創建新Topic之前新節點不會執行任何操作。因此,通常在將新服務器節點添加到Kafka集群時,需要將一些現有數據遷移到這些新的節點。

遷移數據的過程是手動啟動的,執行過程是完全自動化的。在Kafka後臺服務中,Kafka將添加新服務器作為其正在遷移的分區的Follower,並允許新增節點完全復制該分區中的現有數據。當新服務器節點完全復制此分區的內容並加入同步副本(ISR)時,其中一個現有副本將刪除其分區的數據。

Kafka系統提供了一個分區重新分配工具(kafka-reassign-partitions.sh),該工具可用於在Broker之間遷移分區。理想情況下,將確保所有Broker的數據和分區均勻分配。分區重新分配工具無法自動分析Kafka群集中的數據分布並遷移分區以實現均勻的負載均衡。因此,管理員在操作的時候,必須弄清楚應該遷移哪些Topic或分區。

分區重新分配工具可以在3種互斥模式下運行:

  • --generate:在此模式下,給定Topic列表和Broker列表,該工具會生成候選重新??分配,以將指定Topic的所有分區遷移到新Broker中。此選項僅提供了一種方便的方法,可在給定Topic和目標Broker列表的情況下生成分區重新分配計劃。
  • --execute:在此模式下,該工具將根據用戶提供的重新分配計劃啟動分區的重新分配。 (使用--reassignment-json-file選項)。由管理員手動制定自定義重新分配計劃,也可以使用--generate選項提供。
  • --verify:在此模式下,該工具將驗證最後一次--execute期間列出的所有分區的重新分配狀態。狀態可以有成功、失敗或正在進行等狀態。

2.1.1 遷移過程實現

分區重新分配工具可用於將一些Topic從當前的Broker節點中遷移到新添加的Broker中。這在擴展現有集群時通常很有用,因為將整個Topic移動到新的Broker變得更容易,而不是一次移動一個分區。當執行此操作時,用戶需要提供已有的Broker節點的Topic列表,以及到新節點的Broker列表(源Broker到新Broker的映射關系)。然後,該工具在新的Broker中均勻分配給指定Topic列表的所有分區。在遷移過程中,Topic的復制因子保持不變。

現有如下實例,將Topic為ke01,ke02的所有分區從Broker1中移動到新增的Broker2和Broker3中。由於該工具接受Topic的輸入列表作為JSON文件,因此需要明確遷移的Topic並創建json文件,如下所示:

> cat topic-to-move.json
{"topics": [{"topic": "ke01"},
            {"topic": "ke02"}],
"version":1
}

準備好JSON文件,然後使用分區重新分配工具生成候選分配,命令如下:

> bin/kafka-reassign-partitions.sh --zookeeper dn1:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

執行命名之前,Topic(ke01、ke02)的分區如下圖所示:

技術分享圖片

技術分享圖片

執行完成命令之後,控制臺出現如下信息:

技術分享圖片

該工具生成一個候選分配,將所有分區從Topic ke01,ke02移動到Broker1和Broker2。需求註意的是,此時分區移動尚未開始,它只是告訴你當前的分配和建議。保存當前分配,以防你想要回滾它。新的賦值應保存在JSON文件(例如expand-cluster-reassignment.json)中,以使用--execute選項執行。JSON文件如下:

{"version":1,"partitions":[{"topic":"ke02","partition":0,"replicas":[2]},{"topic":"ke02","partition":1,"replicas":[1]},{"topic":"ke02","partition":2,"replicas":[2]},{"topic":"ke01","partition":0,"replicas":[2]},{"topic":"ke01","partition":1,"replicas":[1]},{"topic":"ke01","partition":2,"replicas":[2]}]}

執行命令如下所示:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

最後,--verify選項可與該工具一起使用,以檢查分區重新分配的狀態。需要註意的是,相同的expand-cluster-reassignment.json(與--execute選項一起使用)應與--verify選項一起使用,執行命令如下:

> ./kafka-reassign-partitions.sh --zookeeper dn1:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

執行結果如下圖所示:

技術分享圖片

同時,我們可以通過Kafka Eagle工具來查看Topic的分區情況。

技術分享圖片

技術分享圖片

2.2 跨集群遷移

這裏跨集群遷移,我們指的是在Kafka多個集群之間復制數據“鏡像”的過程,以避免與單個集群中的節點之間發生的復制混淆。 Kafka附帶了一個用於在Kafka集群之間鏡像數據的工具。該工具從源集群使用並生成到目標集群。這種鏡像的一個常見用例是在另一個數據中心提供副本。

另外,你可以運行許多此類鏡像進程以提高吞吐量和容錯(如果一個進程終止,其他進程將占用額外負載)。將從源集群中的Topic讀取數據,並將其寫入目標集群中具有相同名稱的主題。事實上,“鏡像”數據只不過是一個Kafka將消費者和生產者聯系在了一起。

源集群和目標集群是完全獨立的實體,它們可以具有不同數量的分區,並且偏移量將不相同。出於這個原因,鏡像集群並不是真正意圖作為容錯機制(因為消費者的位置會有所不同);為此,建議使用正常的集群內復制。但是,鏡像進程將保留並使用消息Key進行分區,因此可以按Key保留順序。

下面是一個跨集群的單Topic實例,命令如下:

> ./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist ke03

需要註意的是,consumer.properties文件配置源Kafka集群Broker地址,producer.properties文件配置目標Kafka集群地址。如果需要遷移多個Topic,可以使用 --whitelist ‘A|B‘,如果需要遷移所有的Topic,可以使用 --whitelist ‘*‘。

3.結果預覽

執行跨集群遷移命令後,目標集群中使用Kafka Eagle中查看Topic Size大小看是否與源集群的Topic Size大小相等,或者使用SQL語句,驗證是否有數據遷移過來,結果如下圖所示:

技術分享圖片

4.總結

跨集群遷移數據的本質是,Kafka啟動了消費者讀取源集群數據,並將消費後的數據寫入到目標集群,在遷移的過程中,可以啟動多個實例,提供遷出的吞吐量。

5.結束語

這篇博客就和大家分享到這裏,如果大家在研究學習的過程當中有什麽問題,可以加群進行討論或發送郵件給我,我會盡我所能為您解答,與君共勉!

另外,博主出書了《Kafka並不難學》,喜歡的朋友或同學, 可以在公告欄那裏點擊購買鏈接購買博主的書進行學習,在此感謝大家的支持。

Kafka數據遷移