1. 程式人生 > 其它 >Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

上週六在深圳分享了《Flink SQL 1.9.0 技術內幕和最佳實踐》,會後許多小夥伴對最後演示環節的 Demo 程式碼非常感興趣,迫不及待地想嘗試下,所以寫了這篇文章分享下這份程式碼。希望對於 Flink SQL 的初學者能有所幫助。完整分享可以觀看 Meetup 視訊回顧 :

演示程式碼已經開源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

這份程式碼主要由兩部分組成:1) 能用來提交 SQL 檔案的 SqlSubmit 實現。2) 用於演示的 SQL 示例、Kafka 啟動停止指令碼、 一份測試資料集、Kafka 資料來源生成器。

通過本實戰,你將學到:

  1. 如何使用 Blink Planner
  2. 一個簡單的 SqlSubmit 是如何實現的
  3. 如何用 DDL 建立一個 Kafka 源表和 MySQL 結果表
  4. 執行一個從 Kafka 讀取資料,計算 PVUV,並寫入 MySQL 的作業
  5. 設定調優引數,觀察對作業的影響

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支援處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交指令碼。後來想想,也挺好的,可以讓聽眾同時瞭解如何通過 SQL 的方式,和程式設計的方式使用 Flink SQL。

SqlSubmit 的主要任務是執行和提交一個 SQL 檔案,實現非常簡單,就是通過正則表示式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會呼叫 tEnv.sqlUpdate(...)。如果是 SET 開頭,則會將配置設定到 TableConfig 上。其核心程式碼主要如下所示:

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
// 建立一個使用 Blink Planner 的 TableEnvironment, 並工作在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 檔案
List<String> sql = Files.readAllLines(path);
// 通過正則表示式匹配字首,來區分不同的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不同的 SQL 語句,呼叫 TableEnvironment 執行
for (SqlCommandCall call : calls) {
  switch (call.command) {
    case SET:
      String key = call.operands[0];
      String value = call.operands[1];
      // 設定引數
      tEnv.getConfig().getConfiguration().setString(key, value);
      break;
    case CREATE_TABLE:
      String ddl = call.operands[0];
      tEnv.sqlUpdate(ddl);
      break;
    case INSERT_INTO:
      String dml = call.operands[0];
      tEnv.sqlUpdate(dml);
      break;
    default:
      throw new RuntimeException("Unsupported command: " + call.command);
  }
}
// 提交作業
tEnv.execute("SQL Job");

使用 DDL 連線 Kafka 源表

在 flink-sql-submit 專案中,我們準備了一份測試資料集(來自阿里雲天池公開資料集,特別鳴謝),位於 src/main/resources/user_behavior.log。資料以 JSON 格式編碼,大概長這個樣子:

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

為了模擬真實的 Kafka 資料來源,筆者還特地寫了一個 source-generator.sh 指令碼(感興趣的可以看下原始碼),會自動讀取 user_behavior.log 的資料並以預設每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

有了資料來源後,我們就可以用 DDL 去建立並連線這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 連線資訊
    'connector.properties.0.value' = 'localhost:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092', 
    'update-mode' = 'append',
    'format.type' = 'json',  -- 資料來源格式為 json
    'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則
)

注:可能有使用者會覺得其中的 connector.properties.0.key 等引數比較奇怪,社群計劃將在下一個版本中改進並簡化 connector 的引數配置。

使用 DDL 連線 MySQL 結果表

連線 MySQL 可以使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
    'connector.table' = 'pvuv_sink', -- 表名
    'connector.username' = 'root', -- 使用者名稱
    'connector.password' = '123456', -- 密碼
    'connector.write.flush.max-rows' = '1' -- 預設5000條,為了演示改為1條
)

PV UV 計算

假設我們的需求是計算每小時全網的使用者訪問量,和獨立使用者數。很多使用者可能會想到使用滾動視窗來計算。但這裡我們介紹另一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內建函式,將日誌時間歸一化成“年月日小時”的字串格式,並根據這個字串進行分組,即根據每小時分組,然後通過 COUNT(*) 計算使用者訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立使用者數(UV)。這種方式的執行模式是每收到一條資料,便會進行基於之前計算的值做增量計算(如+1),然後將最新結果輸出。所以實時性很高,但輸出量也大。

我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,我們有對這種查詢的效能調優做了深度的介紹。

實戰演示

環境準備

本實戰演示環節需要安裝一些必須的服務,包括:

  • Flink 本地叢集:用來執行 Flink SQL 任務。
  • Kafka 本地叢集:用來作為資料來源。
  • MySQL 資料庫:用來作為結果表。
  • Flink 本地叢集安裝

1.下載 Flink 1.9.0 安裝包並解壓:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下載以下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。因為我們執行時需要依賴各個 connector 實現。

  • flink-sql-connector-kafka_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
  • flink-json-1.9.0-sql-jar.jar
    http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
  • flink-jdbc_2.11-1.9.0.jar
    http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
  • mysql-connector-java-5.1.48.jar
    https://dev.mysql.com/downloads/connector/j/5.1.html

3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多於1個的 slot。
4.在 flink-1.9.0 目錄下執行 ./bin/start-cluster.sh,啟動叢集。

執行成功的話,可以在http://localhost:8081訪問到 Flink Web UI。


另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 專案的 env.sh 中,用於後面提交 SQL 任務,如我的路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地叢集安裝

下載 Kafka 2.2.0 安裝包並解壓:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

將安裝路徑填到 flink-sql-submit 專案的 env.sh 中,如我的路徑是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目錄下執行 ./start-kafka.sh 啟動 Kafka 叢集。

在命令列執行 jps,如果看到 Kafka 程序和 QuorumPeerMain 程序即表明啟動成功。

MySQL 安裝

可以在官方頁面下載 MySQL 並安裝:
https://dev.mysql.com/downloads/mysql/
如果有 Docker 環境的話,也可以直接通過 Docker 安裝
https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

然後在 MySQL 中建立一個flink-test的資料庫,並按照上文的 schema 建立pvuv_sink表。

提交 SQL 任務

1.在flink-sql-submit目錄下執行./source-generator.sh,會自動建立user_behavior topic,並實時往裡灌入資料。

2.在flink-sql-submit目錄下執行./run.sh q1, 提交二手手機號交易平臺地圖成功後,可以在 Web UI 中看到拓撲。

在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化

結尾

本文帶大家搭建基礎叢集環境,並使用 SqlSubmit 提交純 SQL 任務來學習瞭解如何連線外部系統。flink-sql-submit/src/main/resources/q1.sql中還有一些註釋掉的調優引數,感興趣的同學可以將引數開啟,觀察對作業的影響。關於這些調優引數的原理,可以看下我在深圳 Meetup上的分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。