Flink系列之1.10版流式SQL應用
隨著Flink 1.10的釋出,對SQL的支援也非常強大。Flink 還提供了 MySql, Hive,ES, Kafka等聯結器Connector,所以使用起來非常方便。
接下來咱們針對構建流式SQL應用文章的梗概如下:
1. 搭建流式SQL應用所需要的環境準備。
2. 構建一個按每小時進行統計購買量的應用。
3. 構建每天以10分鐘的粒度進行統計應用。
4. 構建按分類進行排行,取出想要的結果應用。
1. 搭建流式應用所需要的環境準備
Kafka,用於將資料寫入到Kafka中,然後Flink通過讀取Kafka的資料然後再進行處理。版本號:2.11。
MySQL, 用於儲存資料的分類。Flink從中讀取分類進行處理和計算 。版本號:8.0.15。
ElasticSearch, 用於儲存結果資料和進行索引儲存。下載的時候可以在搜尋引擎裡邊搜尋“elasticsearch 國內”,這樣就可以從國內快速下載,要不然下載的太慢了。版本號:7.6.0。
Kibana, 用於ES的結果展示,圖形化的介面美觀。 下載的時候也需要搜尋“Kibana 國內”,比較快速。版本號:7.6.0。
Flink, 核心的流處理程式,版本號:1.10。Flink支援國內映象下載,這個到時候可以自行找一下。
Zookeeper, Kafka依賴這個應用,所以也會用到的,這個什麼版本都是可以的。我的版本號:3.4.12。
當然我的是mac電腦,如果是mac電腦的話,下載ES和Kibana的時候要下載檔案中帶“darwin”字樣的,可以在Mac中使用其他的不能執行。應該是程式裡邊的編譯不同,這個也是一個小坑。
因為Flink需要連線Mysql, Elasticseratch , Kafka,所以也需要提前下載Flink所需要的Connector jar包到Flink的lib裡邊。
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.10.0/flink-json-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch6_2.11/1.10.0/flink-sql-connector-elasticsearch6_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
環境都準備好了,那麼需要把環境都啟動起來,進行檢查。
Elasticsearch啟動好了之後需要訪問這個網址沒有問題,說明成功了:http://localhost:9200/_cluster/health?pretty。
Flink啟動好之後需要訪問 http://localhost:8081 會有介面展示。
Kibana 啟動好了之後訪問:http://127.0.0.1:5601/ 會有介面展示。當然Kibana在目錄conf/kibana.yml裡邊需要把ES的地址給開啟。
Zookeeper 這個相信很多同學都會配置了,如果有不會配置的,可以自己搜尋一下。
我們先看一下最後的效果圖,可能不是特別好,是這麼個意思。
2. 構建一個按每個小時統計購買量應用。
我們寫一個程式,往Kafka裡邊寫資料,模擬一些連續的資料來源頭。
首先定義一個Pojo類。
package myflink.pojo; public class UserBehavior { //使用者ID public long userId; //商品ID public long itemId; //商品類目ID public int categoryId; //使用者行為,包括{"pv","buy","cart", "fav"} public String behavior; //行為發生的時間戳,單位秒 public String ts; public long getUserId() { return userId; } public void setUserId(long userId) { this.userId = userId; } public long getItemId() { return itemId; } public void setItemId(long itemId) { this.itemId = itemId; } public int getCategoryId() { return categoryId; } public void setCategoryId(int categoryId) { this.categoryId = categoryId; } public String getBehavior() { return behavior; } public void setBehavior(String behavior) { this.behavior = behavior; } public String getTimestamp() { return ts; } public void setTimestamp(String ts) { this.ts = ts; } }
接著寫一個往Kafka寫資料的類。隨機生成用於的行為,裡邊包括使用者的id,類目id等。讓程式執行起來。
package myflink.kafka; import com.alibaba.fastjson.JSON; import myflink.pojo.UserBehavior; import org.apache.commons.lang3.RandomUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2020-03-15 */ public class KafkaWriter { //本地的kafka機器列表 public static final String BROKER_LIST = "localhost:9092"; //kafka的topic public static final String TOPIC_USER_BEHAVIOR = "user_behaviors"; //key序列化的方式,採用字串的形式 public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; //value的序列化的方式 public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer"; private static final String[] BEHAVIORS = {"pv","buy","cart", "fav"}; private static KafkaProducer<String, String> producer; public static void writeToKafka() throws Exception{ //構建userBehavior, 資料都是隨機產生的 int randomInt = RandomUtils.nextInt(0, 4); UserBehavior userBehavior = new UserBehavior(); userBehavior.setBehavior(BEHAVIORS[randomInt]); Long ranUserId = RandomUtils.nextLong(1, 10000); userBehavior.setUserId(ranUserId); int ranCate = RandomUtils.nextInt(1, 100); userBehavior.setCategoryId(ranCate); Long ranItemId = RandomUtils.nextLong(1, 100000); userBehavior.setItemId(ranItemId); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); userBehavior.setTimestamp(sdf.format(new Date())); //轉換為json String userBehaviorStr = JSON.toJSONString(userBehavior); //包裝成kafka傳送的記錄 ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC_USER_BEHAVIOR, null, null, userBehaviorStr); //傳送到快取 producer.send(record); System.out.println("向kafka傳送資料:" + userBehaviorStr); //立即傳送 producer.flush(); } public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", BROKER_LIST); props.put("key.serializer", KEY_SERIALIZER); props.put("value.serializer", VALUE_SERIALIZER); producer = new KafkaProducer<>(props); while(true) { try { //每一秒寫一條資料 TimeUnit.SECONDS.sleep(1); writeToKafka(); } catch (Exception e) { e.printStackTrace(); } } } }
本地idea Console 輸出的結果是這樣的:
向kafka傳送資料:{"behavior":"buy","categoryId":7,"itemId":75902,"timestamp":"2020-03-15T11:35:11Z","ts":"2020-03-15T11:35:11Z","userId":4737}
我們將Flink的任務數調整成10個,也就是同時執行的任務數。 位置在 conf/flink-conf.yaml,taskmanager.numberOfTaskSlots: 10,然後重啟下。我的已經啟動並且運行了3個任務,看下圖:
我們接下來執行Flink 內建的客戶端。命令: bin/sql-client.sh embedded,這樣我們就開始了Flink SQL之旅了。我們使用Flink的DDL,從Kafka裡邊讀取資料,採用ProcessingTime的時間事件進行處理,為ts設定水位線,允許5秒延遲。更多參考 時間屬性 和 Flink DDL。裡邊的Kafka 連線以及相關的配置,相信大家都不是很陌生。
CREATE TABLE user_behavior ( userId BIGINT, itemId BIGINT, categoryId BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 通過計算列產生一個處理時間列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成為事件時間列 ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支援 0.11 以上的版本 'connector.topic' = 'user_behaviors', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json' -- 資料來源格式為 json );
接下來我們使用select來看一下Flink的資料,執行語句:select * from user_behavior,會出現如下圖。同時SQL上面還支援 show tables、describe user_behavior 等操作。
我們需要將結果放入Elasticsearch,這樣也比較簡單,我們還通過DDL來建立一個表。我們只需要一個語句,就可以實現連線Elasticsearch(後邊簡稱ES)並且建立相應的Type和Index了。不需要自己再去建立一次,是不是很簡單,哈。裡邊有兩個欄位,一個是每天的小時數,一個是購買的統計量。當有資料寫入這個表的時候,那麼就會將資料寫入到ES上,非常方便。
CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector 'connector.version' = '6', -- elasticsearch 版本,6 能支援 es 6+ 以及 7+ 的版本 'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址 'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相當於資料庫的表名 'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相當於資料庫的庫名 'connector.bulk-flush.max-actions' = '1', -- 每條資料都重新整理 'format.type' = 'json', -- 輸出資料格式 json 'update-mode' = 'append' );
每個小時的購買量,那麼我們需要的是使用滾動視窗,Tumbling Window,那麼使用TUMBLE_START函式,另外我們還需要獲取ts中的小時數,那麼需要使用HOUR函式。將所有behavior為buy的寫入到這個表中。
INSERT INTO buy_cnt_per_hour SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*) FROM user_behavior WHERE behavior = 'buy' GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
這個時候看Flink裡邊的任務中會出現這個任務,因為是持續不斷的進行處理的。執行過程中如果有資料的話,那麼會將資料寫到表 buy_cnt_per_hour,同時也會將資料寫到ES裡邊。
下面我們來配置一下Kinbana來將結果進行展示,訪問 http://localhost:5601, 然後選擇左邊選單的“Management”,然後選擇 “Index Patterns” -> “Create Index Pattern”, 輸入我們剛才建立的Index: “buy_cnt_per_hour”。可以通過左側的“Discover”按鈕就可以看到我們的資料了。
我們繼續點選左側的“Dashboard”按鈕,建立一個“使用者行為日誌分析”的Dashboard。 進入左側的 “Visualize” - “Create Visualization" 選擇“Area”圖,Bucket的按我下邊截圖左下進行配置和選擇。
儲存後新增到Dashboard即可。這樣就從資料來源頭到資料展示就構建完成了,是不是很快~
3. 構建每天以10分鐘的粒度進行統計獨立使用者數應用。
我們繼續使用DDL建立Flink的表以及對應的ES的Index。
CREATE TABLE cumulative_uv ( time_str STRING, uv BIGINT ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'cumulative_uv', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
建立好了需要將ts進行分解出來小時和分鐘,通過一個檢視,這個檢視和資料庫的檢視類似,不儲存資料,也不佔用Flink的執行Task。首先將ts格式化,然後轉換成時間:小時:分鐘,分鐘後邊沒有0,結尾需要補個0。然後統計不同的使用者數需要使用DISTINCT函式和COUNT函式。還有使用Over Window功能,也就是從之前的資料到現在,以處理時間升序把資料按Window的功能來進行統計。直白的將就是有一條資料的話就會將資料處理, 然後有一條資料比當前最大值大的話會保留最大值。當前視窗是以每10分鐘為一個視窗。
CREATE VIEW uv_per_10min AS SELECT MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str, COUNT(DISTINCT userId) OVER w AS uv FROM user_behavior WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
這個檢視主要是資料比較多,只需要每10分鐘一個點其實就滿足要求了,那麼現在我們需要做的就是再將資料處理一下即可寫入ES。
INSERT INTO cumulative_uv SELECT time_str, MAX(uv) FROM uv_per_10min GROUP BY time_str;
這樣ES裡邊就會有新的index產生,下一步我們在kibana裡邊建立一個 index pattern, 輸入index “cumulative_uv”,接下來到 “Visualize”裡邊建立一個 Visualization ,名為“累計獨立使用者數”,表選擇“Line”型別的圖示,其他指標和我下圖配置的一樣即可。
累計獨立使用者數也建立好了。
4. 構建按分類進行排行,取出想要的結果應用。
接下來我們需要按主類目進行統計和排序。因為子類目非常多。
首先我們需要準備一個mysql, 然後建立好表。簡單些幾條對應的類目關係,當然可以根據自己所生成的資料進行自行寫入一些對應的關係表。
create table category ( sub_category_id bigint(20), parent_category_id bigint(20) );
insert into category(sub_category_id, parent_category_id) values(1038, 1);
insert into category(sub_category_id, parent_category_id) values(91244, 1);
insert into category(sub_category_id, parent_category_id) values(44712, 1);
insert into category(sub_category_id, parent_category_id) values(2,2);
insert into category(sub_category_id, parent_category_id) values(3,3);
insert into category(sub_category_id, parent_category_id) values(4,4);
insert into category(sub_category_id, parent_category_id) values(5,5);
insert into category(sub_category_id, parent_category_id) values(6,6);
insert into category(sub_category_id, parent_category_id) values(7,7);
insert into category(sub_category_id, parent_category_id) values(8,8);
insert into category(sub_category_id, parent_category_id) values(9,9);
定義一個Flink表,資料從Mysql獲取,用於進行類目關係關聯。
CREATE TABLE category_dim ( sub_category_id BIGINT, -- 子類目 parent_category_id BIGINT -- 頂級類目 ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink', 'connector.table' = 'category', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = 'root', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min' );
建立ES的index,用於儲存統計後的結果。
CREATE TABLE top_category ( category_name STRING, -- 類目名稱 buy_cnt BIGINT -- 銷量 ) WITH ( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'top_category', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );
接下來還是建立一個檢視,將表和類目關聯起來,方便後邊的統計結果。使用的是 Temporal Join。
CREATE VIEW rich_user_behavior AS SELECT U.userId, U.itemId, U.behavior, CASE C.parent_category_id WHEN 1 THEN '服飾鞋包' WHEN 2 THEN '家裝家飾' WHEN 3 THEN '家電' WHEN 4 THEN '美妝' WHEN 5 THEN '母嬰' WHEN 6 THEN '3C數碼' WHEN 7 THEN '運動戶外' WHEN 8 THEN '食品' ELSE '其他' END AS category_name FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C ON U.categoryId = C.sub_category_id;
將型別為“buy”的寫入到表,同時也就是寫入了ES裡邊,然後ES裡邊的index-top_category 也就有了資料了。
INSERT INTO top_category SELECT category_name, COUNT(*) buy_cnt FROM rich_user_behavior WHERE behavior = 'buy' GROUP BY category_name;
我們繼續在Kibana裡邊建立一個index pattern,輸入“top_category”,然後visualize裡邊建立一個visualization 名為類目排行榜。詳細的配置可參考如下。
好了整個的過程計算建立完了。
通過使用Flink 1.10以及對應的Connector, 實現了對Mysql, Kafka, Elasticsearch 的快速連線,更快的達到的我們想要實現的效果。
裡邊涉及到往kafka裡邊寫資料可參考工程:https://github.com/stonehqs/flink-demo