設定content-type
文章整理:加米穀大資料
有不少讀者反饋,參考上篇文章《Hive 終於等來了 Flink》部署 Flink 並整合 Hive 時,出現一些 bug 以及相容性等問題。雖已等來,卻未可用。所以筆者增加了這一篇文章,作為姊妹篇。
回顧
在上篇文章中,筆者使用的 CDH 版本為 5.16.2,其中 Hive 版本為 1.1.0(CDH 5.x 系列 Hive 版本都不高於 1.1.0,是不是不可理解),Flink 原始碼本身對 Hive 1.1.0 版本相容性不好,存在不少問題。為了相容目前版本,筆者基於 CDH 5.16.2 環境,對 Flink 程式碼進行了修改,重新打包並部署。
其實經過很多開源專案的實戰,比如 Apache Atlas,Apache Spark 等,Hive 1.2.x 和 Hive 1.1.x 在大部分情況下,替換一些 Jar 包,是可以解決相容性的問題。對於筆者的環境來說,可以使用 Hive 1.2.1 版本的一些 Jar 包來代替 Hive 1.1.0 版本的 Jar 包。在本篇文章的開始部分,筆者會解決這個問題,然後再補充上篇文章缺少的實戰內容。
剪不斷理還亂的問題
根據讀者的反饋,筆者將所有的問題總結為三類:
- Flink 如何連線 Hive 除了 API 外,有沒有類似 spark-sql 命令
- 識別不到 Hadoop 環境或配置檔案找不到
- 依賴包、類或方法找不到
1. Flink 如何連線 Hive
有的讀者不太清楚,如何配置 Flink 連線 Hive 的 Catalog,這裡補充一個完整的 conf/sql-client-hive.yaml 示例:
catalogs: - name: staginghive type: hive hive-conf-dir: /etc/hive/conf hive-version: 1.2.1 execution: planner: blink type: batch time-characteristic: event-time periodic-watermarks-interval: 200 result-mode: table max-table-result-rows: 1000000 parallelism: 1 max-parallelism: 128 min-idle-state-retention: 0 max-idle-state-retention: 0 current-catalog: staginghive current-database: ssb restart-strategy: type: fallback deployment: response-timeout: 5000 gateway-address: "" gateway-port: 0 m: yarn-cluster yn: 2 ys: 5 yjm: 1024 ytm: 2048
sql-client-hive.yaml 配置檔案裡面包含:
- Hive 配置檔案 catalogs 中配置了 Hive 的配置檔案路徑。
- Yarn 配置資訊 deployment 中配置了 Yarn 的配置資訊。
- 執行引擎資訊 execution 配置了 blink planner,並且使用 batch 模式。batch 模式比較穩定,適合傳統的批處理作業,而且可以容錯,另外中間資料落盤,建議開啟壓縮功能。除了 batch,Flink 也支援 streaming 模式。
■ Flink SQL CLI 工具
類似 spark-sql 命令,Flink 提供了 SQL CLI 工具,即 sql-client.sh 指令碼。在 Flink 1.10 版本中,Flink SQL CLI 改進了很多功能,筆者後面講解。
sql-client.sh 使用方式如下:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
2. 識別不到 Hadoop 環境或配置檔案找不到
筆者在上篇文章中提到過,在部署 Flink 的環境上部署 CDH gateway,包括 Hadoop、Hive 客戶端,另外還需要配置一些環境變數,如下:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HIVE_CONF_DIR=/etc/hive/conf
3. 依賴包、類或方法找不到
先檢視一下 Flink 家目錄下的 lib 目錄:
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.1.0-cdh5.16.2.jar
├── hive-metastore-1.1.0-cdh5.16.2.jar
├── libfb303-0.9.3.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
如果上面前兩個問題都解決後,執行如下命令:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
報錯,報錯,還是報錯:
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
其實在執行 sql-client.sh 指令碼前,需要指定 Hadoop 環境的依賴包的路徑,建議不要報錯一個新增一個,除非有的讀者喜歡。這裡筆者提示一個方便的方式,即設定 HADOOPCLASSPATH(可以新增到 ~/.bashprofile 中)環境變數:
export HADOOP_CLASSPATH=`hadoop classpath`
再次執行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
很抱歉,繼續報錯:
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context. at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:753) at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:228) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:98) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
這裡就是 Hive 1.1.0 版本的 Jar 包與 Flink 出現版本不相容性的問題了,解決方法是:
- 下載 apache-hive-1.2.1 版本
- 替換 Flink lib 目錄下的 Hive Jar 包 刪除掉 hive-exec-1.1.0-cdh5.16.2.jar、 hive-metastore-1.1.0-cdh5.16.2.jar 和 libfb303-0.9.3.jar,然後新增 hive-exec-1.2.1.jar、 hive-metastore-1.2.1.jar 和 libfb303-0.9.2.jar,再次檢視 lib 目錄:
$ tree lib
lib
├── flink-connector-hive_2.11-1.10.0.jar
├── flink-dist_2.11-1.10.0.jar
├── flink-hadoop-compatibility_2.11-1.10.0.jar
├── flink-shaded-hadoop-2-2.6.0-cdh5.16.2-9.0.jar
├── flink-table_2.11-1.10.0.jar
├── flink-table-blink_2.11-1.10.0.jar
├── hive-exec-1.2.1.jar
├── hive-metastore-1.2.1.jar
├── libfb303-0.9.2.jar
├── log4j-1.2.17.jar
└── slf4j-log4j12-1.7.15.jar
最後再執行:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
這時,讀者就可以看到手握栗子的可愛小松鼠了。
Flink SQL CLI 實踐
在 Flink 1.10 版本(目前為 RC1 階段) 中,Flink 社群對 SQL CLI 做了大量的改動,比如支援 View、支援更多的資料型別和 DDL 語句、支援分割槽讀寫、支援 INSERT OVERWRITE 等,實現了更多的 TableEnvironment API 的功能,更加方便使用者使用。
接下來,筆者詳細講解 Flink SQL CLI。
0. Help
執行下面命令,登入 Flink SQL 客戶端:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yamlFlink SQL>
執行 HELP,檢視 Flink SQL 支援的命令,如下為大部分常用的:
- CREATE TABLE
- DROP TABLE
- CREATE VIEW
- DESCRIBE
- DROP VIEW
- EXPLAIN
- INSERT INTO
- INSERT OVERWRITE
- SELECT
- SHOW FUNCTIONS
- USE CATALOG
- SHOW TABLES
- SHOW DATABASES
- SOURCE
- USE
- SHOW CATALOGS
1. Hive 操作
■ 1.1 建立表和匯入資料
為了方便讀者進行實驗,筆者使用 ssb-dbgen 生成測試資料,讀者也可以使用測試環境已有的資料來進行實驗。
具體如何在 Hive 中一鍵式建立表並插入資料,可以參考筆者早期的專案https://github.com/MLikeWater/ssb-kylin。
■ 1.2 Hive 表
檢視上個步驟中建立的 Hive 表:
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> show tables;
+--------------+--+
| tab_name |
+--------------+--+
| customer |
| dates |
| lineorder |
| p_lineorder |
| part |
| supplier |
+--------------+--+
讀者可以對 Hive 進行各種查詢,對比後面 Flink SQL 查詢的結果。
2. Flink 操作
■ 2.1 通過 HiveCatalog 訪問 Hive 資料庫
登入 Flink SQL CLI,並查詢 catalogs:
$ bin/sql-client.sh embedded -d conf/sql-client-hive.yaml
Flink SQL> show catalogs;
default_catalog
staginghive
Flink SQL> use catalog staginghive;
通過 show catalogs 獲取配置的所有 catalog。由於筆者在 sql-client-hive.yaml 檔案中設定了預設的 catalog,即為 staginghive。如果需要切換到其他 catalog,可以使用 usecatalog xxx。
■ 2.2 查詢 Hive 元資料
通過 Flink SQL 查詢 Hive 資料庫和表:
# 查詢資料庫
Flink SQL> show databases;
...
ssb
tmp
...
Flink SQL> use ssb;
# 查詢表
Flink SQL> show tables;
customer
dates
lineorder
p_lineorder
part
supplier
# 查詢表結構
Flink SQL> DESCRIBE customer;
root
|-- c_custkey: INT
|-- c_name: STRING
|-- c_address: STRING
|-- c_city: STRING
|-- c_nation: STRING
|-- c_region: STRING
|-- c_phone: STRING
|-- c_mktsegment: STRING
這裡需要注意,Hive 的元資料在 Flink catalog 中都以小寫字母使用。
■ 2.3 查詢
接下來,在 Flink SQL CLI 中查詢一些 SQL 語句。
目前 Flink SQL 解析 Hive 檢視元資料時,會遇到一些 Bug,比如執行 Q1.1 SQL:
Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Tabeorder' not found; did you mean 'LINEORDER'?
Flink SQL 找不到檢視中的實體表。
p_lineorder 表是 Hive 中的一張檢視,建立表的語句如下:
CREATE VIEW P_LINEORDER AS
SELECT LO_ORDERKEY,
LO_LINENUMBER,
LO_CUSTKEY,
LO_PARTKEY,
LO_SUPPKEY,
LO_ORDERDATE,
LO_ORDERPRIOTITY,
LO_SHIPPRIOTITY,
LO_QUANTITY,
LO_EXTENDEDPRICE,
LO_ORDTOTALPRICE,
LO_DISCOUNT,
LO_REVENUE,
LO_SUPPLYCOST,
LO_TAX,
LO_COMMITDATE,
LO_SHIPMODE,
LO_EXTENDEDPRICE*LO_DISCOUNT AS V_REVENUE
FROM ssb.LINEORDER;
但是對於 Hive 中檢視的定義,Flink SQL 並沒有很好地處理二手域名購買地圖元資料。為了後面 SQL 的順利執行,這裡筆者在 Hive 中刪除並重建該檢視:
0: jdbc:hive2://xx.xxx.xxx.xxx:10000> create view p_lineorder as
select lo_orderkey,
lo_linenumber,
lo_custkey,
lo_partkey,
lo_suppkey,
lo_orderdate,
lo_orderpriotity,
lo_shippriotity,
lo_quantity,
lo_extendedprice,
lo_ordtotalprice,
lo_discount,
lo_revenue,
lo_supplycost,
lo_tax,
lo_commitdate,
lo_shipmode,
lo_extendedprice*lo_discount as v_revenue
from ssb.lineorder;
然後繼續在 Flink SQL CLI 中查詢 Q1.1 SQL:
Flink SQL> select sum(v_revenue) as revenue
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> where d_year = 1993
> and lo_discount between 1 and 3
> and lo_quantity < 25;
revenue
894280292647
繼續查詢 Q2.1 SQL:
Flink SQL> select sum(lo_revenue) as lo_revenue, d_year, p_brand
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join part on lo_partkey = p_partkey
> left join supplier on lo_suppkey = s_suppkey
> where p_category = 'MFGR#12' and s_region = 'AMERICA'
> group by d_year, p_brand
> order by d_year, p_brand;
lo_revenue d_year p_brand
819634128 1998 MFGR#1206
877651232 1998 MFGR#1207
754489428 1998 MFGR#1208
816369488 1998 MFGR#1209
668482306 1998 MFGR#1210
660366608 1998 MFGR#1211
862902570 1998 MFGR#1212
...
最後再查詢一個 Q4.3 SQL:
Flink SQL> select d_year, s_city, p_brand, sum(lo_revenue) - sum(lo_supplycost) as profit
> from p_lineorder
> left join dates on lo_orderdate = d_datekey
> left join customer on lo_custkey = c_custkey
> left join supplier on lo_suppkey = s_suppkey
> left join part on lo_partkey = p_partkey
> where c_region = 'AMERICA'and s_nation = 'UNITED STATES'
> and (d_year = 1997 or d_year = 1998)
> and p_category = 'MFGR#14'
> group by d_year, s_city, p_brand
> order by d_year, s_city, p_brand;
d_year s_city p_brand profit
1998 UNITED ST9 MFGR#1440 6665681
如果讀者感興趣的話,可以查詢剩餘的 SQL,當然也可以和 Spark SQL 進行比較。另外 Flink SQL 也支援 EXPLAIN,查詢 SQL 的執行計劃。
■ 2.4 建立檢視
同樣,可以在 Flink SQL CLI 中建立和刪除檢視,如下:
Flink SQL> create view p_lineorder2 as
> select lo_orderkey,
> lo_linenumber,
> lo_custkey,
> lo_partkey,
> lo_suppkey,
> lo_orderdate,
> lo_orderpriotity,
> lo_shippriotity,
> lo_quantity,
> lo_extendedprice,
> lo_ordtotalprice,
> lo_discount,
> lo_revenue,
> lo_supplycost,
> lo_tax,
> lo_commitdate,
> lo_shipmode,
> lo_extendedprice * lo_discount as v_revenue
> from ssb.lineorder;
[INFO] View has been created.
這裡筆者需要特別強調的是,目前 Flink 無法刪除 Hive 中的檢視:
Flink SQL> drop view p_lineorder;
[ERROR] Could not execute SQL statement. Reason:
The given view does not exist in the current CLI session. Only views created with a CREATE VIEW statement can be accessed.
■ 2.5 分割槽操作
Hive 資料庫中建立一張分割槽表:
CREATE TABLE IF NOT EXISTS flink_partition_test (
id int,
name string
) PARTITIONED BY (day string, type string)
stored as textfile;
接著,通過 Flink SQL 插入和查詢資料:
# 插入靜態分割槽的資料
Flink SQL> INSERT INTO flink_partition_test PARTITION (type='Flink', `day`='2020-02-01') SELECT 100001, 'Flink001';
# 查詢
Flink SQL> select * from flink_partition_test;
id name day type
100001 Flink001 2020-02-01 Flink
# 插入動態分割槽
Flink SQL> INSERT INTO flink_partition_test SELECT 100002, 'Spark', '2020-02-02', 'SparkSQL';
# 查詢
Flink SQL> select * from flink_partition_test;
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink
# 動態和靜態分割槽結合使用類似,不再演示
# 覆蓋插入資料
Flink SQL> INSERT OVERWRITE flink_partition_test PARTITION (type='Flink') SELECT 100002, 'Spark', '2020-02-08', 'SparkSQL-2.4';
id name day type
100002 Spark 2020-02-02 SparkSQL
100001 FlinkSQL 2020-02-01 Flink
欄位 day 在 Flink 屬於關鍵字,要特殊處理。
■ 2.6 其他功能
- 2.6.1 函式
Flink SQL 支援內建的函式和自定義函式。對於內建的函式,可以執行 show functions 進行檢視,這一塊筆者以後會單獨介紹如何建立自定義函式。
- 2.6.2 設定引數
Flink SQL 支援設定環境引數,可以使用 set 命令檢視和設定引數:
Flink SQL> set;
deployment.gateway-address=
deployment.gateway-port=0
deployment.m=yarn-cluster
deployment.response-timeout=5000
deployment.yjm=1024
deployment.yn=2
deployment.ys=5
deployment.ytm=2048
execution.current-catalog=staginghive
execution.current-database=ssb
execution.max-idle-state-retention=0
execution.max-parallelism=128
execution.max-table-result-rows=1000000
execution.min-idle-state-retention=0
execution.parallelism=1
execution.periodic-watermarks-interval=200
execution.planner=blink
execution.restart-strategy.type=fallback
execution.result-mode=table
execution.time-characteristic=event-time
execution.type=batch
Flink SQL> set deployment.yjm = 2048;
總結
在本文中,筆者通過 Flink SQL 比較詳細地去操作 Hive 資料庫,以及 Flink SQL 提供的一些功能。
當然,目前 Flink SQL 操作 Hive 資料庫還是存在一些問題:
- 目前只支援 TextFile 儲存格式,還無法指定其他儲存格式
只支援 Hive 資料庫中 TextFile 儲存格式的表,而且 row format serde 是 org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。雖然實現了 RCFile、ORC、Parquet、Sequence 等儲存格式,但是無法自動識別 Hive 表的儲存格式。如果要使用其他儲存格式,需要修改原始碼,重新編譯。不過社群已經對這些儲存格式進行了測試,相信不久以後就可以在 Flink SQL 中使用。
- OpenCSVSerde 支援不完善
如果讀者使用 TextFile 的 row format serde 為 org.apache.hadoop.hive.serde2.OpenCSVSerde 時,無法正確識別字段型別,會把 Hive 表的欄位全部對映為 String 型別。
- 暫時不支援 Bucket 表
- 暫時不支援 ACID 表
- Flink SQL 優化方面功能較少
- 許可權控制方面
這方面和 Spark SQL 類似,目前基於 HDFS ACL 控制,暫時還沒有實現 Sentry 或 Ranger 控制權限,不過目前 Cloudera 正在開發基於 Ranger 設定 Spark SQL 和 Hive 共享訪問許可權的策略,實現行/列級控制以及審計資訊。
Flink 社群發展很快,所有這些問題只是暫時的,隨著新版本的釋出會被逐個解決。
如果 Flink SQL 目前不滿足的需求,建議使用 API 方式來解決問題。