1. 程式人生 > 實用技巧 >Flink實戰(一百一):flink-sql使用(十七)connector(十五)ES的結合使用

Flink實戰(一百一):flink-sql使用(十七)connector(十五)ES的結合使用

SQL 構建一個端到端的流式應用(版本1.11)

1 環境準備

建立一個目錄,並切換到改目錄

[root@bigdata1 ~]# mkdir -p /opt/module/flink-sql/
[root@bigdata1 ~]# cd /opt/module/flink-sql/

從githup上下載docker-compose.yml 檔案到次目錄https://github.com/wuchong/flink-sql-demo/blob/v1.11-EN/docker-compose.yml

該 Docker Compose 中包含的容器有:

  • Flink SQL Client: 用於提交 Flink SQL
  • Flink叢集: 包含一個 JobManager 和 一個 TaskManager 用於執行 SQL 任務。
  • DataGen:資料生成器。容器啟動後會自動開始生成使用者行為資料,併發送到 Kafka 叢集中。預設每秒生成 2000 條資料,能持續生成一個多小時。也可以更改docker-compose.yml中 datagen 的speedup引數來調整生成速率(重啟 docker compose 才能生效)。
  • MySQL:集成了 MySQL 5.7 ,以及預先建立好了類目表(category),預先填入了子類目與頂級類目的對映關係,後續作為維表使用。
  • Kafka:主要用作資料來源。DataGen 元件會自動將資料灌入這個容器中。
  • Zookeeper:Kafka 容器依賴。
  • Elasticsearch:主要儲存 Flink SQL 產出的資料。
  • Kibana:視覺化 Elasticsearch 中的資料。

安裝Docker Compose

執行以下命令以下載Docker Compose的當前穩定版本:

[root@bigdata1 ~]# sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

將可執行許可權應用於二進位制檔案:

[root@bigdata1 ~]# sudo chmod +x /usr/local/bin/docker-compose

測試安裝

[root@bigdata1 ~]# docker-compose  --version
[81516] Cannot open self /usr/local/bin/docker-compose or archive /usr/local/bin/docker-compose.pkg

切換到 /usr/local/bin/目錄下檢視docker-compose檔案,發現只有1.8M

[root@bigdata1 ~]# cd /usr/local/bin/
[root@bigdata1 bin]# ll
總用量 1752
-rwxr-xr-x. 1 root root 1792577 10月 21 01:29 docker-compose
[root@bigdata1 bin]# du -sh docker-compose 
1.8M    docker-compose

估計是因為wget下載是網路不好,沒有完整下載,此時需要去github上手動下載檔案https://github.com/docker/compose/releases

將檔案上傳到/usr/local/bin/ 目錄下,重新命名為docker-compose,修改檔案許可權:

[root@bigdata1 ~]# cd /usr/local/bin/
[root@bigdata1 bin]# mv docker-compose-Linux-x86_64 docker-compose
mv:是否覆蓋"docker-compose"? y
[root@bigdata1 ~]# sudo chmod +x /usr/local/bin/docker-compose

測試安裝

[root@bigdata1 ~]# docker-compose --version
docker-compose version 1.25.0-rc4, build 8f3c9c58

githup上的居然是1.25.0版本的,不是最新的fuck

啟動docker

[root@bigdata1 ~]# sudo systemctl start docker

切換到/opt/module/flink-sql/ 啟動容器docker-compose

[root@bigdata1 ~]# cd /opt/module/flink-sql/
[root@bigdata1 flink-sql]# docker-compose up -d
Creating network "flink-sql_default" with the default driver
Pulling jobmanager (flink:1.11.0-scala_2.11)...

docker ps 檢視五個容器是否正常啟動

[root@bigdata1 flink-sql]# docker ps
CONTAINER ID        IMAGE                                                 COMMAND                  CREATED             STATUS              PORTS                                                NAMES
c2fc3162f707        jark/demo-sql-client:0.2                              "/docker-entrypoint.…"   12 seconds ago      Up 10 seconds       6123/tcp, 8081/tcp                                   flink-sql_sql-client_1
f3c79ec39173        jark/datagen:0.2                                      "/usr/local/bin/mvn-…"   12 seconds ago      Up 11 seconds                                                            flink-sql_datagen_1
1ca8862da311        wurstmeister/kafka:2.12-2.2.1                         "start-kafka.sh"         13 seconds ago      Up 11 seconds       0.0.0.0:9092->9092/tcp, 0.0.0.0:9094->9094/tcp       flink-sql_kafka_1
648381b17e15        flink:1.11.0-scala_2.11                               "/docker-entrypoint.…"   13 seconds ago      Up 12 seconds       6123/tcp, 8081/tcp                                   flink-sql_taskmanager_1
bfdff79acdaa        wurstmeister/zookeeper:3.4.6                          "/bin/sh -c '/usr/sb…"   14 seconds ago      Up 12 seconds       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   flink-sql_zookeeper_1
5cca44730188        docker.elastic.co/elasticsearch/elasticsearch:7.6.0   "/usr/local/bin/dock…"   14 seconds ago      Up 12 seconds       0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp       flink-sql_elasticsearch_1
1abf608ff0ed        flink:1.11.0-scala_2.11                               "/docker-entrypoint.…"   14 seconds ago      Up 12 seconds       6123/tcp, 0.0.0.0:8081->8081/tcp                     flink-sql_jobmanager_1
28ae43f07570        docker.elastic.co/kibana/kibana:7.6.0                 "/usr/local/bin/dumb…"   14 seconds ago      Up 12 seconds       0.0.0.0:5601->5601/tcp                               flink-sql_kibana_1
7349b947aab9        jark/mysql-example:0.2                                "docker-entrypoint.s…"   14 seconds ago      Up 13 seconds       0.0.0.0:3306->3306/tcp, 33060/tcp                    flink-sql_mysql_1

