CoProcessFunction實戰三部曲之一:基本功能
阿新 • • 發佈:2020-11-30
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 關於《CoProcessFunction實戰三部曲》系列
- 《CoProcessFunction實戰三部曲》旨在通過三次實戰,由淺入深的學習和掌握Flink低階處理函式CoProcessFunction的用法;
- 整個系列的開篇先介紹CoProcessFunction,然後迅速進入實戰,瞭解CoProcessFunction的基本功能;
- 下一篇會結合狀態,讓雙流元素的處理彼此保持關係;
- 終篇的實戰會加入定時器功能,確保同一個key的資料在雙流場景下能夠及時處理;
### 版本資訊
1. 開發環境作業系統:MacBook Pro 13寸, macOS Catalina 10.15.3
2. 開發工具:IDEA ULTIMATE 2018.3
3. JDK:1.8.0_211
4. Maven:3.6.0
5. Flink:1.9.2
### 系列文章連結
1. [基本功能](https://xinchen.blog.csdn.net/article/details/109624375)
2. [狀態處理](https://xinchen.blog.csdn.net/article/details/109629119)
3. [定時器和側輸出](https://xinchen.blog.csdn.net/article/details/109645214)
### 關於CoProcessFunction
- CoProcessFunction的作用是同時處理兩個資料來源的資料;
- 試想在面對兩個輸入流時,如果這兩個流的資料之間有業務關係,該如何編碼實現呢,例如下圖中的操作,同時監聽9998和9999埠,將收到的輸出分別處理後,再由同一個sink處理(列印):
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837289-720933326.png)
- Flink支援的方式是擴充套件CoProcessFunction來處理,為了更清楚認識,我們把KeyedProcessFunction和CoProcessFunction的類圖擺在一起看,如下所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837678-1470633056.png)
- 從上圖可見,CoProcessFunction和KeyedProcessFunction的繼承關係一樣,另外CoProcessFunction自身也很簡單,在processElement1和processElement2中分別處理兩個上游流入的資料即可,並且也支援定時器設定;
### 本篇實戰功能簡介
本篇咱們要開發的應用,其功能非常簡單,描述如下:
1. 建兩個資料來源,資料分別來自本地9998和9999埠;
2. 每個埠收到類似aaa,123這樣的資料,轉成Tuple2例項,f0是aaa,f1是123;
3. 在CoProcessFunction的實現類中,對每個資料來源的資料都打日誌,然後全部傳到下游運算元;
4. 下游操作是列印,因此9998和9999埠收到的所有資料都會在控制檯打印出來;
5. 整個demo的功能如下圖所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075837972-465920736.png)
- 接下來開始編碼;
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075838383-357010033.png)
### 程式碼簡介
1. 開發一個Map運算元,將字串轉成Tuple2;
2. 再開發抽象類AbstractCoProcessFunctionExecutor,功能包括:flink啟動、監聽埠、呼叫運算元處理資料、雙流連線、將雙流處理結果打印出來;
3. 從上面的描述可見,AbstractCoProcessFunctionExecutor做了很多事情,唯獨沒有實現雙流連線後的具體業務邏輯,這些沒有做的是留給子類來實現的,整個三部曲系列的重點都集中在AbstractCoProcessFunctionExecutor的子類上,把雙流連線後的業務邏輯做好,如下圖所示,紅色為CoProcessFunction的業務程式碼,其他的都在抽象類中完成:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201130075838646-1963831744.png)
### Map運算元
1. 做一個map運算元,用來將字串aaa,123轉成Tuple2例項,f0是aaa,f1是123;
2. 運算元名為WordCountMap.java:
```java
package com.bolingcavalry.coprocessfunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
public class WordCountMap implements Map