1. 程式人生 > >Flink處理函式實戰之五:CoProcessFunction(雙流處理)

Flink處理函式實戰之五:CoProcessFunction(雙流處理)

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 歡迎訪問我的GitHub > 這裡分類和彙總了欣宸的全部原創(含配套原始碼):[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) ### Flink處理函式實戰系列連結 1. [深入瞭解ProcessFunction的狀態操作(Flink-1.10)](https://blog.csdn.net/boling_cavalry/article/details/106040312); 2. [ProcessFunction](https://xinchen.blog.csdn.net/article/details/106299035); 3. [KeyedProcessFunction類](https://xinchen.blog.csdn.net/article/details/106299167); 4. [ProcessAllWindowFunction(視窗處理)](https://xinchen.blog.csdn.net/article/details/106453229); 5. [CoProcessFunction(雙流處理)](https://xinchen.blog.csdn.net/article/details/109614001); ### 本篇概覽 - 本文是《Flink處理函式實戰》系列的第五篇,學習內容是如何同時處理兩個資料來源的資料; - 試想在面對兩個輸入流時,如果這兩個流的資料之間有業務關係,該如何編碼實現呢,例如下圖中的操作,同時監聽9998和9999埠,將收到的輸出分別處理後,再由同一個sink處理(列印): ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075108456-212078716.png) - Flink支援的方式是擴充套件CoProcessFunction來處理,為了更清楚認識,我們把KeyedProcessFunction和CoProcessFunction的類圖擺在一起看,如下所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075108871-431161506.png) - 從上圖可見,CoProcessFunction和KeyedProcessFunction的繼承關係一樣,另外CoProcessFunction自身也很簡單,在processElement1和processElement2中分別處理兩個上游流入的資料即可,並且也支援定時器設定; ### 編碼實戰 接下來咱們開發一個應用來體驗CoProcessFunction,功能非常簡單,描述如下: 1. 建兩個資料來源,資料分別來自本地9998和9999埠; 2. 每個埠收到類似aaa,123這樣的資料,轉成Tuple2例項,f0是aaa,f1是123; 3. 在CoProcessFunction的實現類中,對每個資料來源的資料都打日誌,然後全部傳到下游運算元; 4. 下游操作是列印,因此9998和9999埠收到的所有資料都會在控制檯打印出來; 5. 整個demo的功能如下圖所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075109264-230947484.png) - 接下來編碼實現上述功能; ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201124075109522-1735162736.png) ### Map運算元 1. 做一個map運算元,用來將字串aaa,123轉成Tuple2例項,f0是aaa,f1是123; 2. 運算元名為WordCountMap.java: ```java package com.bolingcavalry.coprocessfunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.StringUtils; public class WordCountMap implements Map