訪問http://localhost:5601/來檢視 Kibana 是否執行正常。

另外可以通過如下命令停止所有的容器:

[root@bigdata1 flink-sql]# docker-compose down

進入 SQL CLI 客戶端

[root@bigdata1 flink-sql]# docker-compose exec sql-client ./sql-client.sh

2 使用 DDL 建立 Kafka 表

Datagen 容器在啟動後會往 Kafka 的user_behaviortopic 中持續不斷地寫入資料。資料包含了2017年11月27日一天的使用者行為(行為包括點選、購買、加購、喜歡),每一行表示一條使用者行為,以 JSON 的格式由使用者ID、商品ID、商品類目ID、行為型別和時間組成。該原始資料集來自阿里雲天池公開資料集,特此鳴謝。

我們可以在docker-compose.yml所在目錄下執行如下命令,檢視 Kafka 叢集中生成的前5條資料。

[root@bigdata1 flink-sql]# docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 5'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27 00:00:00"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27 00:00:00"}
{"user_id": "875914", "item_id":"4484065", "category_id": "1320293", "behavior": "pv", "ts": "2017-11-27 00:00:00"}
{"user_id": "980877", "item_id":"5097906", "category_id": "149192", "behavior": "pv", "ts": "2017-11-27 00:00:00"}
{"user_id": "944074", "item_id":"2348702", "category_id": "3002561", "behavior": "pv", "ts": "2017-11-27 00:00:00"}
Processed a total of 5 messages

有了資料來源後,我們就可以用 DDL 去建立並連線這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。

CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime AS PROCTIME(),   -- generates processing-time attribute using computed column
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- defines watermark on ts column, marks ts as event-time attribute
) WITH (
    'connector' = 'kafka',  -- using kafka connector
    'topic' = 'user_behavior',  -- kafka topic
    'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning
    'properties.bootstrap.servers' = 'kafka:9094',  -- kafka broker address
    'format' = 'json'  -- the data format is json
);

如上我們按照資料的格式聲明瞭 5 個欄位,除此之外,我們還通過計算列語法和PROCTIME()內建函式聲明瞭一個產生處理時間的虛擬列。我們還通過 WATERMARK 語法,在 ts 欄位上聲明瞭 watermark 策略(容忍5秒亂序), ts 欄位因此也成了事件時間列。關於時間屬性以及 DDL 語法可以閱讀官方文件瞭解更多:

在 SQL CLI 中成功建立 Kafka 表後,可以通過show tables;describe user_behavior;來檢視目前已註冊的表,以及表的詳細資訊。我們也可以直接在 SQL CLI 中執行SELECT * FROM user_behavior;預覽下資料(按q退出)。

接下來,我們會通過三個實戰場景來更深入地瞭解 Flink SQL 。

3 統計每小時的成交量

3.1 使用 DDL 建立 Elasticsearch 表

我們先在 SQL CLI 中建立一個 ES 結果表,根據場景需求主要需要儲存兩個資料:小時、成交量。

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector' = 'elasticsearch-7', -- using elasticsearch connector
    'hosts' = 'http://elasticsearch:9200',  -- elasticsearch address
    'index' = 'buy_cnt_per_hour'  -- elasticsearch index name, similar to database table name
);
我們不需要在 Elasticsearch 中事先建立buy_cnt_per_hour索引,Flink Job 會自動建立該索引。

3.2 提交 Query

統計每小時的成交量就是每小時共有多少 "buy" 的使用者行為。因此會需要用到 TUMBLE 視窗函式,按照一小時切窗。然後每個視窗分別統計 "buy" 的個數,這可以通過先過濾出 "buy" 的資料,然後COUNT(*)實現。
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
這裡我們使用HOUR內建函式,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了INSERT INTO將 query 的結果持續不斷地插入到上文定義的 es 結果表中
(可以將 es 結果表理解成 query 的物化檢視)。
另外可以閱讀該文件瞭解更多關於視窗聚合的內容:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#group-windows

在 Flink SQL CLI 中執行上述查詢後,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,因此會一直執行。

可以看到凌晨是一天中成交量的低谷。

3.3 使用 Kibana 視覺化結果

