Flink的sink實戰之二:kafka
阿新 • • 發佈:2020-11-09
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 本篇概覽
本文是《Flink的sink實戰》系列的第二篇,前文[《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628)對sink有了基本的瞭解,本章來體驗將資料sink到kafka的操作;
### 全系列連結
1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628)
2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224)
3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968)
4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511)
### 版本和環境準備
本次實戰的環境和版本如下:
1. JDK:1.8.0_211
2. Flink:1.9.2
3. Maven:3.6.0
4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
5. IDEA:2018.3.5 (Ultimate Edition)
6. Kafka:2.4.0
7. Zookeeper:3.5.5
請確保上述環境和服務已經就緒;
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201109081046069-1123443178.png)
準備完畢,開始開發;
### 準備工作
正式編碼前,先去官網檢視相關資料瞭解基本情況:
1. 地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html
2. 我這裡用的kafka是2.4.0版本,在官方文件查詢對應的庫和類,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201109081046570-900671175.png)
### kafka準備
1. 建立名為test006的topic,有四個分割槽,參考命令:
```shell
./kafka-topics.sh \
--create \
--bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 \
--partitions 4 \
--topic test006
```
2. 在控制檯消費test006的訊息,參考命令:
```shell
./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic test006
```
3. 此時如果該topic有訊息進來,就會在控制檯輸出;
4. 接下來開始編碼;
### 建立工程
1. 用maven命令建立flink工程:
```shell
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
```
2. 根據提示,groupid輸入com.bolingcavalry,artifactid輸入flinksinkdemo,即可建立一個maven工程;
3. 在pom.xml中增加kafka依賴庫:
```xml
```
4. 工程建立完成,開始編寫flink任務的程式碼;
### 傳送字串訊息的sink
先嚐試傳送字串型別的訊息:
1. 建立KafkaSerializationSchema介面的實現類,後面這個類要作為建立sink物件的引數使用:
```java
package com.bolingcavalry.addsink;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
public class ProducerStringSerializationSchema implements KafkaSerializati