
Data Lake Practice, Lesson 1: Getting Started with Flink + Iceberg

 

 

Preface

Data lakes are getting more and more popular, and I have started exploring how to adopt one at my company. This series records my hands-on introduction and the pitfalls I hit, to make it easier for others to get started later.


1. Environment for this Iceberg data lake practice

1. Hadoop: community edition 2.7.2
2. Hive: 2.3.6
3. Flink: 1.11.6. The latest release at the time of writing is Flink 1.14.2, but I am starting with Flink 1.11 because the official site recommends it, which reduces the pitfalls caused by using other versions.

Note from the official site: Step.1 Downloading the flink 1.11.x binary package from the apache flink download page. We now use scala 2.12 to archive the apache iceberg-flink-runtime jar, so it's recommended to use flink 1.11 bundled with scala 2.12.

2. Start the Flink SQL client

1. Start the Flink standalone cluster

https://iceberg.apache.org/#flink/#preparation-when-using-flink-sql-client

The commands are as follows (example):

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

2. Download the Iceberg Flink runtime jar and start flink-sql

Example:
Download address: https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime/
iceberg-flink-runtime-xxx.jar
I use iceberg-flink-runtime-0.11.1.jar

Start the Flink SQL client with the Iceberg runtime jar:

bin/sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar shell

3. Create a Hadoop-based catalog

Create the catalog with the statement below; the warehouse path will be created automatically.
In the HDFS path, ns is the HDFS nameservice (a namenode ip:port can be used instead).
Run the statement in the flink-sql client:

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog',
  'property-version'='1'
);

The path /user/hive/warehouse/iceberg_hadoop_catalog/default is created automatically, and it is still empty:
[root@hadoop101 ~]# hadoop fs -ls /user/hive/warehouse/iceberg_hadoop_catalog/default

Flink SQL> show catalogs;
default_catalog
hadoop_catalog

Create a database

Flink SQL> create database iceberg_db;
[INFO] Database has been created.

Flink SQL> show databases;
default_database
iceberg_db

Create a table


Flink SQL> CREATE TABLE `hadoop_catalog`.`default`.`sample` (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );
[INFO] Table has been created.
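
Note: a table named sample_like also appears in a table listing later in this walkthrough, but the statement that created it is not captured above. A minimal sketch of how such a table can be created, assuming the CREATE TABLE LIKE support described in the Iceberg Flink docs:

Flink SQL> CREATE TABLE `hadoop_catalog`.`default`.`sample_like`
>     LIKE `hadoop_catalog`.`default`.`sample`;

This copies the schema and table properties of sample into the new table sample_like.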

List the tables: the new table cannot be found from the current database.

Flink SQL> use default_database;

Flink SQL> show tables;
[INFO] Result was empty.

Flink SQL> use iceberg_db;

Flink SQL> show tables;
[INFO] Result was empty.

Looking at the HDFS path, however, the table directory and its metadata have been generated.
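
The table is simply in a different catalog: it was created in `hadoop_catalog`.`default`, while the SHOW TABLES above ran against the built-in default_catalog. A minimal sketch of how to find it (the same commands are used again further below):

Flink SQL> use catalog hadoop_catalog;

Flink SQL> show tables;
sample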

4. Write and read test (limitations of the hadoop catalog)

The hadoop catalog created above can only be used in the client session that created it.

Open another SQL client and try to write data:

bin/sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar shell

Flink SQL> INSERT INTO `hadoop_catalog`.`default`.`sample` VALUES (1, 'a');
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Sink `hadoop_catalog`.`default`.`sample` does not exists


Flink SQL> show databases;
default_database

The database and table created in the first client are not visible here. For now I put this down as a limitation of the hadoop catalog.

Step 1: exit all sql-client sessions and start sql-client again.
Step 2: check whether the hadoop_catalog tables are still on HDFS; they are.

Finding 1: the previously created database and the hadoop_catalog registration are gone from the new session.
Conclusion: after the client exits, the catalog data on HDFS is still there, but the client has to re-create the catalog; the tables under the catalog do not need to be recreated.

Flink SQL> show catalogs;
default_catalog

Re-create the catalog:
Flink SQL> CREATE CATALOG hadoop_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hadoop',
>   'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog',
>   'property-version'='1'
> );
[INFO] Catalog has been created.

Flink SQL> use hadoop_catalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [hadoop_catalog] does not exist in the catalog: [default_catalog].

Flink SQL> use catalog hadoop_catalog;

Flink SQL> show tables;
sample
sample_like

Flink SQL> show databases;
default

Flink SQL> show catalogs;
default_catalog
hadoop_catalog

Insert two rows, then query them back:

Flink SQL> INSERT INTO `hadoop_catalog`.`default`.`sample` VALUES (1, 'a');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a7008acfe1389133c1ae6a5c00e4d611


Flink SQL> INSERT INTO `hadoop_catalog`.`default`.`sample` VALUES (2, 'b');
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: f642dd21e493d630824cb9b30098de3c

Flink SQL> select * from sample;

Query result: the two rows just inserted come back.

Look at the files on HDFS.

The data directory now contains 2 data files.

The metadata directory contains quite a few metadata files.

I did not fully record the metadata after the very first run, so this flow needs to be rerun to capture the complete sequence of metadata changes.

5. Create a Hive-based catalog

5.1 Creating the hive catalog fails

Creating the catalog fails; the cause (shown below) is that the Hive dependencies are missing.
DDL for creating the hive catalog:

 CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='/user/hive/warehouse'
);
Flink SQL>  CREATE CATALOG hive_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://hadoop101:9083',
>   'clients'='5',
>   'property-version'='1',
>   'warehouse'='/user/hive/warehouse'
> );
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:222)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/metastore/api/NoSuchObjectException
        at org.apache.iceberg.flink.CatalogLoader$HiveCatalogLoader.loadCatalog(CatalogLoader.java:112)
        at org.apache.iceberg.flink.FlinkCatalog.<init>(FlinkCatalog.java:111)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:127)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:117)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1110)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1043)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:693)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeSql$7(LocalExecutor.java:366)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:254)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.executeSql(LocalExecutor.java:366)
        at org.apache.flink.table.client.cli.CliClient.callDdl(CliClient.java:651)
        at org.apache.flink.table.client.cli.CliClient.callDdl(CliClient.java:646)
        at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:362)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:210)
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.metastore.api.NoSuchObjectException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:62)
        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:47)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 18 more

Shutting down the session...
done.

Attempted fix: add Hive to the classpath. The official site does not describe how to do this, so my first idea was simply to append hive/lib to the Hadoop classpath:

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`:/opt/module/hive/lib/*.jar

After this change, rerunning it made no difference.

Keep going.
Troubleshooting approach:
Analyze the problem: according to the error message, a class is missing, thrown from this line:
org.apache.iceberg.flink.CatalogLoader$HiveCatalogLoader.loadCatalog(CatalogLoader.java:112)
I pulled the Iceberg source for the 0.11 branch and found it builds against Hive 2.3.7, no major change from the Hive 2.3.6 I use.
That rules out the Hive version, and the Hive classpath was already added, so back to the official docs.
Final fix: add flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar when starting the client.

[root@hadoop101 software]# bin/sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar  -j /opt/software/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar shell

5.2 Successfully creating the catalog

[root@hadoop101 software]# sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar  -j /opt/software/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.11.6/lib/log4j-slf4j-impl-2.16.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-2.7.2/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
No default environment specified.
Searching for '/opt/module/flink-1.11.6/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/module/flink-1.11.6/conf/sql-client-defaults.yaml
No session environment specified.

Command history file path: /root/.flink-sql-history
                                   (Flink SQL Client BETA ASCII-art startup banner)
        Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> CREATE CATALOG hive_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://hadoop101:9083',
>   'clients'='5',
>   'property-version'='1',
>   'hive-conf-dir'='/opt/module/hive/conf'
> );
2022-01-13 10:58:27,528 INFO  org.apache.hadoop.hive.conf.HiveConf                         [] - Found configuration file null
2022-01-13 10:58:27,741 WARN  org.apache.hadoop.hive.conf.HiveConf                         [] - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist
[INFO] Catalog has been created.
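
With the hive catalog registered, the same flow as with the hadoop catalog can be repeated against it. A minimal sketch (assumed statements, not a transcript of my session; the database name iceberg_db is just an example):

Flink SQL> use catalog hive_catalog;

Flink SQL> create database if not exists iceberg_db;

Flink SQL> CREATE TABLE `hive_catalog`.`iceberg_db`.`sample` (
>     id BIGINT COMMENT 'unique id',
>     data STRING
> );

Flink SQL> INSERT INTO `hive_catalog`.`iceberg_db`.`sample` VALUES (1, 'a');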

5.3 Test multi-client support for the hive-based catalog

In the current client:

Flink SQL> show catalogs;
default_catalog
hive_catalog

Open a new client (the first client is still running):

Flink SQL> show catalogs;
default_catalog
Flink SQL> show databases;
default_database

Conclusion: hive_catalog is not visible in the new client, which shows that for both the hadoop and the hive catalog, the catalog registration is not shared between clients.
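
What is not shared is only the catalog registration inside each SQL client session; with the hive catalog the table metadata itself lives in the Hive metastore. A hedged sketch of what the second client would presumably need to run (the same CREATE CATALOG statement as before) for the shared tables to become visible:

Flink SQL> CREATE CATALOG hive_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://hadoop101:9083',
>   'clients'='5',
>   'property-version'='1',
>   'hive-conf-dir'='/opt/module/hive/conf'
> );

Flink SQL> use catalog hive_catalog;

Flink SQL> show tables;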


Summary

This walkthrough follows the official site: https://iceberg.apache.org/#flink/.

1. Iceberg supports 3 catalog storage mechanisms: hadoop, hive, and a custom catalog implementation (see the sketch below).
2. So far I have tried the first one, storing the catalog on Hadoop; with this approach multiple clients cannot share the catalog, so it cannot go to production.
3. The hive-based catalog is what we need to use.
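
For completeness, the third mechanism is a custom catalog implementation, configured with 'catalog-impl' instead of 'catalog-type'. A hedged sketch based on the Iceberg docs (the class name and the extra property are placeholders, not anything used in this article):

CREATE CATALOG custom_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);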