Flink實戰(111):flink-sql使用(十九)Flink 與 hive 結合使用(八)Hive Streaming 實戰解析
Flink 1.11 正式釋出已經三週了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久釋出了,所以就寫了一篇 Zeppelin 上的 Flink Hive Streaming 的實戰解析。本文主要從以下幾部分跟大家分享:
- Hive Streaming 的意義
- Checkpoint & Dependency
- 寫入 Kafka
- Hive Streaming Sink
- Hive Streaming Source
- Hive Temporal Table
1 Hive Streaming 的意義
很多同學可能會好奇,為什麼 Flink 1.11 中,Hive Streaming 的地位這麼高?它的出現,到底能給我們帶來什麼? 其實在大資料領域,一直存在兩種架構 Lambda
-
Lambda 架構——流批分離,靜態資料通過定時排程同步到 Hive 數倉,實時資料既會同步到 Hive,也會被實時計算引擎消費,這裡就引出了一點問題。
-
資料口徑問題
-
離線計算產出延時太大
-
資料冗餘儲存
-
Kappa架構——全部使用實時計算來產出資料,歷史資料通過回溯訊息的消費位點計算,同樣也有很多的問題,畢竟沒有一勞永逸的架構。
-
訊息中介軟體無法保留全部歷史資料,同樣資料都是行式儲存,佔用空間太大
-
實時計算計算曆史資料力不從心
-
無法進行 Ad-Hoc 的分析
為了解決這些問題,行業內推出了實時數倉,解決了大部分痛點,但是還是有些地方力不從心。比如涉及到歷史資料的計算怎麼辦?我想做 Ad-Hoc 的分析又怎麼玩?所以行業內現在都是實時數倉與離線數倉並行存在,而這又帶來了更多的問題:模型需要多份、資料產出不一致、歷史資料的計算等等 。
而 Hive Streaming 的出現就可以解決這些問題!再也不用多套模型了;也不需要同一個指標因為涉及到歷史資料,寫一遍實時 SQL 再寫一遍離線 SQL;Ad-Hoc 也能做了,怎麼做?讀 Hive Streaming 產出的表就行!
接下來,讓我們從引數配置開始,接著流式的寫入 Hive,再到流式的讀取 Hive 表,最後再 Join 上 Hive 維表吧。這一整套流程都體驗後,想必大家對 Hive Streaming 一定會有更深入的瞭解,更能夠體會到它的作用。
2 Checkpoint & Dependency
因為只有在完成 Checkpoint 之後,檔案才會從 In-progress 狀態變成 Finish 狀態
%flink.conf # checkpoint 配置 pipeline.time-characteristic EventTime execution.checkpointing.interval 120000 execution.checkpointing.min-pause 60000 execution.checkpointing.timeout 60000 execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION # 依賴jar包配置 flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0
又因為我們需要從 Kafka 中讀取資料,所以將 Kafka 的依賴也加入進去了。
3 寫入Kafka
我們的資料來自於天池資料集,是以 CSV 的格式存在於本地磁碟,所以需要先將他們寫入 Kafka。
先建一下 CSV Source 和 Kafka Sink 的表:
%flink.ssql SET table.sql-dialect=default; DROP TABLE IF EXISTS source_csv; CREATE TABLE source_csv ( user_id string, theme_id string, item_id string, leaf_cate_id string, cate_level1_id string, clk_cnt int, reach_time string ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv', 'format' = 'csv' )
%flink.ssql SET table.sql-dialect=default; DROP TABLE IF EXISTS kafka_table; CREATE TABLE kafka_table ( user_id string, theme_id string, item_id string, leaf_cate_id string, cate_level1_id string, clk_cnt int, reach_time string, ts AS localtimestamp, WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'theme_click_log', 'properties.bootstrap.servers' = '10.70.98.1:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' )
因為註冊的表即可以讀又可以寫,於是我在建表時將 Watermark 加上了;又因為源資料中的時間戳已經很老了,所以我這裡採用當前時間減去5秒作為我的 Watermark。
大家可以看到,我在語句一開始指定了 SQL 方言為 Default,這是為啥呢?還有別的方言嗎?別急,聽我慢慢說。
其實在之前的版本,Flink 就已經可以和 Hive 打通,包括可以把表建在 Hive 上,但是很多語法和 Hive 不相容,包括建的表在 Hive 中也無法檢視,主要原因就是方言不相容。所以,在 Flink 1.11 中,為了減少學習成本(語法不相容),可以用 DDL 建 Hive 表並在 Hive 中查詢,Flink 支援了方言,預設的就是 Default 了,就和之前一樣,如果想建 Hive 表,並支援查詢,請使用 Hive 方言,具體可以參考下方連結。
Hive 方言:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html
再把資料從 CSV 中讀取後寫入 Kafka。
%flink.ssql(type=update) insert into kafka_table select * from source_csv ;
再瞄一眼 Kafka,看看資料有沒有被灌進去:
看來沒問題,那麼接下來讓我們寫入 Hive。
4 Hive Streaming Sink
建一個Hive Sink Table,記得將方言切換到 Hive,否則會有問題。
%flink.ssql SET table.sql-dialect=hive; DROP TABLE IF EXISTS hive_table; CREATE TABLE hive_table ( user_id string, theme_id string, item_id string, leaf_cate_id string, cate_level1_id string, clk_cnt int, reach_time string ) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' );
引數給大家稍微解釋一下:
- partition.time-extractor.timestamp-pattern:分割槽時間抽取器,與 DDL 中的分割槽欄位保持一致;
- sink.partition-commit.trigger:分割槽觸發器型別,可選 process-time 或partition-time。process-time:不需要上面的引數,也不需要水印,噹噹前時間大於分割槽建立時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;partition-time:需要 Source 表中定義 watermark,當 watermark > 提取到的分割槽時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;
- sink.partition-commit.delay:相當於延時時間;
- sink.partition-commit.policy.kind:怎麼提交,一般提交成功之後,需要通知 metastore,這樣 Hive 才能讀到你最新分割槽的資料;如果需要合併小檔案,也可以自定義 Class,通過實現 PartitionCommitPolicy 介面。
%flink.ssql insert into hive_table
select
user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,
clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table
讓程式再跑一會兒~我們先去倒一杯 95 年的 Java☕️ 。
然後再看看我們的 HDFS,看看路徑下的東西。
大家也可以用 Hive 自行查詢看看,我呢就先賣個關子,一會兒用 Hive Streaming 來讀資料。
5 Hive Streaming Source
因為 Hive 表上面已經建立過了,所以這邊讀資料的時候直接拿來用就行了,不同的地方是需要使用 Table Hints 去覆蓋引數。
Hive Streaming Source 最大的不足是,無法讀取已經讀取過的分割槽下新增的檔案。簡單來說就是,讀過的分割槽,就不會再讀了。看似很坑,不過仔細想想,這樣才符合流的特性。
照舊給大家說一下引數的意思:
- stream-source.enable:顯而易見,表示是否開啟流模式。
- stream-source.monitor-interval:監控新檔案/分割槽產生的間隔。
- stream-source.consume-order:可以選 create-time 或者 partition-time;create-time 指的不是分割槽建立時間,而是在 HDFS 中檔案/資料夾的建立時間;partition-time 指的是分割槽的時間;對於非分割槽表,只能用 create-time。官網這邊的介紹寫的有點模糊,會讓人誤以為可以查到已經讀過的分割槽下新增的檔案,其實經過我的測試和翻看原始碼發現並不能。
- stream-source.consume-start-offset:表示從哪個分割槽開始讀。
光說不幹假把式,讓我們撈一把資料看看~
SET 那一行得帶著,不然無法使用 Table Hints。
6 Hive Temporal Table
看完了 Streaming Source 和 Streaming Sink,讓我們最後再試一下 Hive 作為維表吧。
其實用 Hive 維表很簡單,只要是在 Hive 中存在的表,都可以當做維表使用,引數完全可以用 Table Hints 來覆蓋。
- lookup.join.cache.ttl:表示快取時間;這裡值得注意的是,因為 Hive 維表會把維表所有資料快取在 TM 的記憶體中,如果維表量很大,那麼很容易就 OOM;如果 ttl 時間太短,那麼會頻繁的載入資料,效能會有很大影響。
因為是 LEFT JOIN,所以維表中不存在的資料會以 NULL 補全。再看一眼 DAG 圖:
大家看一下畫框的地方,能看到這邊是使用的維表關聯 LookupJoin。
如果大家 SQL 語句寫錯了,丟了 for system_time as of a.p,那麼 DAG 圖就會變成這樣:
這種就不是維表 JOIN 其實更像是流和批在 JOIN。
7 寫在最後
Hive Streaming 的完善意味著打通了流批一體的最後一道壁壘,既可以做到歷史資料的 OLAP 分析,又可以實時吐出結果,這無疑是 ETL 開發者的福音,想必接下來的日子,會有更多的企業完成他們實時數倉的建設。
作者:Flink中文社群
連結:https://www.jianshu.com/p/31bdacd841ae
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。