
Data Lake Iceberg in Practice, Lesson 3: Reading Data from Kafka into Iceberg with SQL in the SQL Client

 

Overview

Goal: get a feel for the characteristics of the Hive catalog through hands-on practice.

Environment:

Flink 1.11.6
Iceberg 0.11
Kafka 2.12-2.4.1


1. Start the Flink SQL client with Hive and Kafka support

[root@hadoop101 software]# bin/sql-client.sh embedded -j /opt/software/iceberg-flink-runtime-0.11.1.jar -j /opt/software/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar -j /opt/software/flink-sql-connector-kafka_2.11-1.11.6.jar shell

2. Create a Hive catalog to store the Iceberg files

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop101:9083',
  'clients'='5',
  'property-version'='1',
  'hive-conf-dir'='/opt/module/hive/conf'
);

Flink SQL> show catalogs;
default_catalog
hive_catalog

Create a Hadoop catalog:

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog',
  'property-version'='1'
);
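
With a hadoop catalog, table metadata is kept directly under the warehouse path on HDFS rather than in the Hive Metastore; it can be inspected with (path taken from the DDL above):

[root@hadoop101 ~]# hdfs dfs -ls hdfs://ns/user/hive/warehouse/iceberg_hadoop_catalog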

3. Create a database and table under the catalog

The code is as follows (example).

Create the database:

use catalog hive_catalog;
create database iceberg_db;

Create the table:

create table `hive_catalog`.`iceberg_db`.`ib_test_log` (data String);

4. Create a producer and a consumer

Consumer, to watch the topic:

[root@hadoop102 kafka]# kafka-console-consumer.sh --topic test_log --bootstrap-server hadoop101:9092,hadoop102:9092,hadoop103:9092

Producer, sending arbitrary data:
[root@hadoop101 ~]# kafka-console-producer.sh --topic test_log  --broker-list hadoop101:9092,hadoop102:9092
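
The query in the next step reads from a Flink table named default_catalog.source.kafka_test_log, whose DDL the original post does not show. A minimal sketch of what it could look like, assuming the test_log topic and the brokers above, csv format for the single string column, and a hypothetical consumer group id:

create database if not exists default_catalog.source;

create table default_catalog.source.kafka_test_log (
  data STRING
) with (
  'connector' = 'kafka',                 -- Flink 1.11 Kafka SQL connector
  'topic' = 'test_log',
  'properties.bootstrap.servers' = 'hadoop101:9092,hadoop102:9092,hadoop103:9092',
  'properties.group.id' = 'test_log_group',  -- hypothetical group id
  'scan.startup.mode' = 'latest-offset',
  'format' = 'csv'                       -- one STRING column per message
);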

5. Check that the producer's data can actually be read

Flink SQL>  select data from default_catalog.source.kafka_test_log;
[INFO] Result retrieval cancelled.

The data can definitely be read.

6. Write the data into the Iceberg table

insert into hive_catalog.iceberg_db.ib_test_log select data from default_catalog.source.kafka_test_log;
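
To check what has landed in the table, the Iceberg table can also be queried directly from the SQL client (a simple bounded read; with Iceberg 0.11 this only sees data from committed snapshots):

Flink SQL> select * from hive_catalog.iceberg_db.ib_test_log;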

But the table's data directory on HDFS stays at size 0, even though the producer keeps writing data; the topic name was double-checked as well.
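
One thing worth ruling out here (not covered in the original post): the Iceberg Flink sink only commits data files when a checkpoint completes, so with checkpointing disabled the table directory stays empty even if records are flowing. It would not explain the "bytes received: 0" observation below, but it is a common cause of this exact symptom. A sketch of enabling checkpointing in flink-conf.yaml (interval and paths are example values):

# flink-conf.yaml
execution.checkpointing.interval: 60s
state.backend: filesystem
state.checkpoints.dir: hdfs://ns/flink/checkpoints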

7. Analyzing the write failure

At first I suspected the catalog was wrong and redefined it (the Kafka table had been created without a catalog or database prefix).

After redefining the Kafka table's catalog, the data still could not be read.

After giving the Kafka table an explicit catalog and database, the data still could not be read; "bytes received" stayed at 0.
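
Since "bytes received" stays at 0, it is worth checking from the Kafka side whether the Flink consumer is fetching at all. The group id is whatever the source table's DDL set ('test_log_group' below matches the hypothetical sketch earlier); note that Flink commits offsets to Kafka only on checkpoints, so an empty listing here is not conclusive:

[root@hadoop101 ~]# kafka-consumer-groups.sh --bootstrap-server hadoop101:9092 --describe --group test_log_group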

TaskManager log:

The TaskManager's stack trace does show the source scan running, so is it just not receiving anything? In the dump below, the Kafka fetcher thread sits idle in epollWait waiting for network data, and the legacy source thread is blocked in Handover.pollNext waiting for records; both are consistent with a consumer that is connected but getting nothing back.

"Kafka Fetcher for Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter (1/1)" Id=2405 RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	-  locked sun.nio.ch.Util$3@19f5e8f7
	-  locked java.util.Collections$UnmodifiableSet@744f2387
	-  locked sun.nio.ch.EPollSelectorImpl@2180e63e
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.select(Selector.java:794)
	at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:467)
	at org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
	...

	Number of locked synchronizers = 1
	- java.util.concurrent.locks.ReentrantLock$FairSync@2402c423

"Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter (1/1)" Id=2401 WAITING on java.lang.Object@12863204
	at java.lang.Object.wait(Native Method)
	-  waiting on java.lang.Object@12863204
	at java.lang.Object.wait(Object.java:502)
	at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:73)
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:137)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:823)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:266)

"Thread-2084" Id=2404 TIMED_WAITING on java.util.LinkedList@1fd868c8
	at java.lang.Object.wait(Native Method)
	-  waiting on java.util.LinkedList@1fd868c8
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:418)

"OutputFlusher for Source: TableSourceScan(table=[[default_catalog, source, kafka_test_log]], fields=[data]) -> IcebergStreamWriter" Id=2400 TIMED_WAITING
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.run(RecordWriter.java:328)

"IcebergFilesCommitter -> Sink: IcebergSink hive_catalog.iceberg_db.ib_test_log (1/1)" Id=2399 WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394fdc1b
	at sun.misc.Unsafe.park(Native Method)
	-  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@394fdc1b
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:136)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:313)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:188)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:608)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:574)
	...

Summary (troubleshooting approach)

What is causing this? The official docs do it exactly this way, and other blog posts online all manage to read the data (why did their write demos go so smoothly?).
I first suspected a Scala version mismatch: Kafka was on Scala 2.11 while Flink and Iceberg were on 2.12. There was no error in the logs, but I unified everything on Scala 2.12 anyway; it still did not work (version mismatch ruled out).
So what is the cause, then?

Switching from the hive catalog to a hadoop catalog didn't work either (Hive ruled out as the cause).

Could the Kafka broker version be incompatible with Flink's Kafka connector? Next step: swap in a different Kafka version and verify.
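
Before swapping versions outright, one quick probe (a sketch) is to list the API versions the broker supports and compare them against what the connector's embedded client needs; the tool ships with Kafka:

[root@hadoop101 ~]# kafka-broker-api-versions.sh --bootstrap-server hadoop101:9092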