
Importing MySQL data into StarRocks via canal + Kafka

Background

While supporting customers, we found that some of them already have data pipelines in place and do not allow the business side to consume the MySQL binlog directly; all data consumption goes through Kafka. This document shares how to consume canal-format data from Kafka and write it to StarRocks, implementing CDC.

Data flow

MySQL Binlog --> Canal --> Kafka --> Flink SQL --> StarRocks

Environment Setup

1. Canal

Canal configuration is not covered in detail here; refer to the guide "Using canal to sync MySQL binlog into StarRocks" to install and configure canal and its dependencies.
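As a reference, here is a minimal sketch of the canal settings involved, assuming canal 1.1.x delivering parsed binlog directly to Kafka; the hosts, credentials, topic, and table filter are placeholders for your environment (note that the Kafka address key is canal.mq.servers on canal 1.1.4 and kafka.bootstrap.servers on 1.1.5+):

# conf/canal.properties -- send parsed binlog to Kafka instead of the TCP server
canal.serverMode = kafka
kafka.bootstrap.servers = $kafka_host:9092

# conf/example/instance.properties -- which MySQL instance to follow and where to publish
canal.instance.master.address = $mysql_host:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.filter.regex = test\\.fe_instances
canal.mq.topic = canal_test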

2. Flink

Here we use Flink SQL to read and write the data, so a Flink installation is required. The following shows a standalone Flink setup (skip this part if your company already runs a Flink cluster). Start the Flink service:
cd flink-xxx
./bin/start-cluster.sh
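If the CREATE TABLE statements later fail with a "Could not find any factory for identifier" error, the connector jars are missing from Flink's lib/ directory. A sketch of fixing that, assuming Flink 1.13 with Scala 2.11; the exact jar names and versions are placeholders for whatever matches your setup:

cd flink-xxx
# copy the Kafka and StarRocks connector jars (download from Maven / the StarRocks docs)
cp /path/to/flink-sql-connector-kafka_2.11-1.13.6.jar lib/
cp /path/to/flink-connector-starrocks-1.2.3_flink-1.13_2.11.jar lib/
# restart the cluster so the jars are picked up
./bin/stop-cluster.sh && ./bin/start-cluster.sh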


DDL

Sample canal-json message in Kafka. This is an UPDATE event: data carries the new row, old carries the previous values of the changed columns, and pkNames lists the primary key columns.
{
    "data":[
        {
            "id":"2f2192e9-f8b5-4332-a96f-192b05c9e6bc",
            "agent_id":"16",
            "http_port":"8031",
            "rpc_port":"9020",
            "query_port":"8306",
            "edit_log_port":"9010",
            "meta_dir":"",
            "absolute_meta_dir":"/home/disk1/sr/data/sr/meta",
            "log_dir":"",
            "absolute_log_dir":"/home/disk1/sr/dorisdb-manager-20211008/fe-2f2192e9-f8b5-4332-a96f-192b05c9e6bc/log",
            "role":"FOLLOWER",
            "install_path":"/home/disk1/sr/dorisdb-manager-20211008",
            "absolute_migrate_path":"/home/disk1/sr/app/StarRocks/SE/StarRocks-1.18.3/fe",
            "deleted":"0",
            "deleted_at":"0",
            "created_at":"1633759183484",
            "updated_at":"1634240355691"
        }
    ],
    "database":"test",
    "es":1634240355000,
    "id":1076,
    "isDdl":false,
    "mysqlType":{
        "id":"varchar(48)",
        "agent_id":"int(11)",
        "http_port":"int(11)",
        "rpc_port":"int(11)",
        "query_port":"int(11)",
        "edit_log_port":"int(11)",
        "meta_dir":"text",
        "absolute_meta_dir":"text",
        "log_dir":"text",
        "absolute_log_dir":"text",
        "role":"varchar(32)",
        "install_path":"text",
        "absolute_migrate_path":"text",
        "deleted":"tinyint(1)",
        "deleted_at":"bigint(20)",
        "created_at":"bigint(20)",
        "updated_at":"bigint(20)"
    },
    "old":[
        {
            "updated_at":"1634240295633"
        }
    ],
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":12,
        "agent_id":4,
        "http_port":4,
        "rpc_port":4,
        "query_port":4,
        "edit_log_port":4,
        "meta_dir":2005,
        "absolute_meta_dir":2005,
        "log_dir":2005,
        "absolute_log_dir":2005,
        "role":12,
        "install_path":2005,
        "absolute_migrate_path":2005,
        "deleted":-7,
        "deleted_at":-5,
        "created_at":-5,
        "updated_at":-5
    },
    "table":"fe_instances",
    "ts":1634240355886,
    "type":"UPDATE"
}

StarRocks

Create the target database and table in StarRocks. The Primary Key model is used so that UPDATE and DELETE events from the binlog can be applied:

CREATE DATABASE canaltest;

CREATE TABLE IF NOT EXISTS `canaltest`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int(11) NULL,
    `http_port` int(11) NULL,
    `rpc_port` int(11) NULL,
    `query_port` int(11),
    `edit_log_port` int(11),
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` varchar(32),
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint(1),
    `deleted_at` bigint(20),
    `created_at` bigint(20),
    `updated_at` bigint(20)
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
    "replication_num" = "3",
    "in_memory" = "false",
    "storage_format" = "DEFAULT"
);
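Note that "replication_num" = "3" requires at least three BE nodes; on a single-node test cluster, set it to "1" or the CREATE TABLE will fail. You can confirm the table definition afterwards with:

DESC canaltest.canal_test_sink;
SHOW CREATE TABLE canaltest.canal_test_sink;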


Flink SQL

Save the following statements to a file, e.g. flink-create.sql:
CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',  -- Kafka broker address
    'properties.group.id' = 'canal_group',                -- Kafka consumer group
    'format' = 'canal-json'                               -- use the canal-json format
);
 
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'username' = 'root',
    'password' = '',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',                     -- table name in StarRocks
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);
 
INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;

Start the job and test

cd flink-xxx
./bin/sql-client.sh -f flink-create.sql

# check the job status
./bin/flink list

Output like the following indicates the job started normally:

Waiting for response...
------------------ Running/Restarting Jobs -------------------
18.03.2022 09:48:34 : 4a2c5035ca292fef9691524c731122c2 : insert-into_default_catalog.test.canal_test_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Confirm that the data has been imported into StarRocks:

select * from canaltest.canal_test_sink;
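If you need to resubmit the job (for example after editing flink-create.sql), cancel the running one first, using the job id shown by ./bin/flink list:

./bin/flink cancel 4a2c5035ca292fef9691524c731122c2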

Troubleshooting common issues

1. The Flink job reports no errors, but no data arrives

Step 1: Confirm that binlog is enabled; check with SHOW VARIABLES LIKE 'log_bin' (see the fuller check below).
Step 2: Confirm that the versions of flink, flink-cdc, flink-starrocks-connector, and MySQL (5.7 or 8.0.x is required) meet the requirements; the major versions of flink, flink-cdc, and flink-starrocks-connector must match, e.g. all 1.13.
Step 3: Determine step by step whether the problem lies in querying the source table or in writing to StarRocks. The SQL below, taken from the flink-create.sql generated above, is used for the demonstration. From the Flink installation directory, start the SQL client:

bin/sql-client.sh
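For step 1, besides log_bin being ON, canal also requires ROW-format binlog:

-- run on the MySQL side
SHOW VARIABLES LIKE 'log_bin';        -- must be ON
SHOW VARIABLES LIKE 'binlog_format';  -- canal requires ROW

First, verify that reading the source table works. Paste the statements below in one at a time to tell whether the issue is the source query or the StarRocks write: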
CREATE DATABASE IF NOT EXISTS `testdb`;

CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_source` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint
) WITH (
    'connector' = 'kafka',
    'topic' = 'canal_test',                               -- Kafka topic name
    'properties.bootstrap.servers' = '$kafka_host:9092',
    'properties.group.id' = 'canal_group',
    'format' = 'canal-json'                               -- use the canal-json format
);

-- verify the source reads correctly
select * from `testdb`.`canal_test_source`;
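If the select returns no rows, remember that canal-json is a changelog format: records appear only when the upstream table changes. A hypothetical update to trigger an event (the table and id are taken from the sample message above):

-- run on the MySQL side; any write to the watched table produces a canal event
UPDATE test.fe_instances
SET updated_at = unix_timestamp(now(3)) * 1000
WHERE id = '2f2192e9-f8b5-4332-a96f-192b05c9e6bc';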

Then verify that writing to StarRocks works:
CREATE TABLE IF NOT EXISTS `testdb`.`canal_test_sink` (
    `id` STRING NOT NULL,
    `agent_id` int NULL,
    `http_port` int NULL,
    `rpc_port` int NULL,
    `query_port` int NULL,
    `edit_log_port` int,
    `meta_dir` STRING,
    `absolute_meta_dir` STRING,
    `log_dir` STRING,
    `absolute_log_dir` STRING,
    `role` STRING,
    `install_path` STRING,
    `absolute_migrate_path` STRING,
    `deleted` tinyint,
    `deleted_at` bigint,
    `created_at` bigint,
    `updated_at` bigint,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://$fe_host:9030',
    'load-url' = '$fe_host:8030',
    'username' = 'root',
    'password' = '',
    'database-name' = 'canaltest',
    'table-name' = 'canal_test_sink',
    'sink.properties.column_separator' = '\x01',
    'sink.properties.row_delimiter' = '\x02',
    'sink.buffer-flush.interval-ms' = '15000'
);
 
INSERT INTO `testdb`.`canal_test_sink` SELECT * FROM `testdb`.`canal_test_source`;
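If the INSERT INTO ... SELECT job runs but nothing arrives in StarRocks, you can isolate the sink by writing a literal row from the SQL client, bypassing Kafka entirely; the values below are arbitrary smoke-test data:

-- if this row does not show up in StarRocks, the sink configuration is the problem
INSERT INTO `testdb`.`canal_test_sink`
VALUES ('smoke-test-1', 0, 0, 0, 0, 0, '', '', '', '', 'FOLLOWER', '', '',
        CAST(0 AS TINYINT), CAST(0 AS BIGINT), CAST(0 AS BIGINT), CAST(0 AS BIGINT));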

2. The Flink job fails with errors

Step 1: Confirm that the Flink cluster is running; a locally downloaded Flink may simply not have been started yet, so run ./bin/start-cluster.sh to start it (see the check below).
Step 2: Analyze further based on the specific error message.
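For step 1, a quick check, assuming a standalone cluster:

cd flink-xxx
# a running standalone cluster shows one JobManager and at least one TaskManager process
jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'
# if nothing is running, start it, then check the logs for the actual error
./bin/start-cluster.sh
tail -n 100 log/flink-*-standalonesession-*.log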