1. 程式人生 > 實用技巧 > flink table&sql canal使用案例

flink table&sql canal使用案例

版本資訊

產品版本
Flink 1.11.1
flink-cdc-connectors 1.1.0
Java 1.8.0_231
MySQL 5.7.16

Maven依賴

  • pom.xml 依賴部分
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.1</flink.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>
        <!-- Flink-CDC -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>
    
    </dependencies>

    主從同步配置、資料準備

    • 關閉MySQL服務
    • 在需要被同步的MySQL節點,新增如下配置(可供參考的文件)
      [mysqld]
      # 前面還有其他配置
      # 新增的部分
      server-id = 12345
      log-bin = mysql-bin
      # 必須為ROW
      binlog_format = ROW
      # 必須為FULL,MySQL-5.7後才有該引數
      binlog_row_image  = FULL
      expire_logs_days  = 10
    • 啟動MySQL服務
    • 使用如下命令,可檢視binlog相關變數配置
      SHOW VARIABLES LIKE '%binlog%';
    • 建立待測試的庫、表、資料
      CREATE DATABASE db_inventory_cdc;
      
      CREATE TABLE tb_products_cdc(
      	id INT PRIMARY KEY AUTO_INCREMENT,
      	name VARCHAR(64),
      	description VARCHAR(128)
      );
      
      INSERT INTO tb_products_cdc
      VALUES 
      	(DEFAULT, 'zhangsan', 'aaa'),
      	(DEFAULT, 'lisi', 'bbb'),
      	(DEFAULT, 'wangwu', 'ccc');
    • 建立用於同步的使用者,並給予許可權(可供參考的文件)
      -- 設定擁有同步許可權的使用者
      CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
      -- 賦予同步相關許可權
      GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
    • 建立使用者並賦予許可權成功後,使用該使用者登入MySQL,可以使用以下命令檢視主從同步相關資訊
      SHOW MASTER STATUS
      SHOW SLAVE STATUS
      SHOW BINARY LOGS

    使用Flink-CDC

      • sql-cli點選檢視
      • 編碼方式,方便提交jar包,示例如下
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.planner.factories.TestValuesTableFactory;
    /**
     * Demo job: captures the MySQL binlog of {@code tb_products_cdc} via
     * flink-connector-mysql-cdc, aggregates row counts per name, and collects the
     * resulting changelog in an in-memory 'values' sink for inspection.
     */
    public class FlinkCDCSQLTest {

        /** Max time to wait for the initial CDC snapshot before giving up. */
        private static final long SNAPSHOT_TIMEOUT_MS = 60_000L;

        public static void main(String[] args) throws Exception {
            EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Single parallelism keeps the demo output ordered and easy to read.
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

            // Source table backed by the MySQL binlog (snapshot + incremental changes).
            String sourceDDL =
                    "CREATE TABLE mysql_binlog (\n" +
                    " id INT NOT NULL,\n" +
                    " name STRING,\n" +
                    " description STRING\n" +
                    ") WITH (\n" +
                    " 'connector' = 'mysql-cdc',\n" +
                    " 'hostname' = 'localhost',\n" +
                    " 'port' = '3306',\n" +
                    " 'username' = 'flinkuser',\n" +
                    " 'password' = 'flinkpassword',\n" +
                    " 'database-name' = 'db_inventory_cdc',\n" +
                    " 'table-name' = 'tb_products_cdc'\n" +
                    ")";
            // Sink table. BUGFIX: the original used 'connector' = 'print', but
            // TestValuesTableFactory (used below to detect snapshot completion) only
            // tracks sinks registered through the 'values' connector, so sinkSize()
            // always returned 0 and waitForSnapshotStarted() spun forever.
            // 'sink-insert-only' = 'false' is required because the GROUP BY query
            // emits an updating (retract/upsert) changelog, not append-only rows.
            String sinkDDL =
                    "CREATE TABLE tb_sink (\n" +
                    " name STRING,\n" +
                    " countSum BIGINT,\n" +
                    " PRIMARY KEY (name) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    " 'connector' = 'values',\n" +
                    " 'sink-insert-only' = 'false'\n" +
                    ")";
            // A simple aggregation over the changelog stream.
            String transformSQL =
                    "INSERT INTO tb_sink " +
                    "SELECT name, COUNT(1) " +
                    "FROM mysql_binlog " +
                    "GROUP BY name";

            tableEnv.executeSql(sourceDDL);
            tableEnv.executeSql(sinkDDL);
            TableResult result = tableEnv.executeSql(transformSQL);

            // Wait until flink-cdc has emitted at least one snapshot row into the sink.
            waitForSnapshotStarted("tb_sink");
            // Dump the raw changelog collected so far (replaces the former 'print' sink).
            System.out.println(TestValuesTableFactory.getRawResults("tb_sink"));
            result.print();

            result.getJobClient().get().cancel().get();
        }

        /**
         * Blocks until the named sink has received at least one row, polling every
         * 100 ms. Fails after {@link #SNAPSHOT_TIMEOUT_MS} so a misconfigured sink
         * cannot hang the program indefinitely.
         *
         * @param sinkName table name registered with the 'values' connector
         * @throws InterruptedException if the polling sleep is interrupted
         * @throws IllegalStateException if no row arrives before the timeout
         */
        private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
            long deadline = System.currentTimeMillis() + SNAPSHOT_TIMEOUT_MS;
            while (sinkSize(sinkName) == 0) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException(
                            "Timed out waiting for snapshot rows in sink: " + sinkName);
                }
                Thread.sleep(100);
            }
        }

        /**
         * Returns the number of raw changelog rows captured so far for the given
         * 'values' sink, or 0 while the job has not registered the sink yet.
         */
        private static int sinkSize(String sinkName) {
            synchronized (TestValuesTableFactory.class) {
                try {
                    return TestValuesTableFactory.getRawResults(sinkName).size();
                } catch (IllegalArgumentException e) {
                    // The job has not started / the sink is not registered yet.
                    return 0;
                }
            }
        }

    }

    簡單的測試

      • 進行簡單測試,開始修改MySQL表的資料
        -- SQL測試資料,對照Flink應用
        
        INSERT INTO tb_products_cdc VALUE(DEFAULT, 'lisi', 'ddd');
        
        DELETE FROM tb_products_cdc WHERE id=4;
        
        UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;
      • 執行一條SQL,檢視一下Flink的結果變化