1. 程式人生 > 其它 > Hudi-Flink SQL實時讀取Hudi表資料 Hudi-Flink消費kafka將增量資料實時寫入Hudi

Hudi-Flink SQL實時讀取Hudi表資料 Hudi-Flink消費kafka將增量資料實時寫入Hudi

程式碼如下(hudi表實時寫入參考上一篇[Hudi-Flink消費kafka將增量資料實時寫入Hudi])

package com.zhen.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * @Author FengZhen
 * @Date 3/10/22 8:33 PM
 * @Description 基於Flink SQL Connector實現:從hudi表中載入資料,編寫SQL查詢
 */
public
class FlinkSQLReadDemo { public static void main(String[] args) { //1.獲取表的執行環境 EnvironmentSettings settings = EnvironmentSettings .newInstance() .inStreamingMode() .build(); TableEnvironment tableEnv = TableEnvironment.create(settings);
//2.建立輸入表,TODO:載入hudi表資料 tableEnv.executeSql( "CREATE TABLE order_hudi(\n" + " `orderId` STRING PRIMARY KEY NOT ENFORCED,\n" + " `userId` STRING,\n" + " `orderTime` STRING,\n" + "
`ip` STRING,\n" + " `orderMoney` DOUBLE,\n" + " `orderStatus` INT,\n" + " `ts` STRING,\n" + " `partition_day` STRING\n" + ")\n" + "PARTITIONED BY (partition_day)\n" + "WITH(\n" + " 'connector' = 'hudi',\n" + " 'path'='hdfs://localhost:9000/hudi-warehouse/flink_hudi_order',\n" + " 'table.type' = 'MERGE_ON_READ',\n" + " 'read.streaming.enabled' = 'true',\n" + " 'read.streaming.check-interval' = '4'\n" + ")" ); //3.執行查詢語句,流式讀取hudi表資料 tableEnv.executeSql( "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi" ).print(); } }