1. 程式人生 > 其它 >Demo:基於 Flink SQL 構建流式應用

Demo:基於 Flink SQL 構建流式應用

簡介:本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文字,無需一行 Java/Scala 程式碼,無需安裝 IDE。

上週四在 Flink 中文社群釘釘群中直播分享了《Demo:基於 Flink SQL 構建流式應用》,直播內容偏向實戰演示。這篇文章是對直播內容的一個總結,並且改善了部分內容,比如除 Flink 外其他元件全部採用 Docker Compose 安裝,簡化準備流程。讀者也可以結合視訊和本文一起學習。完整分享可以觀看視訊回顧:
Flink 1.10.0 於近期剛釋出,釋放了許多令人激動的新特性。尤其是 Flink SQL 模組,發展速度非常快,因此本文特意從實踐的角度出發,帶領大家一起探索使用 Flink SQL 如何快速構建流式應用。
本文將基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商使用者行為的實時分析應用。本文所有的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文字,無需一行 Java/Scala 程式碼,無需安裝 IDE。本實戰演練的最終效果圖:


準備
一臺裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。
使用 Docker Compose 啟動容器
本實戰演示所依賴的元件全都編排到了容器中,因此可以通過docker-compose一鍵啟動。你可以通過wget命令自動下載該docker-compose.yml檔案,也可以手動下載。
mkdir flink-demo; cd flink-demo; wget
該 Docker Compose 中包含的容器有:

  • DataGen:
    資料生成器。容器啟動後會自動開始生成使用者行為資料,併發送到 Kafka 叢集中。預設每秒生成 1000 條資料,持續生成約 3 小時。也可以更改docker-compose.yml中 datagen 的speedup引數來調整生成速率(重啟 docker compose 才能生效)。
  • MySQL:集成了 MySQL 5.7 ,以及預先建立好了類目表(category),預先填入了子類目與頂級類目的對映關係,後續作為維表使用。
  • Kafka:主要用作資料來源。DataGen 元件會自動將資料灌入這個容器中。
  • Zookeeper:Kafka 容器依賴。
  • Elasticsearch:主要儲存 Flink SQL 產出的資料。
  • Kibana:視覺化 Elasticsearch 中的資料。

在啟動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。啟動所有的容器,只需要在docker-compose.yml所在目錄下執行如下命令。
docker-compose up -d
該命令會以 detached 模式自動啟動 Docker Compose 配置中定義的所有容器。你可以通過docker ps來觀察上述的五個容器是否正常啟動了。 也可以訪問來檢視 Kibana 是否執行正常。
另外可以通過如下命令停止所有的容器:
docker-compose down
下載安裝 Flink 本地叢集
我們推薦使用者手動下載安裝 Flink,而不是通過 Docker 自動啟動 Flink。因為這樣可以更直觀地理解 Flink 的各個元件、依賴、和指令碼。

  1. 下載 Flink 1.10.0 安裝包並解壓(解壓目錄flink-1.10.0):
  2. 進入 flink-1.10.0 目錄:cd flink-1.10.0
  3. 通過如下命令下載依賴 jar 包,並拷貝到lib/目錄下,也可手動下載和拷貝。因為我們執行時需要依賴各個 connector 實現。
  4. -P ./lib/| \
    wget -P ./lib/| \
    wget -P ./lib/| \
    wget -P ./lib/| \
    wget -P ./lib/
  5. conf/flink-conf.yaml中的taskmanager.numberOfTaskSlots修改成 10,因為我們會同時執行多個任務。
  6. 執行./bin/start-cluster.sh,啟動叢集。
    執行成功的話,可以在http://localhost:8081訪問到 Flink Web UI。並且可以看到可用 Slots 數為 10 個。

  1. 執行bin/sql-client.sh embedded啟動 SQL CLI。便會看到如下的松鼠歡迎介面。


