Hudi Integration with Flink (Operating Hudi Tables from Flink)
I. Installing and Deploying Flink 1.13
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink is designed to run in all common cluster environments, performing computations at in-memory speed and at any scale.

1. Prepare the tarball:
flink-1.13.1-bin-scala_2.12.tgz

2. Extract it:
tar -zxvf flink-1.13.1-bin-scala_2.12.tgz

3. Add the Hadoop dependency jar to Flink's lib directory:
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2-uber/2.7.5-10.0

4. Start the HDFS cluster:
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode

5. Start the Flink local cluster:
/flink/bin/start-cluster.sh

Two processes should now be visible: TaskManagerRunner and StandaloneSessionClusterEntrypoint. To stop the cluster:

/flink/bin/stop-cluster.sh

6. Open the Flink Web UI:
http://localhost:8081/#/overview

7. Run the official example:
Read a text file, compute word frequencies (WordCount), and print the result to the console:

/flink/bin/flink run /flink/examples/batch/WordCount.jar

II. Integrating Flink with Hudi

Integrating Flink with Hudi essentially amounts to placing the bundle jar, hudi-flink-bundle_2.12-0.10.1.jar, on the Flink application's CLASSPATH.
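For example, with the install layout from section I (the target directory is an assumption based on that layout), copy the bundle into Flink's lib directory and restart the cluster so the jar is picked up:

cp hudi-flink-bundle_2.12-0.10.1.jar /flink/lib/
/flink/bin/stop-cluster.sh && /flink/bin/start-cluster.sh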
III. Start the Flink SQL CLI
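A typical way to launch the CLI: if the bundle jar is already in /flink/lib, start the embedded client directly; otherwise pass the jar explicitly with -j (the jar path below is an assumption, adjust it to where the bundle was downloaded):

/flink/bin/sql-client.sh embedded shell

/flink/bin/sql-client.sh embedded -j /path/to/hudi-flink-bundle_2.12-0.10.1.jar shell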
IV. Usage
1. Create a table, test_flink_hudi_mor, whose data is stored in a Hudi table backed by HDFS, with table type MOR:
CREATE TABLE test_flink_hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);

connector: the table connector
path: the storage path of the data
write.tasks: the number of tasks Flink uses when writing data to Hudi
compaction.tasks: the number of tasks performing compaction while writing to Hudi
table.type: the Hudi table type
Flink SQL> desc test_flink_hudi_mor;
+-----------+--------------+------+-----+--------+-----------+
|      name |         type | null | key | extras | watermark |
+-----------+--------------+------+-----+--------+-----------+
|      uuid |  VARCHAR(20) | true |     |        |           |
|      name |  VARCHAR(10) | true |     |        |           |
|       age |          INT | true |     |        |           |
|        ts | TIMESTAMP(3) | true |     |        |           |
| partition |  VARCHAR(20) | true |     |        |           |
+-----------+--------------+------+-----+--------+-----------+
5 rows in set
2. Insert data:
INSERT INTO test_flink_hudi_mor VALUES
  ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1');

INSERT INTO test_flink_hudi_mor VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

Inserting the same key again performs an update: the row for id1 changes from ('id1','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par1') to ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'). Because this is a MOR table, the writes land in log files first and have not yet been compacted into parquet files, as shown in the figure below.
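To verify the upsert from the same CLI, an ordinary snapshot query works; the row for id1 should carry the latest values ('Danny', 23):

Flink SQL> SELECT * FROM test_flink_hudi_mor;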
V. Streaming Query
1. Create a table, test_flink_hudi_mor_2, that reads the data as a stream, mapped onto the table test_flink_hudi_mor created earlier:
CREATE TABLE test_flink_hudi_mor_2(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'table.type' = 'MERGE_ON_READ',
  'read.tasks' = '1',
  'read.streaming.enabled' = 'true',
  'read.streaming.start-commit' = '20220307211200',
  'read.streaming.check-interval' = '4'
);

read.streaming.enabled set to true indicates that the table data is read in streaming mode.
read.streaming.start-commit specifies the commit (instant time) from which streaming consumption starts.
read.streaming.check-interval sets the interval at which the source checks for new commits, here 4 seconds.
table.type sets the table type to MERGE_ON_READ.
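Once this table exists, submitting a plain query against it starts a continuous streaming job that keeps emitting rows as new commits are detected:

Flink SQL> SELECT * FROM test_flink_hudi_mor_2;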
2. In a new terminal, start another Flink SQL CLI, re-create the table test_flink_hudi_mor, and insert two rows in batch mode:
CREATE TABLE test_flink_hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://localhost:9000/hudi-warehouse/test_flink_hudi_mor',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO test_flink_hudi_mor VALUES
  ('id9','FZ',29,TIMESTAMP '1993-04-09 00:00:01','par5');

INSERT INTO test_flink_hudi_mor VALUES
  ('id10','DX',28,TIMESTAMP '1994-06-02 00:00:01','par5');
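To run these statements under an explicit batch runtime mode, the Flink 1.13 SQL client accepts the standard configuration key (a sketch; the default mode also executes these inserts):

SET 'execution.runtime-mode' = 'batch';

While the streaming query from step V.1 keeps running in the first CLI, the two new rows (id9 and id10 in partition par5) should appear in its output within the 4-second window set by read.streaming.check-interval.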