CoProcessFunction實戰三部曲之二:狀態處理
阿新 • • 發佈:2020-12-04
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 本篇概覽
- 本文是《CoProcessFunction實戰三部曲》的第二篇,咱們要實戰的是雙流連線場景下,處理一號流中的資料時,還要結合該key在二號流中的情況;
- 最簡單的例子:aaa在一號流中的value和二號流的value相加,再輸出到下游,如下圖所示,一號流中的value存入state,在二號流中取出並相加,將結果輸出給下游:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201204081248230-449583855.png)
- 本篇的內容就是編碼實現上圖的功能;
### 參考文章
理解狀態:[《深入瞭解ProcessFunction的狀態操作(Flink-1.10)》](https://xinchen.blog.csdn.net/article/details/106040312)
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202012/485422-20201204081248701-420602581.png)
### 編碼
1. 字串轉Tuple2的Map函式,以及抽象類AbstractCoProcessFunctionExecutor都和上一篇[《CoProcessFunction實戰三部曲之一:基本功能》](https://xinchen.blog.csdn.net/article/details/109624375)一模一樣;
2. 新增AbstractCoProcessFunctionExecutor的子類AddTwoSourceValue.java,原始碼如下,稍後會說明幾個關鍵點:
```java
package com.bolingcavalry.coprocessfunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author will
* @email [email protected]
* @date 2020-11-11 09:48
* @description 功能介紹
*/
public class AddTwoSourceValue extends AbstractCoProcessFunctionExecutor {
private static final Logger logger = LoggerFactory.getLogger(AddTwoSourceValue.class);
@Override
protected CoProcess