Flink處理函式實戰之二:ProcessFunction類
阿新 • • 發佈:2020-11-20
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### Flink處理函式實戰系列連結
1. [深入瞭解ProcessFunction的狀態操作(Flink-1.10)](https://blog.csdn.net/boling_cavalry/article/details/106040312);
2. [ProcessFunction](https://xinchen.blog.csdn.net/article/details/106299035);
3. [KeyedProcessFunction類](https://xinchen.blog.csdn.net/article/details/106299167);
4. [ProcessAllWindowFunction(視窗處理)](https://xinchen.blog.csdn.net/article/details/106453229);
5. [CoProcessFunction(雙流處理)](https://xinchen.blog.csdn.net/article/details/109614001);
### 關於處理函式(Process Function)
如下圖,在常規的業務開發中,SQL、Table API、DataStream API比較常用,處於Low-level的Porcession相對用得較少,從本章開始,我們一起通過實戰來熟悉處理函式(Process Function),看看這一系列的低階運算元可以帶給我們哪些能力?
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092347364-432672709.png)
### 關於ProcessFunction類
處理函式有很多種,最基礎的應該ProcessFunction類,來看看它的類圖,可見有RichFunction的特性open、close,然後自己有兩個重要的方法processElement和onTimer:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092347965-2008655427.png)
常用特性如下所示:
1. 處理單個元素;
2. 訪問時間戳;
3. 旁路輸出;
接下來寫兩個應用體驗上述功能;
### 版本資訊
1. 開發環境作業系統:MacBook Pro 13寸, macOS Catalina 10.15.3
2. 開發工具:IDEA ULTIMATE 2018.3
3. JDK:1.8.0_211
4. Maven:3.6.0
5. Flink:1.9.2
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201120092348447-1449527323.png)
### 建立工程
執行以下命令建立一個flink-1.9.2的應用工程:
```shell
mvn \
archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.2
```
按提示輸入groupId:com.bolingcavalry,architectid:flinkdemo
### 第一個demo
第一個demo用來體驗以下兩個特性:
1. 處理單個元素;
2. 訪問時間戳;
建立Simple.java,內容如下:
```java
package com.bolingcavalry.processfunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
public class Simple {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 並行度為1
env.setParallelism(1);
// 設定資料來源,一共三個元素
Da