Hands-on with the Iceberg data lake, Lesson 4: reading data from Kafka into Iceberg with SQL in the SQL client (upgraded to Flink 1.12.7)
阿新 · Published: 2022-04-17
Preface
The earlier attempt with Flink 1.11.6 and Iceberg 0.11 did not succeed, so Flink was upgraded to 1.12.7.
Versions after the upgrade:
flink-1.12.7-bin-scala_2.12
flink-sql-connector-hive-2.3.6_2.12-1.12.7.jar
kafka_2.12-2.4.1
1. Start the Flink SQL client
[root@hadoop101 bin]# sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.12.1.jar -j /opt/software/flink-sql-connector-hive-2.3.6_2.12-1.12.7.jar -j /opt/software/flink-sql-connector-kafka_2.12-1.12.7.jar shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.12.7/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/module/flink-1.12.7/conf/sql-client-defaults.yaml
No session environment specified.
Command history file path: /root/.flink-sql-history

(Flink SQL Client ASCII-art banner omitted)

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL>
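The "multiple SLF4J bindings" warning is harmless (SLF4J simply picks one binding), but for a clean startup log, one option is to set aside the Hadoop-side binding so that only Flink's log4j binding remains. The path below is taken from the warning above; treat it as an example, not a required step:

# Park the duplicate SLF4J binding bundled with Hadoop.
mv /opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar \
   /opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar.bak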
2. Create the Kafka tables
format = 'raw' is only supported in Flink 1.12 and later.
create table kafka_test_log (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup5',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

create table kafka_test_log_csv (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup6',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

create table kafka_test_log2 (
  data String
) WITH (
  'connector' = 'kafka',
  'topic' = 'test_log2',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'rickKafkaHiveGroup5',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);
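As a quick sanity check (assuming the test_log topic already has messages, or you produce a few with the console producer shown later in this post), the raw table can be queried directly from the SQL client:

-- Should stream back every message on test_log as a single string column.
select * from kafka_test_log;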
3. Read data from Kafka and write it back to Kafka
Flink SQL> insert into kafka_test_log2 select * from kafka_test_log;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 777618b911d015a9b80cab316edf3fe8
Check the web UI:
Both the incoming and outgoing record counts show 0.
Querying directly with SQL, however, shows that the data was copied in full from kafka_test_log to kafka_test_log2.
Conclusion: the record counts the web UI displays for Flink's insert into jobs are unreliable; the numbers shown do not reflect the data actually moved.
Flink SQL> select * from kafka_test_log2;
4. Write to Iceberg
The code is as follows (example):
4.1 Create a hive catalog, Kafka -> Iceberg
Create the hive_catalog and the table:
CREATE CATALOG hive_catalog4 WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://hadoop101:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs:///user/hive/warehouse/hive_catalog4'
);
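Once the catalog is registered, it can be verified from the client (an optional check, not part of the original session):

-- hive_catalog4 should now appear alongside default_catalog.
show catalogs;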
Switch to hive_catalog4 and create the table (here in the built-in default database):
use catalog hive_catalog4;
create table `hive_catalog4`.`default`.`ib_hive_test_log`(
data String
);
Write the Kafka data into the Iceberg table under the hive catalog:
insert into `hive_catalog4`.`default`.`ib_hive_test_log` select * from default_catalog.default_database.kafka_test_log_csv;
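To see whether anything actually landed, the Iceberg table can be queried back from the same session (a simple sanity check added here, not part of the original run):

select count(*) from `hive_catalog4`.`default`.`ib_hive_test_log`;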
4.2 Create a hadoop catalog, Kafka -> Iceberg
CREATE CATALOG hadoop_catalog4 WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog4',
'property-version'='1'
);
use catalog hadoop_catalog4;
create database iceberg_db;
create table `hadoop_catalog4`.`iceberg_db`.`ib_hadoop_test_log`(
data String
);
insert into hadoop_catalog4.iceberg_db.ib_hadoop_test_log select data from default_catalog.default_database.kafka_test_log ;
Check on HDFS:
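Listing the warehouse path from the hadoop catalog definition above shows whether any data files were committed:

hdfs dfs -ls -R /user/hive/warehouse/iceberg_hadoop_catalog4/iceberg_db/ib_hadoop_test_log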
Produce a few messages and watch: the Iceberg data directory still contains nothing; no output reaches Iceberg.
[root@hadoop101 ~]# kafka-console-producer.sh --topic test_log --broker-list hadoop101:9092,hadoop102:9092
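To rule out the Kafka side, a console consumer on the same topic should echo whatever the producer sends (standard Kafka 2.4 tooling, same brokers as above):

[root@hadoop101 ~]# kafka-console-consumer.sh --topic test_log --from-beginning --bootstrap-server hadoop101:9092,hadoop102:9092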
Summary
Testing shows that reading from and writing to Kafka both work fine. I wondered whether the consumer group was the problem, but switching to a new consumer group still produced no output... Both the hive catalog and the hadoop catalog were tried, with no luck. Could the problem be in Iceberg itself?
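One likely cause worth checking, though not verified in this post: Flink's Iceberg sink only commits data files when a checkpoint completes, and neither a standalone cluster nor the SQL client enables checkpointing by default, so a streaming insert can run without errors while the table directory stays empty. A minimal sketch, assuming this is the issue, is to enable checkpointing in flink-conf.yaml and restart the cluster and SQL client (the 10s interval is just an example value):

# flink-conf.yaml
# Iceberg commits a new snapshot on every successful checkpoint.
execution.checkpointing.interval: 10s

After one or two intervals pass, data files and a fresh metadata snapshot should appear under the warehouse path.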