1. 程式人生 > 實用技巧 >Flink實戰(111):flink-sql使用(十九)Flink 與 hive 結合使用(八)Hive Streaming 實戰解析

Flink實戰(111):flink-sql使用(十九)Flink 與 hive 結合使用(八)Hive Streaming 實戰解析

Flink 1.11 正式釋出已經三週了,其中最吸引我的特性就是 Hive Streaming。正巧 Zeppelin-0.9-preview2 也在前不久釋出了,所以就寫了一篇 Zeppelin 上的 Flink Hive Streaming 的實戰解析。本文主要從以下幾部分跟大家分享:

  • Hive Streaming 的意義
  • Checkpoint & Dependency
  • 寫入 Kafka
  • Hive Streaming Sink
  • Hive Streaming Source
  • Hive Temporal Table

1 Hive Streaming 的意義

很多同學可能會好奇,為什麼 Flink 1.11 中,Hive Streaming 的地位這麼高?它的出現,到底能給我們帶來什麼? 其實在大資料領域,一直存在兩種架構 Lambda

Kappa:

  • Lambda 架構——流批分離,靜態資料通過定時排程同步到 Hive 數倉,實時資料既會同步到 Hive,也會被實時計算引擎消費,這裡就引出了一點問題。

  • 資料口徑問題

  • 離線計算產出延時太大

  • 資料冗餘儲存

  • Kappa架構——全部使用實時計算來產出資料,歷史資料通過回溯訊息的消費位點計算,同樣也有很多的問題,畢竟沒有一勞永逸的架構。

  • 訊息中介軟體無法保留全部歷史資料,同樣資料都是行式儲存,佔用空間太大

  • 實時計算計算曆史資料力不從心

  • 無法進行 Ad-Hoc 的分析

為了解決這些問題,行業內推出了實時數倉,解決了大部分痛點,但是還是有些地方力不從心。比如涉及到歷史資料的計算怎麼辦?我想做 Ad-Hoc 的分析又怎麼玩?所以行業內現在都是實時數倉與離線數倉並行存在,而這又帶來了更多的問題:模型需要多份、資料產出不一致、歷史資料的計算等等 。

而 Hive Streaming 的出現就可以解決這些問題!再也不用多套模型了;也不需要同一個指標因為涉及到歷史資料,寫一遍實時 SQL 再寫一遍離線 SQL;Ad-Hoc 也能做了,怎麼做?讀 Hive Streaming 產出的表就行!

接下來,讓我們從引數配置開始,接著流式的寫入 Hive,再到流式的讀取 Hive 表,最後再 Join 上 Hive 維表吧。這一整套流程都體驗後,想必大家對 Hive Streaming 一定會有更深入的瞭解,更能夠體會到它的作用。

2 Checkpoint & Dependency

因為只有在完成 Checkpoint 之後,檔案才會從 In-progress 狀態變成 Finish 狀態

,所以,我們需要合理的去配置 Checkpoint,在 Zeppelin 中配置 Checkpoint 很簡單。

%flink.conf

# checkpoint 配置

pipeline.time-characteristic EventTime
execution.checkpointing.interval 120000
execution.checkpointing.min-pause 60000
execution.checkpointing.timeout 60000
execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION

# 依賴jar包配置

flink.execution.packages org.apache.flink:flink-connector-kafka_2.11:1.11.0,org.apache.flink:flink-connector-kafka-base_2.11:1.11.0

又因為我們需要從 Kafka 中讀取資料,所以將 Kafka 的依賴也加入進去了。

3 寫入Kafka

我們的資料來自於天池資料集,是以 CSV 的格式存在於本地磁碟,所以需要先將他們寫入 Kafka。

先建一下 CSV Source 和 Kafka Sink 的表:

%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS source_csv;
CREATE TABLE source_csv (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) WITH (
 'connector' = 'filesystem',
 'path' = 'file:///Users/dijie/Downloads/Cloud_Theme_Click/theme_click_log.csv',
 'format' = 'csv'
 
 )
%flink.ssql
SET table.sql-dialect=default;
DROP TABLE IF EXISTS kafka_table;
CREATE TABLE kafka_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string,
ts AS localtimestamp,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'theme_click_log',
'properties.bootstrap.servers' = '10.70.98.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'

)

因為註冊的表即可以讀又可以寫,於是我在建表時將 Watermark 加上了;又因為源資料中的時間戳已經很老了,所以我這裡採用當前時間減去5秒作為我的 Watermark。

大家可以看到,我在語句一開始指定了 SQL 方言為 Default,這是為啥呢?還有別的方言嗎?別急,聽我慢慢說。

其實在之前的版本,Flink 就已經可以和 Hive 打通,包括可以把表建在 Hive 上,但是很多語法和 Hive 不相容,包括建的表在 Hive 中也無法檢視,主要原因就是方言不相容。所以,在 Flink 1.11 中,為了減少學習成本(語法不相容),可以用 DDL 建 Hive 表並在 Hive 中查詢,Flink 支援了方言,預設的就是 Default 了,就和之前一樣,如果想建 Hive 表,並支援查詢,請使用 Hive 方言,具體可以參考下方連結。

