為線上資料庫構建基於 Kudu 的實時資料同步
阿新 • • 發佈:2021-10-09
簡述
Kudu 是 Cloudera 開源的新型列式儲存系統,是 Apache Hadoop 生態圈的成員之一。它專門為了對快速變化的資料進行快速的分析,填補了以往Hadoop 儲存層的空缺。
本篇文章將會介紹幾種資料資料同步到 Kudu 的方案選擇,然後從功能和使用角度介紹 CloudCanal 如何幫助我們解決資料實時同步到 Kudu。
幾種方案
Kudu 是一個儲存層組建,若要同步資料到 Kudu 的可以有三種選擇
- (基於RDB)選用類似 DataX 這類開源中介軟體,同時 Kudu 搭建上層 SQL 引擎。可選的 SQL 引擎有:Hive、Impala
- (基於MQ)選用 Kafka 生態,基於 Kafka + Flume + Kudu 實現資料同步。
- (基於編碼)開發對應的資料遷移程式,直接將資料寫入 Kudu 儲存。
如何選擇?
基於RDB方案
- 需要 Kudu 上層有一個 SQL 引擎。在官方文件上提到了兩種使用Kudu 的方式
- Hive 是基於 MapReduce 架構、Impala 是基於 MPP 架構
- 單純的資料寫入並不需要複雜的計算邏輯,選擇 MPP 架構顯然更適合一些。
- 這種情況下推薦 Impala + Kudu + DataX
基於MQ方案
- 比較適合源端資料變化有多個不同的消費者,資料同步僅僅是其中一條鏈路。
基於編碼方案
- 比較適合在資料同步過程中需要對資料進行加工。
同步的技術點
建表
- 對於 Kudu 上的建表需要通過 kudu-client 來進行,例如:
List<ColumnSchema> columns = new ArrayList(2); columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()); columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).build()); List<String> rangeKeys = new ArrayList<>(); rangeKeys.add("key"); Schema schema = new Schema(columns); client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys));
- Kudu 由於儲存引擎限制,每張表必須要指定 Partition Column。
- 被設定為 Partition 的列不允許 update,如若修改 Partition Column 列的值。需要刪除整行資料重新插入。
資料寫入
- Kudu 資料寫入 Kudu 支援 Insert、Update、Delete、Upsert 四種操作
KuduTable table = client.openTable(tableName);
KuduSession session = client.newSession();
session.setTimeoutMillis(60000);
for (int i = 0; i < 3; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addInt(0, i);
row.addString(1, "value " + i);
session.apply(insert);
}
session.flush();
session.close();
資料型別
- Kudu 1.6 之前,不支援 Decimal。因此源端資料型別如果是浮點數只可以選擇:float、double、int、string 來承載
- Kudu 1.15 開始,提供了 varchar 型別可以與 資料庫的 varchar 相互對應。在此之前應當選擇 string 型別。
基於 CloudCanal 快速實現資料同步
前面介紹的三種方式中 RDB方案要求增加配置 SQL 引擎、MQ 方案 則要求增加 Kafka 和 Flume。它們的資料同步鏈路都較長一旦出現問題,比較不容易排查原因所在。CloudCanal 是採用了第三種方式。
- 從源端直接訂閱資料變更
- 通過 kudu-client 直接將資料寫入到 Kudu 儲存引擎
- 整個過程無需 MQ 或 SQL 引擎的支援。
對比前面兩種方式具有下列幾個優勢
- 資源開銷小
- 鏈路簡單高效
- 產品化操作便捷
舉個"栗子"
準備 CLOUDCANAL
新增資料來源
- 登入 CloudCanal 平臺
- 資料來源管理 -> 新增資料來源
- 選擇 自建資料來源 ,並填寫相關資料庫資訊,其中 網路地址 請按提示帶上埠號
- 這裡需要有兩點提示
- kudu-client 預設訪問的是 7051 埠,並非提供 Web 介面的 8051
- 如果 Kudu 是叢集化部署,那麼在配置網路地址時需要填寫叢集 ip:port 列表。逗號間隔。
同步任務
- 任務管理管理 -> 建立任務
- 在建立任務的時候,預設高階選項中會要求至少 3副本。可以根據 Kudu 叢集情況來修改對應配置。
- 任務型別環節選擇 資料同步
- 在表選擇環節可以選擇要同步的源端表以及同步的動作
- 最後到確認環節建立任務即可。
- 在結構遷移任務跑完之後,就可以看到 Kudu 控制檯上已經把對應的 Kudu 表全部建立完畢
在最後舉例使用 Impala 來查詢位於 Kudu 中的資料,如下是建表語句:
CREATE EXTERNAL TABLE `canal_test_case_column_default` STORED AS KUDU
TBLPROPERTIES(
'kudu.table_name' = 'my-j52hri3880d6dka.canal_test_case.column_default',
'kudu.master_addresses' = '192.168.0.254:7051')
能力和限制
- 支援 MySQL、PostgreSQL、Oracle 作為源端到 Kudu 的同步
- 支援主鍵變更同步,轉換為 Kudu 刪除在插入
- 對端 Kudu 要求 1.6 版本
- 不支援源端 無主鍵表
- 不支援 DDL 增量同步
總結
本文簡單介紹了幾種將資料同步到 Kudu 的方式,以及基於 CloudCanal 是如何實現實時同步資料到 Kudu。各位小夥伴,如果覺得還不錯,請點贊、評論加轉發吧。
更多精彩
- 5分鐘搞定 MySQL 到 ElasticSearch 遷移同步-CloudCanal 實戰
- 5分鐘搞定 MySQL 到 MySQL 異構線上資料遷移同步-CloudCanal 實戰
- 5分鐘搞定 MySQL 到 ClickHouse 實時資料同步-CloudCanal 實戰
- MySQL 到 ElasticSearch 實時同步構建資料檢索服務的選型與思考
- 構建基於Kafka中轉的混合雲線上資料生態-cloudcanal實戰
- 5分鐘搞定 MySQL 到 TiDB 的資料同步 - CloudCanal實戰