使用 DDL 建立 Kafka 表
Datagen 容器在啟動後會往 Kafka 的user_behaviortopic 中持續不斷地寫入資料。資料包含了2017年11月27日一天的使用者行為(行為包括點選、購買、加購、喜歡),每一行表示一條使用者行為,以 JSON 的格式由使用者ID、商品ID、商品類目ID、行為型別和時間組成。該原始資料集來自阿里雲天池公開資料集,特此鳴謝。
我們可以在docker-compose.yml所在目錄下執行如下命令,檢視 Kafka 叢集中生成的前10條資料。
docker-composeexeckafkabash-c'kafka-console-consumer.sh--topicuser_behavior--bootstrap-serverkafka:9094--from-beginning--max-messages10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} {"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"} ...
有了資料來源後,我們就可以用 DDL 去建立並連線這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。
CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(),-- 通過計算列產生一個處理時間列WATERMARK FOR ts as ts - INTERVAL '5' SECOND-- 在ts上定義watermark,ts成為事件時間列) WITH ( 'connector.type' = 'kafka',-- 使用 kafka connector'connector.version' = 'universal',-- kafka 版本,universal 支援 0.11 以上的版本'connector.topic' = 'user_behavior',-- kafka topic'connector.startup-mode' = 'earliest-offset',-- 從起始 offset 開始讀取'connector.properties.zookeeper.connect' = 'localhost:2181',-- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092',-- kafka broker 地址'format.type' = 'json'-- 資料來源格式為 json);
如上我們按照資料的格式聲明瞭 5 個欄位,除此之外,我們還通過計算列語法和PROCTIME()內建函式聲明瞭一個產生處理時間的虛擬列。我們還通過 WATERMARK 語法,在 ts 欄位上聲明瞭 watermark 策略(容忍5秒亂序), ts 欄位因此也成了事件時間列。關於時間屬性以及 DDL 語法可以閱讀官方文件瞭解更多:

  • 時間屬性:
  • DDL:

在 SQL CLI 中成功建立 Kafka 表後,可以通過show tables;describe user_behavior;來檢視目前已註冊的表,以及表的詳細資訊。我們也可以直接在 SQL CLI 中執行SELECT * FROM user_behavior;預覽下資料(按q退出)。
接下來,我們會通過三個實戰場景來更深入地瞭解 Flink SQL 。
統計每小時的成交量
使用 DDL 建立 Elasticsearch 表
我們先在 SQL CLI 中建立一個 ES 結果表,根據場景需求主要需要儲存兩個資料:小時、成交量。
CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch',-- 使用 elasticsearch connector'connector.version' = '6',-- elasticsearch 版本,6 能支援 es 6+ 以及 7+ 的版本'connector.hosts' = 'http://localhost:9200',-- elasticsearch 地址'connector.index' = 'buy_cnt_per_hour',-- elasticsearch 索引名,相當於資料庫的表名'connector.document-type' = 'user_behavior',-- elasticsearch 的 type,相當於資料庫的庫名'connector.bulk-flush.max-actions' = '1',-- 每條資料都重新整理'format.type' = 'json',-- 輸出資料格式 json'update-mode' = 'append' );
我們不需要在 Elasticsearch 中事先建立buy_cnt_per_hour索引,Flink Job 會自動建立該索引。
提交 Query
統計每小時的成交量就是每小時共有多少 "buy" 的使用者行為。因此會需要用到 TUMBLE 視窗函式,按照一小時切窗。然後每個視窗分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的資料,然後COUNT(*)實現。
INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
這裡我們使用HOUR內建函式,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了INSERT INTO將 query 的結果持續不斷地插入到上文定義的 es 結果表中(可以將 es 結果表理解成 query 的物化檢視)。另外可以閱讀該文件瞭解更多關於視窗聚合的內容:
在 Flink SQL CLI 中執行上述查詢後,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直執行。


使用 Kibana 視覺化結果
我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過http://localhost:5601訪問 Kibana。首先我們需要先配置一個 index pattern。點選二手手機拍賣平臺地圖左側工具欄的 "Management",就能找到 "Index Patterns"。點選 "Create Index Pattern",然後通過輸入完整的索引名 "buy_cnt_per_hour" 建立 index pattern。建立完成後, Kibana 就知道了我們的索引,我們就可以開始探索資料了。
先點選左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛建立的索引中的內容。


接下來,我們先建立一個 Dashboard 用來展示各個視覺化的檢視。點選頁面左側的"Dashboard",建立一個名為 ”使用者行為日誌分析“ 的Dashboard。然後點選 "Create New" 建立一個新的檢視,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,並儲存為”每小時成交量“。


可以看到凌晨是一天中成交量的低谷。
統計一天每10分鐘累計獨立使用者數
另一個有意思的視覺化是統計一天中每一刻的累計獨立使用者數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻為止的總計 uv 數,因此該曲線肯定是單調遞增的。
我們仍然先在 SQL CLI 中建立一個 Elasticsearch 表,用於儲存結果彙總資料。主要有兩個欄位:時間和累積 uv 數。
CREATE TABLE cumulative_uv ( time_str STRING, uv BIGINT ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200','connector.index' = 'cumulative_uv', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
為了實現該曲線,我們可以先通過 OVER WINDOW 計算出每條資料的當前分鐘,以及當前累計 uv(從0點開始到當前行為止的獨立使用者數)。 uv 的統計我們通過內建的COUNT(DISTINCT user_id)來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。
CREATE VIEW uv_per_10min AS SELECT MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, COUNT(DISTINCT user_id) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
這裡我們使用SUBSTRDATE_FORMAT還有||內建函式,將一個 TIMESTAMP 欄位轉換成了 10分鐘單位的時間字串,如:12:10,12:20。關於 OVER WINDOW 的更多內容可以參考文件:
我們還使用了 CREATE VIEW 語法將 query 註冊成了一個邏輯檢視,可以方便地在後續查詢中對該 query 進行引用,這有利於拆解複雜 query。注意,建立邏輯檢視不會觸發作業的執行,檢視的結果也不會落地,因此使用起來非常輕量,沒有額外開銷。由於uv_per_10min每條輸入資料都產生一條輸出資料,因此對於儲存壓力較大。我們可以基於uv_per_10min再根據分鐘時間進行一次聚合,這樣每10分鐘只有一個點會儲存在 Elasticsearch 中,對於 Elasticsearch 和 Kibana 視覺化渲染的壓力會小很多。
INSERT INTO cumulative_uv SELECT time_str, MAX(uv) FROM uv_per_10min GROUP BY time_str;
提交上述查詢後,在 Kibana 中建立cumulative_uv的 index pattern,然後在 Dashboard 中建立一個"Line"折線圖,選擇cumulative_uv索引,按照如下截圖中的配置(左側)畫出累計獨立使用者數曲線,並儲存。


頂級類目排行榜
最後一個有意思的視覺化是類目排行榜,從而瞭解哪些類目是支柱類目。不過由於源資料中的類目分類太細(約5000個類目),對於排行榜意義不大,因此我們希望能將其歸約到頂級類目。所以筆者在 mysql 容器中預先準備了子類目與頂級類目的對映資料,用作維表。
在 SQL CLI 中建立 MySQL 表,後續用作維表查詢。
CREATE TABLE category_dim ( sub_category_id BIGINT,-- 子類目parent_category_id BIGINT-- 頂級類目) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink', 'connector.table' = 'category', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '123456', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
同時我們再建立一個 Elasticsearch 表,用於儲存類目統計結果。
CREATE TABLE top_category ( category_name STRING,-- 類目名稱buy_cnt BIGINT-- 銷量) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'top_category', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢註冊成一個檢視,簡化邏輯。維表關聯使用 temporal join 語法,可以檢視文件瞭解更多:
CREATE VIEW rich_user_behavior AS SELECT U.user_id, U.item_id, U.behavior, CASE C.parent_category_id WHEN 1 THEN '服飾鞋包' WHEN 2 THEN '家裝家飾' WHEN 3 THEN '家電' WHEN 4 THEN '美妝' WHEN 5 THEN '母嬰' WHEN 6 THEN '3C數碼' WHEN 7 THEN '運動戶外' WHEN 8 THEN '食品' ELSE '其他' END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.sub_category_id;
最後根據 類目名稱分組,統計出buy的事件數,並寫入 Elasticsearch 中。
INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;
提交上述查詢後,在 Kibana 中建立top_category的 index pattern,然後在 Dashboard 中建立一個"Horizontal Bar"條形圖,選擇top_category索引,按照如下截圖中的配置(左側)畫出類目排行榜,並儲存。


可以看到服飾鞋包的成交量遠遠領先其他類目。
到目前為止,我們已經完成了三個實戰案例及其視覺化檢視。現在可以回到 Dashboard 頁面,對各個檢視進行拖拽編排,讓我們的 Dashboard 看上去更加正式、直觀(如本文開篇效果圖)。當然,Kibana 還提供了非常豐富的圖形和視覺化選項,而使用者行為資料中也有很多有意思的資訊值得挖掘,感興趣的讀者可以用 Flink SQL 對資料進行更多維度的分析,並使用 Kibana 展示更多視覺化圖,並觀測圖形資料的實時變化。
結尾
在本文中,我們展示瞭如何使用 Flink SQL 整合 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 程式碼,使用 SQL 純文字即可完成。期望通過本文,可以讓讀者瞭解到 Flink SQL 的易用和強大,包括輕鬆連線各種外部系統、對事件時間和亂序資料處理的原生支援、維表關聯、豐富的內建函式等等。希望你能喜歡我們的實戰演練,並從中獲得樂趣和知識!

SQL 訊息中介軟體 資料視覺化 關係型資料庫 MySQL Kafka 流計算 Docker 索引 容器