1. 程式人生 > >Flink SQL Client綜合實戰

Flink SQL Client綜合實戰

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; 在[《Flink SQL Client初探》](https://blog.csdn.net/boling_cavalry/article/details/105964425)一文中,我們體驗了Flink SQL Client的基本功能,今天來通過實戰更深入學習和體驗Flink SQL; ### 實戰內容 本次實戰主要是通過Flink SQL Client消費kafka的實時訊息,再用各種SQL操作對資料進行查詢統計,內容彙總如下: 1. DDL建立Kafka表 2. 視窗統計; 3. 資料寫入ElasticSearch 4. 聯表操作 ### 版本資訊 1. Flink:1.10.0 2. Flink所在作業系統:CentOS Linux release 7.7.1908 3. JDK:1.8.0_211 4. Kafka:2.4.0(scala:2.12) 5. Mysql:5.7.29 ### 資料來源準備 1. 本次實戰用的資料,來源是阿里雲天池公開資料集的一份淘寶使用者行為資料集,獲取方式請參考[《準備資料集用於flink學習》](https://blog.csdn.net/boling_cavalry/article/details/106033059) 2. 獲取到資料集檔案後轉成kafka訊息發出,這樣我們使用Flink SQL時就按照實時消費kafka訊息的方式來操作,具體的操作方式請參考[《將CSV的資料傳送到kafka》](https://blog.csdn.net/boling_cavalry/article/details/106033472) 3. 上述操作完成後,一百零四萬條淘寶使用者行為資料就會通過kafka訊息順序發出,咱們的實戰就有不間斷實時資料可用 了,訊息內容如下: ```shell {"user_id":1004080,"item_id":2258662,"category_id":79451,"behavior":"pv","ts":"2017-11-24T23:47:47Z"} {"user_id":100814,"item_id":5071478,"category_id":1107469,"behavior":"pv","ts":"2017-11-24T23:47:47Z"} {"user_id":114321,"item_id":4306269,"category_id":4756105,"behavior":"pv","ts":"2017-11-24T23:47:48Z"} ``` 4. 上述訊息中每個欄位的含義如下表: | 列名稱 | 說明 | |--|--| | 使用者ID | 整數型別,序列化後的使用者ID | | 商品ID | 整數型別,序列化後的商品ID | | 商品類目ID | 整數型別,序列化後的商品所屬類目ID | | 行為型別 | 字串,列舉型別,包括('pv', 'buy', 'cart', 'fav') | | 時間戳 | 行為發生的時間戳 | | 時間字串 | 根據時間戳欄位生成的時間字串 | ### jar準備 實戰過程中要用到下面這五個jar檔案: 1. flink-jdbc_2.11-1.10.0.jar 2. flink-json-1.10.0.jar 3. flink-sql-connector-elasticsearch6_2.11-1.10.0.jar 4. flink-sql-connector-kafka_2.11-1.10.0.jar 5. mysql-connector-java-5.1.48.jar 我已將這些檔案打包上傳到GitHub,下載地址:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/sql_lib.zip 請在flink安裝目錄下新建資料夾sql_lib,然後將這五個jar檔案放進去; ### Elasticsearch準備 如果您裝了docker和docker-compose,那麼下面的命令可以快速部署elasticsearch和head工具: ```shell wget https://raw.githubusercontent.com/zq2599/blog_demos/master/elasticsearch_docker_compose/docker-compose.yml && \ docker-compose up -d ``` 準備完畢,開始操作吧; ### DDL建立Kafka表 1. 進入flink目錄,啟動flink:bin/start-cluster.sh 2. 啟動Flink SQL Client:bin/sql-client.sh embedded -l sql_lib 3. 啟動成功顯示如下: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104443254-205211446.png) 4. 執行以下命令即可建立kafka表,請按照自己的資訊調整引數: ```sql CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 處理時間列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成為事件時間列 ) WITH ( 'connector.type' = 'kafka', -- kafka connector 'connector.version' = 'universal', -- universal 支援 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取 'connector.properties.zookeeper.connect' = '192.168.50.43:2181', -- zk 地址 'connector.properties.bootstrap.servers' = '192.168.50.43:9092', -- broker 地址 'format.type' = 'json' -- 資料來源格式為 json ); ``` 5. 執行SELECT * FROM user_behavior;看看原始資料,如果訊息正常應該和下圖類似: ![6.](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104445130-1421286278.png) ### 視窗統計 1. 下面的SQL是以每十分鐘為視窗,統計每個視窗內的總瀏覽數,TUMBLE_START返回的資料格式是timestamp,這裡再呼叫DATE_FORMAT函式將其格式化成了字串: ```SQL SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), DATE_FORMAT(TUMBLE_END(ts, INTERVAL '10' MINUTE), 'yyyy-MM-dd hh:mm:ss'), COUNT(*) FROM user_behavior WHERE behavior = 'pv' GROUP BY TUMBLE(ts, INTERVAL '10' MINUTE); ``` 2. 得到資料如下所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104445805-2102101642.png) ### 資料寫入ElasticSearch 1. 確保elasticsearch已部署好; 2. 執行以下語句即可建立es表,請按照您自己的es資訊調整下面的引數: ```sql CREATE TABLE pv_per_minute ( start_time STRING, end_time STRING, pv_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 型別 'connector.version' = '6', -- elasticsearch版本 'connector.hosts' = 'http://192.168.133.173:9200', -- elasticsearch地址 'connector.index' = 'pv_per_minute', -- 索引名,相當於資料庫表名 'connector.document-type' = 'user_behavior', -- type,相當於資料庫庫名 'connector.bulk-flush.max-actions' = '1', -- 每條資料都重新整理 'format.type' = 'json', -- 輸出資料格式json 'update-mode' = 'append' ); ``` 3. 執行以下語句,就會將每分鐘的pv總數寫入es的pv_per_minute索引: ```sql INSERT INTO pv_per_minute SELECT DATE_FORMAT(TUMBLE_START(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS start_time, DATE_FORMAT(TUMBLE_END(ts, INTERVAL '1' MINUTE), 'yyyy-MM-dd hh:mm:ss') AS end_time, COUNT(*) AS pv_cnt FROM user_behavior WHERE behavior = 'pv' GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE); ``` 4. 用es-head檢視,發現數據已成功寫入: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104448259-774015497.png) ### 聯表操作 1. 當前user_behavior表的category_id表示商品類目,例如11120表示計算機書籍,61626表示牛仔褲,本次實戰的資料集中,這樣的類目共有五千多種; 2. 如果我們將這五千多種類目分成6個大類,例如11120屬於教育類,61626屬於服裝類,那麼應該有個大類和類目的關係表; 3. 這個大類和類目的關係表在MySQL建立,表名叫category_info,建表語句如下: ```sql CREATE TABLE `category_info`( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `parent_id` bigint , `category_id` bigint , PRIMARY KEY ( `id` ) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; ``` 4. 表category_info所有資料來自對原始資料中category_id欄位的提取,並且隨機將它們劃分為6個大類,該表的資料請在我的GitHub下載:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/category_info.sql 5. 請在MySQL上建表category_info,並將上述資料全部寫進去; 6. 在Flink SQL Client執行以下語句建立這個維表,mysql資訊請按您自己配置調整: ```sql CREATE TABLE category_info ( parent_id BIGINT, -- 商品大類 category_id BIGINT -- 商品詳細類目 ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://192.168.50.43:3306/flinkdemo', 'connector.table' = 'category_info', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '123456', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' ); ``` 7. 嘗試聯表查詢: ```sql SELECT U.user_id, U.item_id, U.behavior, C.parent_id, C.category_id FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id; ``` 8. 如下圖,聯表查詢成功,每條記錄都能對應大類: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104449302-949379370.png) 9. 再試試聯表統計,每個大類的總瀏覽量: ```sql SELECT C.parent_id, COUNT(*) AS pv_count FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id WHERE behavior = 'pv' GROUP BY C.parent_id; ``` 10. 如下圖,資料是動態更新的: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104449788-1130002892.png) 11. 執行以下語句,可以在統計時將大類ID轉成中文名: ```sql SELECT CASE C.parent_id WHEN 1 THEN '服飾鞋包' WHEN 2 THEN '家裝家飾' WHEN 3 THEN '家電' WHEN 4 THEN '美妝' WHEN 5 THEN '母嬰' WHEN 6 THEN '3C數碼' ELSE '其他' END AS category_name, COUNT(*) AS pv_count FROM user_behavior AS U LEFT JOIN category_info FOR SYSTEM_TIME AS OF U.proctime AS C ON U.category_id = C.category_id WHERE behavior = 'pv' GROUP BY C.parent_id; ``` 12. 效果如下圖: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201117104450081-536440859.png) 至此,我們藉助Flink SQL Client體驗了Flink SQL豐富的功能,如果您也在學習Flink SQL,希望本文能給您一些參考; ### 歡迎關注公眾號:程式設計師欣宸 > 微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界... [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blo