Flink處理函式實戰之五:CoProcessFunction(雙流處理)
阿新 • • 發佈:2020-11-24
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 歡迎訪問我的GitHub
> 這裡分類和彙總了欣宸的全部原創(含配套原始碼):[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
### Flink處理函式實戰系列連結
1. [深入瞭解ProcessFunction的狀態操作(Flink-1.10)](https://blog.csdn.net/boling_cavalry/article/details/106040312);
2. [ProcessFunction](https://xinchen.blog.csdn.net/article/details/106299035);
3. [KeyedProcessFunction類](https://xinchen.blog.csdn.net/article/details/106299167);
4. [ProcessAllWindowFunction(視窗處理)](https://xinchen.blog.csdn.net/article/details/106453229);
5. [CoProcessFunction(雙流處理)](https://xinchen.blog.csdn.net/article/details/109614001);
### 本篇概覽
- 本文是《Flink處理函式實戰》系列的第五篇,學習內容是如何同時處理兩個資料來源的資料;
- 試想在面對兩個輸入流時,如果這兩個流的資料之間有業務關係,該如何編碼實現呢,例如下圖中的操作,同時監聽9998和9999埠,將收到的輸出分別處理後,再由同一個sink處理(列印):
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075108456-212078716.png)
- Flink支援的方式是擴充套件CoProcessFunction來處理,為了更清楚認識,我們把KeyedProcessFunction和CoProcessFunction的類圖擺在一起看,如下所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075108871-431161506.png)
- 從上圖可見,CoProcessFunction和KeyedProcessFunction的繼承關係一樣,另外CoProcessFunction自身也很簡單,在processElement1和processElement2中分別處理兩個上游流入的資料即可,並且也支援定時器設定;
### 編碼實戰
接下來咱們開發一個應用來體驗CoProcessFunction,功能非常簡單,描述如下:
1. 建兩個資料來源,資料分別來自本地9998和9999埠;
2. 每個埠收到類似aaa,123這樣的資料,轉成Tuple2例項,f0是aaa,f1是123;
3. 在CoProcessFunction的實現類中,對每個資料來源的資料都打日誌,然後全部傳到下游運算元;
4. 下游操作是列印,因此9998和9999埠收到的所有資料都會在控制檯打印出來;
5. 整個demo的功能如下圖所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075109264-230947484.png)
- 接下來編碼實現上述功能;
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075109522-1735162736.png)
### Map運算元
1. 做一個map運算元,用來將字串aaa,123轉成Tuple2例項,f0是aaa,f1是123;
2. 運算元名為WordCountMap.java:
```java
package com.bolingcavalry.coprocessfunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.StringUtils;
public class WordCountMap implements Map