Data Lake Iceberg Series (6): Processing Data with Flink
1 Cluster Setup
Install a Flink cluster, then start it:
bin/start-cluster.sh
Upload the iceberg-flink-runtime jar (e.g. iceberg-flink-runtime-0.10.0.jar, used below) to Flink's lib directory.
Start the Flink SQL client:
bin/sql-client.sh embedded -j ./lib/iceberg-flink-runtime-0.10.0.jar shell
2 Quick Start
Create a Hive catalog:
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://linux01:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://linux01:8020/user/hive/warehouse/flink'
);
Create a Hadoop catalog:
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://linux01:8020/warehouse/flink',
'property-version'='1'
);
Switch catalogs:
USE CATALOG hadoop_catalog;
Create a database:
CREATE DATABASE iceberg_db;
USE iceberg_db;
Create a table:
CREATE TABLE t2 (id BIGINT, name STRING) PARTITIONED BY (name);
Insert data:
INSERT INTO t2 VALUES (1, 'hangge');
Set parameters:
-- Execute the Flink job in streaming mode for the current session context
SET execution.type = streaming;
-- Execute the Flink job in batch mode for the current session context
SET execution.type = batch;
Query data:
SELECT * FROM t2;
3 Basic Operations
3.1 Flink SQL Operations
Create a database:
CREATE DATABASE ..
Create a table:
CREATE TABLE hive_catalog.default.sample (
id BIGINT COMMENT 'unique id',
data STRING
);
Table create commands now support the most commonly used Flink create clauses, including:
PARTITIONED BY (column1, column2, ...) to configure partitioning; Apache Flink does not yet support hidden partitioning.
COMMENT 'table document' to set a table description.
WITH ('key'='value', ...) to set table configuration, which will be stored in Apache Iceberg table properties.
Currently, it does not support computed columns, primary keys, watermark definitions, etc.
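Putting these clauses together, here is a sketch of a full DDL statement; the table name, partition column, and property value are illustrative, not taken from the text above:

```sql
CREATE TABLE hive_catalog.default.sample_partitioned (
    id   BIGINT COMMENT 'unique id',
    data STRING
)
PARTITIONED BY (data)        -- identity partitioning only; Flink DDL has no hidden partitioning
COMMENT 'table document'     -- stored as the table description
WITH ('write.format.default'='parquet');  -- stored in Iceberg table properties
```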
Insert data:
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
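For reference, an INSERT OVERWRITE without a PARTITION clause performs a dynamic overwrite: it replaces exactly the partitions produced by the query (for an unpartitioned table, the whole table). A sketch with illustrative values:

```sql
-- Replaces only the partition data='b', since that is the
-- partition value produced by this query
INSERT OVERWRITE hive_catalog.default.sample VALUES (2, 'b');
```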
Query data:
SET execution.type = batch;
SELECT * FROM sample;
Alter a table:
ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');
Rename a table:
ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;
Drop a table:
DROP TABLE hive_catalog.default.sample;
3.2 DataStream Operations
Read with DataStream (not supported in this version)
Write with DataStream
Append write (imports added for completeness):
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");
Overwrite write (same pipeline, with overwrite enabled on the sink builder):
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");