flink sql讀取kafka-入門級
阿新 • • 發佈:2020-12-24
目錄
環境
flink 1.11版本
Mac系統
下載相關jar包
flink sql讀取kafka需要相關的kafka依賴包,放到本地的lib目錄下,選擇下面這個:
cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .
啟動flink叢集與flink sql
啟動flink叢集
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh
啟動flink sql
/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded
使用tableau模式
SET execution.result-mode=tableau;
建立資料庫並選擇
create database if not exists river_test;
use river_test;
建立flink sql表結構
CREATE TABLE kafkaTable13 ( item_id BIGINT, source_type BIGINT, title STRING, white_image STRING, coupon_name STRING, schema STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' );
查詢結果
select * from kafkaTable13;
通過相關flink UI介面,檢視任務: