Flink的DataSource三部曲之三:自定義
阿新 • • 發佈:2020-11-07
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 本篇概覽
本文是《Flink的DataSource三部曲》的終篇,前面都是在學習Flink已有的資料來源功能,但如果這些不能滿足需要,就要自定義資料來源(例如從資料庫獲取資料),也就是今天實戰的內容,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100142690-985742068.png)
### Flink的DataSource三部曲文章連結
1. [《Flink的DataSource三部曲之一:直接API》](https://blog.csdn.net/boling_cavalry/article/details/105467076)
2. [《Flink的DataSource三部曲之二:內建connector》](https://blog.csdn.net/boling_cavalry/article/details/105471798)
3. [《Flink的DataSource三部曲之三:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105472218)
### 環境和版本
本次實戰的環境和版本如下:
1. JDK:1.8.0_211
2. Flink:1.9.2
3. Maven:3.6.0
4. 作業系統:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)
5. IDEA:2018.3.5 (Ultimate Edition)
### 在伺服器上搭建Flink服務
1. 前面兩章的程式都是在IDEA上執行的,本章需要通過Flink的web ui觀察執行結果,因此要單獨部署Flink服務,我這裡是在CentOS環境通過docker-compose部署的,以下是docker-compose.yml的內容,用於參考:
```yml
version: "2.1"
services:
jobmanager:
image: flink:1.9.2-scala_2.12
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager1:
image: flink:1.9.2-scala_2.12
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager2:
image: flink:1.9.2-scala_2.12
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
```
2. 下圖是我的Flink情況,有兩個Task Maganer,共八個Slot全部可用:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100142972-113772455.png)
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkdatasourcedemo資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201107100143262-44184397.png)
準備完畢,開始開發;
### 實現SourceFunctionDemo介面的DataSource
1. 從最簡單的開始,開發一個不可並行的資料來源並驗證;
2. 實現SourceFunction介面,在工程flinkdatasourcedemo中增加SourceFunctionDemo.java:
```java
package com.bolingcavalry.customize;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
public class SourceFunctionDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//並行度為2
env.setParallelism(2);
Da