
Syncing data from TiDB to StarRocks with the latest Flink TiDB CDC

  Flink CDC 2.2 adds support for four new data sources: OceanBase CE, PolarDB-X, SqlServer, and TiDB. Three new connectors were introduced (OceanBase CDC, SqlServer CDC, and TiDB CDC), while PolarDB-X is supported through a compatibility adaptation of the existing MySQL CDC connector. Release announcement: https://blog.csdn.net/weixin_44904816/article/details/123836091
  I recently ran into quite a few problems while testing TiDB's native TiCDC (provided by PingCAP) to push data into Kafka and then load it into StarRocks with the Flink StarRocks connector. The newly released Flink TiDB CDC connector is a good fit here: it lets us sync data from TiDB to StarRocks directly.

1. Deploying the TiDB environment

  1. Download the TiDB deployment package tidb-community-server-v4.0.9-linux-amd64.tar.gz from:
        https://download.pingcap.org/tidb-community-server-v4.0.9-linux-amd64.tar.gz
  2. Install it:

tar -xvf tidb-community-server-v4.0.9-linux-amd64.tar.gz
cd tidb-community-server-v4.0.9-linux-amd64
sh local_install.sh
  3. Start a single-node TiDB test environment in tiup playground mode:
(客戶成功)sr@cs03:~/.tiup/bin$ pwd
/home/disk1/sr/.tiup/bin

(客戶成功)sr@cs03:~/.tiup/bin$ ./tiup playground
Starting component `playground`: /home/disk1/sr/.tiup/components/playground/v1.3.2/tiup-playground
Use the latest stable version: v4.0.9

    Specify version manually:   tiup playground <version>
    The stable version:         tiup playground v4.0.0
    The nightly version:        tiup playground nightly

Playground Bootstrapping...
Start pd instance
Start tikv instance
Start tidb instance
Waiting for tidb instances ready
127.0.0.1:4000 ... Done
Start tiflash instance
Waiting for tiflash instances ready
127.0.0.1:3930 ... Done
CLUSTER START SUCCESSFULLY, Enjoy it ^-^
To connect TiDB: mysql --host 127.0.0.1 --port 4000 -u root
To view the dashboard: http://127.0.0.1:2379/dashboard
To view the Prometheus: http://127.0.0.1:37449
To view the Grafana: http://127.0.0.1:3000
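Once the playground reports success, a quick sanity check (a minimal sketch) confirms the cluster is reachable over the MySQL protocol. Connect with mysql --host 127.0.0.1 --port 4000 -u root, then:

-- Verify the server responds and reports its version/build details
SELECT tidb_version();
-- The default system databases should be listed
SHOW DATABASES;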

2. Downloading the Flink TiDB CDC connector

Download the Flink TiDB CDC connector and place it under Flink's lib directory:

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.2.0/flink-sql-connector-tidb-cdc-2.2.0.jar

(客戶成功)sr@cs03:~/app/flink-1.13.3/lib$ pwd
/home/disk1/sr/app/flink-1.13.3/lib
(客戶成功)sr@cs03:~/app/flink-1.13.3/lib$ ll
total 353940
-rw-rw-r-- 1 sr sr   9404460 Mar 19 17:29 flink-connector-starrocks-1.2.1_flink-1.13_2.12.jar
-rw-r--r-- 1 sr sr     92313 Oct 13  2021 flink-csv-1.13.3.jar
-rw-r--r-- 1 sr sr 115418686 Oct 13  2021 flink-dist_2.11-1.13.3.jar
-rw-r--r-- 1 sr sr     19583 Nov  5 21:27 flink-format-changelog-json-1.4.0.jar
-rw-r--r-- 1 sr sr    148127 Oct 13  2021 flink-json-1.13.3.jar
-rw-rw-r-- 1 sr sr  41368997 Feb 12  2020 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.bak
-rwxrwxr-x 1 sr sr   7709740 Jun  8  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-rw-r-- 1 sr sr   3674114 Oct 12  2021 flink-sql-connector-kafka_2.12-1.13.3.jar
-rw-rw-r-- 1 sr sr  19648014 Dec 16 23:36 flink-sql-connector-mysql-cdc-2.1.1.jar
-rw-rw-r-- 1 sr sr  85355380 Apr 19 17:03 flink-sql-connector-tidb-cdc-2.2.0.jar
-rw-r--r-- 1 sr sr  36453353 Oct 13  2021 flink-table_2.11-1.13.3.jar
-rw-r--r-- 1 sr sr  41061738 Oct 13  2021 flink-table-blink_2.11-1.13.3.jar
-rwxrwxr-x 1 sr sr     67114 Mar 31  2021 log4j-1.2-api-2.12.1.jar
-rwxrwxr-x 1 sr sr    276771 Mar 31  2021 log4j-api-2.12.1.jar
-rwxrwxr-x 1 sr sr   1674433 Mar 31  2021 log4j-core-2.12.1.jar
-rwxrwxr-x 1 sr sr     23518 Mar 31  2021 log4j-slf4j-impl-2.12.1.jar

3. Starting the Flink environment in YARN session mode

(客戶成功)sr@cs03:~/app/flink-1.13.3/bin$ ./yarn-session.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/disk1/sr/app/flink-1.13.3/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
......
2022-04-19 17:23:37,134 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - The configured TaskManager memory is 2728 MB. YARN will allocate 3072 MB to make up an integer multiple of its minimum allocation memory (1024 MB, configured via 'yarn.scheduler.minimum-allocation-mb'). The extra 344 MB may not be used by Flink.
2022-04-19 17:23:37,134 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2600, taskManagerMemoryMB=2728, slotsPerTaskManager=2}
2022-04-19 17:23:39,039 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1641365150688_0057
2022-04-19 17:23:39,069 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1641365150688_0057
2022-04-19 17:23:39,070 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated
2022-04-19 17:23:39,071 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED
2022-04-19 17:23:44,097 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2022-04-19 17:23:44,159 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface cs03:38081 of application 'application_1641365150688_0057'.
JobManager Web Interface: http://cs03:38081

4. Creating a table and inserting/updating data in TiDB

mysql> create database gong;
mysql> use gong;
mysql> CREATE TABLE orders (
    ->   order_id INT,
    ->   order_date TIMESTAMP(3),
    ->   customer_name varchar(20),
    ->   price DECIMAL(10, 5),
    ->   product_id INT,
    ->   order_status BOOLEAN,
    ->   PRIMARY KEY(order_id)
    -> );

mysql> insert into orders values(2,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(3,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(4,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.01 sec)
mysql> insert into orders values(5,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.01 sec)
mysql> insert into orders values(6,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(7,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.01 sec)
mysql> insert into orders values(8,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(9,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(10,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.01 sec)
mysql> insert into orders values(11,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)
mysql> insert into orders values(12,now(),"gong",13.99,123,1);
Query OK, 1 row affected (0.00 sec)

mysql> SELECT * FROM ORDERS;
+----------+-------------------------+---------------+----------+------------+--------------+
| order_id | order_date              | customer_name | price    | product_id | order_status |
+----------+-------------------------+---------------+----------+------------+--------------+
|        1 | 2022-04-19 17:12:33.000 | gong          | 13.99000 |        123 |            1 |
|        2 | 2022-04-19 17:14:20.000 | gong          | 13.99000 |        123 |            1 |
|        3 | 2022-04-19 17:14:25.000 | gong          | 13.99000 |        123 |            1 |
|        4 | 2022-04-19 17:14:46.000 | gong          | 13.99000 |        123 |            1 |
|        5 | 2022-04-19 17:14:51.000 | gong          | 13.99000 |        123 |            1 |
|        6 | 2022-04-19 17:14:58.000 | gong          | 13.99000 |        123 |            1 |
|        7 | 2022-04-19 17:15:31.000 | gong          | 13.99000 |        123 |            1 |
|        8 | 2022-04-19 17:15:36.000 | gong          | 13.99000 |        123 |            1 |
|        9 | 2022-04-19 17:17:18.000 | gong          | 13.99000 |        123 |            1 |
|       10 | 2022-04-19 17:17:31.000 | gong          | 13.99000 |        123 |            1 |
|       11 | 2022-04-19 17:20:31.000 | gong          | 13.99000 |        123 |            1 |
|       12 | 2022-04-19 17:20:37.000 | gong          | 13.99000 |        123 |            1 |
+----------+-------------------------+---------------+----------+------------+--------------+

mysql> update orders set customer_name = "gongxiucheng" where order_id = 12;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0
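One detail worth checking here: TIMESTAMP values are converted to and from the session time zone, so a time zone mismatch between the TiDB session and the downstream reader shows up as a fixed offset (as it does in step 7, where the same rows are rendered 8 hours earlier in StarRocks). A quick check in the TiDB session:

-- Inspect the global and session time zones; a UTC/local mismatch between
-- source and target sessions explains a constant offset in TIMESTAMP output.
SELECT @@global.time_zone, @@session.time_zone;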

5. Testing TiDB data in Flink SQL

Open the Flink SQL client:

cd $FLINK_HOME/bin
./sql-client.sh embedded

-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'gong',
    'table-name' = 'orders'
);

-- read the snapshot plus subsequent change events from the orders table
Flink SQL> SELECT * FROM orders;

This created a Flink job:

(客戶成功)sr@cs03:~/app/flink-1.13.3/bin$ ./flink list
SLF4J: Class path contains multiple SLF4J bindings
......
2022-04-19 18:16:02,146 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface cs03:38081 of application 'application_1641365150688_0057'.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
19.04.2022 17:25:27 : 4852d0bdb40aba6ae4041f1e36eadd1c : collect (RUNNING)
--------------------------------------------------------------

The TiDB data is now visible in Flink SQL; next, write it into StarRocks.
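Because the source table is backed by a changelog stream, ordinary Flink SQL queries over it stay continuously correct as rows are inserted, updated, or deleted in TiDB. A small sketch (not part of the original test) of a running aggregation over the same table:

-- Continuous aggregation over the CDC stream: the result is updated
-- incrementally as change events arrive from TiDB.
Flink SQL> SELECT customer_name,
                  COUNT(*)   AS order_cnt,
                  SUM(price) AS total_price
           FROM orders
           GROUP BY customer_name;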

6. Creating the target table in StarRocks

Create the table in StarRocks:

CREATE TABLE `orders` (
  `order_id` int(11) NOT NULL COMMENT "",
  `order_date` datetime NULL COMMENT "",
  `customer_name` varchar(20) NULL COMMENT "",
  `price` decimal64(10, 5) NULL COMMENT "",
  `product_id` int(11) NULL COMMENT "",
  `order_status` boolean NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`order_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
PROPERTIES (
  "replication_num" = "3",
  "in_memory" = "false",
  "storage_format" = "DEFAULT"
);
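The PRIMARY KEY model is what allows the sink to apply the CDC stream's updates and deletes in place rather than append-only, so it is worth confirming the table was created with the expected model before starting the sync:

-- Confirm the key model and column definitions on the StarRocks side
DESC gong.orders;
SHOW CREATE TABLE gong.orders;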

7. Syncing the data to StarRocks with Flink SQL

First download the Flink StarRocks connector jar, flink-connector-starrocks-1.2.1_flink-1.13_2.12.jar, place it in Flink's lib directory, and restart the Flink service. (Download link: Flink StarRocks connector; note that Flink 1.13 uses a different connector build from 1.11/1.12.)

In the Flink SQL CLI, create the sink table:

CREATE TABLE IF NOT EXISTS `orders_sink` (
  order_id INT,
  order_date TIMESTAMP(3),
  customer_name varchar(20),
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN,
  PRIMARY KEY(`order_id`) NOT ENFORCED
) with (
  'load-url' = 'cs01:9011',
  'sink.buffer-flush.interval-ms' = '15000',
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01',
  'connector' = 'starrocks',
  'database-name' = 'gong',
  'table-name' = 'orders',
  'jdbc-url' = 'jdbc:mysql://cs01:9013',
  'password' = '',
  'username' = 'root'
);

Flink SQL> show tables;
+-------------+
|  table name |
+-------------+
|      orders |
| orders_sink |
+-------------+
2 rows in set

Start writing the data into StarRocks:

Flink SQL> insert into orders_sink select * from orders;
[INFO] Submitting SQL update statement to the cluster...
2022-04-19 18:37:14,581 INFO  org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at /172.26.194.184:38035
2022-04-19 18:37:14,582 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-04-19 18:37:14,587 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface cs03:38081 of application 'application_1641365150688_0057'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 33b03fb9d257b57c5243e721c19a6041

Check the new job created by the Flink SQL statement:

(客戶成功)sr@cs03:~/app/flink-1.13.3/bin$ ./flink list
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/disk1/sr/app/flink-1.13.3/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
......
2022-04-19 18:37:46,875 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-04-19 18:37:46,953 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface cs03:38081 of application 'application_1641365150688_0057'.
Waiting for response...
------------------ Running/Restarting Jobs -------------------
19.04.2022 18:37:14 : 33b03fb9d257b57c5243e721c19a6041 : insert-into_default_catalog.default_database.orders_sink (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
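Since the INSERT is a full Flink SQL pipeline, the stream can also be filtered or reshaped in flight. A hypothetical variant (not run in this test) that syncs only completed orders:

-- Hypothetical: sync only rows with order_status = true. The filter is
-- applied to the changelog stream, so an update that flips a row's status
-- out of the filter should be reflected at the primary-key sink.
Flink SQL> INSERT INTO orders_sink
           SELECT * FROM orders
           WHERE order_status = TRUE;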
Verify in StarRocks that all the changes have made it across:

mysql> select current_version();
+-------------------+
| current_version() |
+-------------------+
| 2.1.0 1864de0     |
+-------------------+
1 row in set (0.02 sec)

mysql> select * from orders;
+----------+---------------------+---------------+----------+------------+--------------+
| order_id | order_date          | customer_name | price    | product_id | order_status |
+----------+---------------------+---------------+----------+------------+--------------+
|        1 | 2022-04-19 09:12:33 | gong          | 13.99000 |        123 |            1 |
|        2 | 2022-04-19 09:14:20 | gong          | 13.99000 |        123 |            1 |
|        3 | 2022-04-19 09:14:25 | gong          | 13.99000 |        123 |            1 |
|        4 | 2022-04-19 09:14:46 | gong          | 13.99000 |        123 |            1 |
|        5 | 2022-04-19 09:14:51 | gong          | 13.99000 |        123 |            1 |
|        6 | 2022-04-19 09:14:58 | gong          | 13.99000 |        123 |            1 |
|        7 | 2022-04-19 09:15:31 | gong          | 13.99000 |        123 |            1 |
|        8 | 2022-04-19 09:15:36 | gong          | 13.99000 |        123 |            1 |
|        9 | 2022-04-19 09:17:18 | gong          | 13.99000 |        123 |            1 |
|       10 | 2022-04-19 09:17:31 | gong          | 13.99000 |        123 |            1 |
|       11 | 2022-04-19 09:20:31 | gong          | 13.99000 |        123 |            1 |
|       12 | 2022-04-19 09:20:37 | gongxiucheng  | 13.99000 |        123 |            1 |
|       13 | 2022-04-19 09:26:56 | gong          | 13.99000 |        123 |            1 |
|       14 | 2022-04-19 09:33:21 | gong          | 13.99000 |        123 |            1 |
|       15 | 2022-04-19 10:08:03 | gong          | 13.99000 |        123 |            1 |
+----------+---------------------+---------------+----------+------------+--------------+
15 rows in set (0.01 sec)

All rows are present (rows 13-15 were inserted on the TiDB side while the sync job was running), and the update to order_id 12 has been applied. Note the order_date values are rendered 8 hours earlier than on the TiDB side, which points to the session time zone difference discussed in step 4.
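A targeted spot check makes the update-in-place behavior explicit: the TiDB-side UPDATE replaced the old value rather than adding a second row for the same key.

-- The updated row should appear exactly once, with the new customer_name
mysql> SELECT order_id, customer_name FROM orders WHERE order_id = 12;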

8. Summary

Overall the test went quite smoothly, with far fewer headaches than our earlier test of TiDB's native TiCDC. It also simplifies the data pipeline by removing the Kafka dependency, making Flink TiDB CDC an excellent tool for getting TiDB data into StarRocks.