1. 程式人生 > >CoProcessFunction實戰三部曲之二:狀態處理

CoProcessFunction實戰三部曲之二:狀態處理

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 本篇概覽 - 本文是《CoProcessFunction實戰三部曲》的第二篇,咱們要實戰的是雙流連線場景下,處理一號流中的資料時,還要結合該key在二號流中的情況; - 最簡單的例子:aaa在一號流中的value和二號流的value相加,再輸出到下游,如下圖所示,一號流中的value存入state,在二號流中取出並相加,將結果輸出給下游: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201204081248230-449583855.png) - 本篇的內容就是編碼實現上圖的功能; ### 參考文章 理解狀態:[《深入瞭解ProcessFunction的狀態操作(Flink-1.10)》](https://xinchen.blog.csdn.net/article/details/106040312) ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201204081248701-420602581.png) ### 編碼 1. 字串轉Tuple2的Map函式,以及抽象類AbstractCoProcessFunctionExecutor都和上一篇[《CoProcessFunction實戰三部曲之一:基本功能》](https://xinchen.blog.csdn.net/article/details/109624375)一模一樣; 2. 新增AbstractCoProcessFunctionExecutor的子類AddTwoSourceValue.java,原始碼如下,稍後會說明幾個關鍵點: ```java package com.bolingcavalry.coprocessfunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author will * @email [email protected] * @date 2020-11-11 09:48 * @description 功能介紹 */ public class AddTwoSourceValue extends AbstractCoProcessFunctionExecutor { private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValue.class); @Override protected CoProcess