Hive 方言:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_catalog.html

再把資料從 CSV 中讀取後寫入 Kafka。

%flink.ssql(type=update)

insert into kafka_table select * from source_csv ;

再瞄一眼 Kafka,看看資料有沒有被灌進去:

看來沒問題,那麼接下來讓我們寫入 Hive。

4 Hive Streaming Sink

建一個Hive Sink Table,記得將方言切換到 Hive,否則會有問題。

%flink.ssql
SET table.sql-dialect=hive;
DROP TABLE IF EXISTS hive_table;
CREATE TABLE hive_table (
user_id string,
theme_id string,
item_id string,
leaf_cate_id string,
cate_level1_id string,
clk_cnt int,
reach_time string
) PARTITIONED BY (dt string, hr string, mi string) STORED AS parquet TBLPROPERTIES (

 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 min',
 'sink.partition-commit.policy.kind'='metastore,success-file'

);

引數給大家稍微解釋一下:

  • partition.time-extractor.timestamp-pattern:分割槽時間抽取器,與 DDL 中的分割槽欄位保持一致;
  • sink.partition-commit.trigger:分割槽觸發器型別,可選 process-time 或partition-time。process-time:不需要上面的引數,也不需要水印,噹噹前時間大於分割槽建立時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;partition-time:需要 Source 表中定義 watermark,當 watermark > 提取到的分割槽時間 +sink.partition-commit.delay 中定義的時間,提交分割槽;
  • sink.partition-commit.delay:相當於延時時間;
  • sink.partition-commit.policy.kind:怎麼提交,一般提交成功之後,需要通知 metastore,這樣 Hive 才能讀到你最新分割槽的資料;如果需要合併小檔案,也可以自定義 Class,通過實現 PartitionCommitPolicy 介面。
接下來讓我們把資料插入剛剛建立的 Hive Table:
%flink.ssql

insert into hive_table 
select
user_id,theme_id,item_id,leaf_cate_id,cate_level1_id,
clk_cnt,reach_time,DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH') ,DATE_FORMAT(ts, 'mm') from kafka_table

讓程式再跑一會兒~我們先去倒一杯 95 年的 Java☕️ 。

然後再看看我們的 HDFS,看看路徑下的東西。

大家也可以用 Hive 自行查詢看看,我呢就先賣個關子,一會兒用 Hive Streaming 來讀資料。

5 Hive Streaming Source

因為 Hive 表上面已經建立過了,所以這邊讀資料的時候直接拿來用就行了,不同的地方是需要使用 Table Hints 去覆蓋引數。

Hive Streaming Source 最大的不足是,無法讀取已經讀取過的分割槽下新增的檔案。簡單來說就是,讀過的分割槽,就不會再讀了。看似很坑,不過仔細想想,這樣才符合流的特性。

照舊給大家說一下引數的意思:

  • stream-source.enable:顯而易見,表示是否開啟流模式。
  • stream-source.monitor-interval:監控新檔案/分割槽產生的間隔。
  • stream-source.consume-order:可以選 create-time 或者 partition-time;create-time 指的不是分割槽建立時間,而是在 HDFS 中檔案/資料夾的建立時間;partition-time 指的是分割槽的時間;對於非分割槽表,只能用 create-time。官網這邊的介紹寫的有點模糊,會讓人誤以為可以查到已經讀過的分割槽下新增的檔案,其實經過我的測試和翻看原始碼發現並不能。
  • stream-source.consume-start-offset:表示從哪個分割槽開始讀。

光說不幹假把式,讓我們撈一把資料看看~

SET 那一行得帶著,不然無法使用 Table Hints。

6 Hive Temporal Table

看完了 Streaming Source 和 Streaming Sink,讓我們最後再試一下 Hive 作為維表吧。

其實用 Hive 維表很簡單,只要是在 Hive 中存在的表,都可以當做維表使用,引數完全可以用 Table Hints 來覆蓋。

  • lookup.join.cache.ttl:表示快取時間;這裡值得注意的是,因為 Hive 維表會把維表所有資料快取在 TM 的記憶體中,如果維表量很大,那麼很容易就 OOM;如果 ttl 時間太短,那麼會頻繁的載入資料,效能會有很大影響。

因為是 LEFT JOIN,所以維表中不存在的資料會以 NULL 補全。再看一眼 DAG 圖:

大家看一下畫框的地方,能看到這邊是使用的維表關聯 LookupJoin。

如果大家 SQL 語句寫錯了,丟了 for system_time as of a.p,那麼 DAG 圖就會變成這樣:

這種就不是維表 JOIN 其實更像是流和批在 JOIN。

7 寫在最後

Hive Streaming 的完善意味著打通了流批一體的最後一道壁壘,既可以做到歷史資料的 OLAP 分析,又可以實時吐出結果,這無疑是 ETL 開發者的福音,想必接下來的日子,會有更多的企業完成他們實時數倉的建設。


作者:Flink中文社群
連結:https://www.jianshu.com/p/31bdacd841ae
來源:簡書
著作權歸作者所有。商業轉載請聯絡作者獲得授權,非商業轉載請註明出處。