1. 程式人生 > 資料庫 >flink sql讀取kafka-入門級

flink sql讀取kafka-入門級

目錄

環境

flink 1.11版本
Mac系統

下載相關jar包

flink sql讀取kafka需要相關的kafka依賴包,放到本地的lib目錄下,選擇下面這個:
在這裡插入圖片描述

cd /usr/local/Cellar/apache-flink/1.11.2/libexec/lib
cp ~/Downloads/flink-sql-connector-kafka_2.11-1.11.2.jar .

啟動flink叢集與flink sql

啟動flink叢集

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh

啟動flink sql

/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/sql-client.sh embedded

使用tableau模式

SET execution.result-mode=tableau;

建立資料庫並選擇

create database if not exists river_test;
use river_test;

建立flink sql表結構

CREATE TABLE kafkaTable13 (
 item_id BIGINT,
 source_type BIGINT,
 title STRING,
 white_image STRING,
 coupon_name STRING,
 schema STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'test',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'json',
 'scan.startup.mode' = 'latest-offset'
);

查詢結果

select * from kafkaTable13;

在這裡插入圖片描述
通過相關flink UI介面,檢視任務:
在這裡插入圖片描述