
Data Lake Iceberg Series (VI): Processing Data with Flink

1 Cluster Setup

Install a Flink cluster and start it:

bin/start-cluster.sh

Upload the iceberg-flink-runtime jar to Flink's lib directory.

Start the Flink SQL client:

bin/sql-client.sh embedded -j ./lib/iceberg-flink-runtime-0.10.0.jar shell

2 Quick Start

CREATE CATALOG hive_catalog WITH (
 'type'='iceberg',
 'catalog-type'='hive',
 'uri'='thrift://linux01:9083',
 'clients'='5',
 'property-version'='1',
 'warehouse'='hdfs://linux01:8020/user/hive/warehouse/flink'
);
CREATE CATALOG hadoop_catalog WITH (
 'type'='iceberg',
 'catalog-type'='hadoop',
 'warehouse'='hdfs://linux01:8020/warehouse/flink',
 'property-version'='1'
);

Switch catalogs:

USE CATALOG hadoop_catalog;

Create a database:

CREATE DATABASE iceberg_db;

USE iceberg_db;

Create a table:

CREATE TABLE t2 (id BIGINT, name STRING) PARTITIONED BY (name);

Insert data:

INSERT INTO t2 VALUES (1, 'hangge');

Set parameters:

-- Execute the flink job in streaming mode for current session context
SET execution.type = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.type = batch;

Query data:

SELECT * FROM t2;

3 Basic Operations

3.1 Flink SQL Operations

Create a database:

CREATE DATABASE ..

Create a table:

CREATE TABLE hive_catalog.default.sample (
    id BIGINT COMMENT 'unique id',
    data STRING
);

Table create commands now support the most commonly used Flink create clauses, including:

PARTITIONED BY (column1, column2, ...) to configure partitioning; Apache Flink does not yet support hidden partitioning.
COMMENT 'table document' to set a table description.
WITH ('key'='value', ...) to set table configuration, which will be stored in Apache Iceberg table properties.

Currently it does not support computed columns, primary keys, or watermark definitions.
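Putting those clauses together, a single DDL statement might look like the following sketch (the table name, columns, and property value here are illustrative, not from the original post):

```sql
-- Hypothetical table combining the supported clauses:
-- an explicit partition column, a table comment, and an Iceberg table property.
CREATE TABLE hive_catalog.default.events (
    id       BIGINT COMMENT 'unique id',
    category STRING,
    data     STRING
)
COMMENT 'sample event table'
PARTITIONED BY (category)
WITH ('write.format.default'='parquet');
```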

Insert data:

INSERT INTO hive_catalog.default.sample VALUES (1, 'a');

INSERT OVERWRITE hive_catalog.default.sample PARTITION (data='a') SELECT 6;

Query data:

SET execution.type = batch;

SELECT * FROM sample;

Alter a table:

ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');

Rename a table:

ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;

Drop a table:

DROP TABLE hive_catalog.default.sample;

3.2 DataStream Operations

Read with DataStream (not supported)

Write with DataStream

Append write:

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");

Overwrite write:

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ...;

Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");