1. 程式人生 > 其它 > Flink 1.11.2 實現 Kafka 到 MySQL

Flink 1.11.2 實現kafka到mysql

技術標籤:Flink、flink 1.11、kafka、mysql

新增pom

    <properties>
        <flink.version>1.11.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId> org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

案例程式碼

package com.jh;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class StreamJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        bsTableEnv.executeSql("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
                "    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column\n" +
                "    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute\n" +
                ") WITH (\n" +
                "    'connector' = 'kafka',  -- using kafka connector\n" +
                "    'topic' = 'user_behavior',  -- kafka topic\n" +
                "    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning\n" +
                "    'properties.bootstrap.servers' = 'bigdata1:9092',  -- kafka broker address\n" +
                "    'properties.zookeeper' = 'bigdata1:2181',  -- zookeeper address\n" +
                "    'format' = 'json'  -- the data format is json\n" +
                ")");

        bsTableEnv.executeSql("CREATE TABLE buy_cnt_per_hour (\n" +
                "    hour_of_day BIGINT,\n" +
                "    buy_cnt BIGINT\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://bigdata1:3306/stream?createDatabaseIfNotExist=true&serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false&AllowPublicKeyRetrieval=True',\n" +
                "   'table-name' = 'buy_cnt_per_hour' ,\n" +
                "   'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "   'username' = 'root', \n" +
                "   'password' = '123456' \n" +
                ")");

       bsTableEnv.executeSql("INSERT INTO buy_cnt_per_hour\n" +
               "SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)\n" +
               "FROM user_behavior\n" +
               "WHERE behavior = 'buy'\n" +
               "GROUP BY TUMBLE(ts, INTERVAL '1' HOUR)");

        bsEnv.execute("");
    }
}