
Hudi Integration with Flink (Operating Hudi Tables from Flink)

I. Installing and Deploying Flink 1.13

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale.

1. Prepare the tarball

flink-1.13.1-bin-scala_2.12.tgz
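
If the tarball is not already on hand, it can be fetched from the Apache release archive (URL assumed from the standard Apache archive layout):

wget https://archive.apache.org/dist/flink/flink-1.13.1/flink-1.13.1-bin-scala_2.12.tgz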

2. Extract the archive

 tar -zxvf flink-1.13.1-bin-scala_2.12.tgz

3. Add the Hadoop dependency jar to Flink's lib directory

flink-shaded-hadoop-2-uber-2.7.5-10.0.jar https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.7.5-10.0
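
A minimal sketch of placing the jar, assuming it was downloaded to the current directory and Flink was extracted to /flink:

cp flink-shaded-hadoop-2-uber-2.7.5-10.0.jar /flink/lib/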
 

4. Start the HDFS cluster

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
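
Since the Hudi tables created later store their data under hdfs://localhost:9000/hudi-warehouse, the warehouse directory can be created up front (path taken from the table definitions below):

hdfs dfs -mkdir -p /hudi-warehouse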

5. Start the Flink local cluster

/flink/bin/start-cluster.sh

After startup, two processes should be visible: TaskManagerRunner and StandaloneSessionClusterEntrypoint.

Stop command:

/flink/bin/stop-cluster.sh
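
A quick jps check covers both the HDFS daemons started above and the Flink cluster processes (the PIDs below are illustrative):

jps
2001 NameNode
2002 DataNode
2003 StandaloneSessionClusterEntrypoint
2004 TaskManagerRunner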

6. Flink Web UI

http://localhost:8081/#/overview  

7. Run the official example

Read a text file, compute a WordCount word-frequency statistic, and print the result to the console:

/flink/bin/flink run /flink/examples/batch/WordCount.jar
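
The same jar also accepts explicit input and output locations; the HDFS paths below are placeholders, not part of the original example:

/flink/bin/flink run /flink/examples/batch/WordCount.jar --input hdfs://localhost:9000/tmp/words.txt --output hdfs://localhost:9000/tmp/wordcount-out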

II. Integrating Flink with Hudi

Integrating Flink with Hudi essentially comes down to one step: putting the integration jar, hudi-flink-bundle_2.12-0.10.1.jar, on the Flink application CLASSPATH.

When the Flink SQL Connector uses Hudi as a source or sink, there are two ways to get the jar onto the CLASSPATH:

Option 1: pass the jar via the -j parameter when launching the Flink SQL Client:

flink/bin/sql-client.sh embedded -j …./hudi-flink-bundle_2.12-0.10.1.jar

Option 2: put the jar directly into the lib directory of the Flink installation ($FLINK_HOME/lib).

In addition, edit conf/flink-conf.yaml and set taskmanager.numberOfTaskSlots: 4, and list localhost four times in the workers file.

Because Flink needs to connect to the HDFS file system, the HADOOP_CLASSPATH environment variable must be set before starting the cluster.
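The variable can be set the way the Flink documentation suggests, assuming the hadoop command is on the PATH:

export HADOOP_CLASSPATH=`hadoop classpath`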

III. Starting the Flink SQL CLI

sql-client.sh embedded shell

Set the result display mode to tableau:

set execution.result-mode=tableau;
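
Putting sections II and III together, a typical launch that also loads the Hudi bundle might look like this (the jar location is an assumption based on where it was placed earlier):

$FLINK_HOME/bin/sql-client.sh embedded -j $FLINK_HOME/lib/hudi-flink-bundle_2.12-0.10.1.jar shell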

IV. Usage

1. Create the table test_flink_hudi_mor; data is written to a Hudi table stored on HDFS, with table type MOR

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
connector: the table connector
path: the data storage path
write.tasks: the number of tasks Flink uses when writing data to Hudi
compaction.tasks: the number of tasks performing compaction when writing data to Hudi
table.type: the Hudi table type
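
The new table should now show up in the catalog:

Flink SQL> show tables;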
Flink SQL> desc test_flink_hudi_mor;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set

2. Insert data

INSERT INTO test_flink_hudi_mor VALUES ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1' );
 
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
Repeating an insert performs an update: the id1 row changes from ('id1','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par1') to ('id1','Danny',23, TIMESTAMP '1970-01-01 00:00:01','par1'). Because this is a MOR table, the records go into log files first and have not yet been compacted into Parquet files.
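
The log files can be inspected directly on HDFS; the partition directory below is inferred from the table's path and partition value, and the exact file names will differ:

hdfs dfs -ls /hudi-warehouse/test_flink_hudi_mor/par1

The listing should contain .log.* files but no Parquet base files until compaction has run.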

V. Streaming Query

1. Create the table test_flink_hudi_mor_2, which reads in streaming mode and maps onto the table test_flink_hudi_mor created earlier

CREATE TABLE test_flink_hudi_mor_2(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',   
    'table.type' = 'MERGE_ON_READ',
    'read.tasks' = '1',
    'read.streaming.enabled' = 'true',
    'read.streaming.start-commit' = '20220307211200',
    'read.streaming.check-interval' = '4'
); 
read.streaming.enabled set to true indicates that the table data is read in streaming mode
read.streaming.check-interval sets the interval at which the source checks for new commits to 4 seconds
table.type sets the table type to MERGE_ON_READ
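
Starting the streaming read is then just a query on the new table; it keeps running and emits rows as new commits arrive:

Flink SQL> select * from test_flink_hudi_mor_2;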

2. Open a new terminal and start another Flink SQL CLI, create the table test_flink_hudi_mor there again, and insert data in batch mode

CREATE TABLE test_flink_hudi_mor(
    uuid VARCHAR(20),
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH(
    'connector' = 'hudi',
    'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
    'write.tasks' = '1',
    'compaction.tasks' = '1',
    'table.type' = 'MERGE_ON_READ'
);
 
INSERT INTO test_flink_hudi_mor VALUES ('id9','FZ',29, TIMESTAMP '1993-04-09 00:00:01', 'par5' );
INSERT INTO test_flink_hudi_mor VALUES ('id10','DX',28, TIMESTAMP '1994-06-02 00:00:01', 'par5' );
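
Within roughly the configured 4-second check interval, the streaming query in the first CLI should emit the two newly committed rows. Illustrative output only (the exact columns, such as a leading op column in streaming mode, and the formatting depend on the Flink version):

+------+------+-----+-------------------------+-----------+
| uuid | name | age |                      ts | partition |
+------+------+-----+-------------------------+-----------+
|  id9 |   FZ |  29 | 1993-04-09 00:00:01.000 |      par5 |
| id10 |   DX |  28 | 1994-06-02 00:00:01.000 |      par5 |
+------+------+-----+-------------------------+-----------+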