Flink實戰(110):flink-sql使用(十八)connector(十九)Flink Hive Connector 使用
阿新 • • 發佈:2020-12-23
來源:https://www.yuque.com/docs/share/14a7a0e8-37d1-4142-8962-48dcf3761f7e?#
Flink 1.12 版本
1. Hive 建表
//1、建立 Hive 資料庫 create database zhisheng; //2、檢視建立的資料庫 show databases; //3、使用建立的資料庫 use zhisheng; //4、在該庫下建立 Hive 表 CREATE TABLE IF NOT EXISTS flink ( appid int, message String ) ROW FORMAT DELIMITED FIELDS TERMINATEDBY '\t' LINES TERMINATED BY '\n' STORED AS TEXTFILE; //5、往該表插入一條資料 insert into flink values(11111, '233sadadadwqqdq');
2.Flink 讀取 Hive 已經存在的表資料
//1、建立 Hive CATALOG,Flink 通過 catalog 不僅可以將自己的表寫入 Hive 的 metastore,也能讀寫 Hive 的表 CREATE CATALOG flinkHiveCatalog WITH ( 'type' = 'hive', 'default-database' = 'zhisheng', 'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf' ); //2、使用該 Catalog USE CATALOG flinkHiveCatalog; //3、因為剛才已經寫入了一條資料到 Hive 表(flink) select * from flink;
3.Flink 往 Hive 中已經存在的表寫資料
//1、建立 Source 表 CREATE TABLE yarn_log_datagen_test_hive_sink ( appid INT, message STRING ) WITH ('connector' = 'datagen', 'rows-per-second'='10', 'fields.appid.kind'='random', 'fields.appid.min'='1', 'fields.appid.max'='1000', 'fields.message.length'='100' ); //2、將資料寫入到 Hive 表 insert into flink select * from yarn_log_datagen_test_hive_sink;
//再次查詢 Hive 表裡面的資料 select * from flink;
直接在 Hive 利用命令查詢:
4 .完整 Example
CREATE CATALOG flinkHiveCatalog WITH ( 'type' = 'hive', 'default-database' = 'zhisheng', 'hive-conf-dir' = '/app/apache-hive-2.1.1-bin/conf' ); USE CATALOG flinkHiveCatalog; SET table.sql-dialect=hive; -- 建立 Hive 表要指定 sql-dialect 為 Hive,否則建立的時候識別不了下面的 DDL 語句 CREATE TABLE yarn_logs ( appid INT, message STRING ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.parallelism'='2' -- 該引數內部才支援設定並行度 ); SET table.sql-dialect=default; -- 建立 Flink 表又要換回預設的 sql-dialect,Flink 支援在同一個 SQL 裡面設定多個 sql-dialect CREATE TABLE yarn_log_datagen_test ( appid INT, message STRING, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.appid.kind' = 'random', 'fields.appid.min' = '1', 'fields.appid.max' = '1000', 'fields.message.length' = '100' ); -- streaming sql, insert into hive table INSERT INTO yarn_logs SELECT appid, message, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM yarn_log_datagen_test; -- batch sql, select with partition pruning SELECT * FROM yarn_logs WHERE dt='2020-12-16' and hr='12';
檢視 table 的儲存路徑
show create table yarn_logs;