我們已經通過 Docker Compose 啟動了 Kibana 容器,可以通過http://localhost:5601訪問 Kibana。首先我們需要先配置一個 index pattern。點選左側工具欄的 "Management",就能找到 "Index Patterns"。點選 "Create Index Pattern",然後通過輸入完整的索引名 "buy_cnt_per_hour" 建立 index pattern。建立完成後, Kibana 就知道了我們的索引,我們就可以開始探索資料了。

先點選左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛建立的索引中的內容。

接下來,我們先建立一個 Dashboard 用來展示各個視覺化的檢視。點選頁面左側的"Dashboard",建立一個名為 ”使用者行為日誌分析“ 的Dashboard。然後點選 "Create New" 建立一個新的檢視,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照如下截圖中的配置(左側)畫出成交量面積圖,並儲存為”每小時成交量“。

4 統計一天每10分鐘累計獨立使用者數

另一個有意思的視覺化是統計一天中每一刻的累計獨立使用者數(uv),也就是每一刻的 uv 數都代表從0點到當前時刻為止的總計 uv 數,因此該曲線肯定是單調遞增的。

我們仍然先在 SQL CLI 中建立一個 Elasticsearch 表,用於儲存結果彙總資料。主要欄位有:日期時間和累積 uv 數。我們將日期時間作為 Elasticsearch 中的 document id,便於更新該日期時間的 uv 值。

CREATE TABLE cumulative_uv (
    date_str STRING,
    time_str STRING,
    uv BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'cumulative_uv'
);

為了實現該曲線,我們先抽取出日期和時間欄位,我們使用DATE_FORMAT抽取出基本的日期與時間,再用SUBSTR和 字串連線函式||將時間修正到10分鐘級別,如:12:10,12:20。其次,我們在外層查詢上基於日期分組,求當前最大的時間,和 UV,寫入到 Elasticsearch 的索引中。UV 的統計我們通過內建的COUNT(DISTINCT user_id)來完成,Flink SQL 內部對 COUNT DISTINCT 做了非常多的優化,因此可以放心使用。

這裡之所以需要求最大的時間,同時又按日期+時間作為主鍵寫入到 Elasticsearch,是因為我們在計算累積 UV 數。

INSERT INTO cumulative_uv
SELECT date_str, MAX(time_str), COUNT(DISTINCT user_id) as uv
FROM (
  SELECT
    DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str,
    SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0' as time_str,
    user_id
  FROM user_behavior)
GROUP BY date_str;

提交上述查詢後,在 Kibana 中建立cumulative_uv的 index pattern,然後在 Dashboard 中建立一個"Line"折線圖,選擇cumulative_uv索引,按照如下截圖中的配置(左側)畫出累計獨立使用者數曲線,並儲存。

5 頂級類目排行榜

最後一個有意思的視覺化是類目排行榜,從而瞭解哪些類目是支柱類目。不過由於源資料中的類目分類太細(約5000個類目),對於排行榜意義不大,因此我們希望能將其歸約到頂級類目。所以筆者在 mysql 容器中預先準備了子類目與頂級類目的對映資料,用作維表。

在 SQL CLI 中建立 MySQL 表,後續用作維表查詢。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/flink',
    'table-name' = 'category',
    'username' = 'root',
    'password' = '123456',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '10min'
);
同時我們再建立一個 Elasticsearch 表,用於儲存類目統計結果。
CREATE TABLE top_category (
    category_name STRING PRIMARY KEY NOT ENFORCED,
    buy_cnt BIGINT
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://elasticsearch:9200',
    'index' = 'top_category'
);
第一步我們通過維表關聯,補全類目名稱。我們仍然使用 CREATE VIEW 將該查詢註冊成一個檢視,簡化邏輯。維表關聯使用 temporal join 語法,
可以檢視文件瞭解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-tabl
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
最後根據 類目名稱分組,統計出buy的事件數,並寫入 Elasticsearch 中。
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
提交上述查詢後,在 Kibana 中建立top_category的 index pattern,然後在 Dashboard 中建立一個"Horizontal Bar"條形圖,
選擇top_category索引,按照如下截圖中的配置(左側)畫出類目排行榜,並儲存。

可以看到服飾鞋包的成交量遠遠領先其他類目。

Kibana 還提供了非常豐富的圖形和視覺化選項,感興趣的使用者可以用 Flink SQL 對資料進行更多維度的分析,並使用 Kibana 展示出視覺化圖,並觀測圖形資料的實時變化。

6 結尾

在本文中,我們展示瞭如何使用 Flink SQL 整合 Kafka, MySQL, Elasticsearch 以及 Kibana 來快速搭建一個實時分析應用。整個過程無需一行 Java/Scala 程式碼,使用 SQL 純文字即可完成。期望通過本文,可以讓讀者瞭解到 Flink SQL 的易用和強大,包括輕鬆連線各種外部系統、對事件時間和亂序資料處理的原生支援、維表關聯、豐富的內建函式等等。希望你能喜歡我們的實戰演練,並從中獲得樂趣和知識!

本文章轉自:https://github.com/wuchong/flink-sql-demo#%E5%87%86%E5%A4%87