1. 程式人生 > >Flink處理函式實戰之三:KeyedProcessFunction類

Flink處理函式實戰之三:KeyedProcessFunction類

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### Flink處理函式實戰系列連結 1. [深入瞭解ProcessFunction的狀態操作(Flink-1.10)](https://blog.csdn.net/boling_cavalry/article/details/106040312); 2. [ProcessFunction](https://xinchen.blog.csdn.net/article/details/106299035); 3. [KeyedProcessFunction類](https://xinchen.blog.csdn.net/article/details/106299167); 4. [ProcessAllWindowFunction(視窗處理)](https://xinchen.blog.csdn.net/article/details/106453229); 5. [CoProcessFunction(雙流處理)](https://xinchen.blog.csdn.net/article/details/109614001); ### 本篇概覽 本文是《Flink處理函式實戰》系列的第三篇,上一篇[《Flink處理函式實戰之二:ProcessFunction類》](https://blog.csdn.net/boling_cavalry/article/details/106299035)學習了最簡單的ProcessFunction類,今天要了解的KeyedProcessFunction,以及該類帶來的一些特性; ### 關於KeyedProcessFunction 通過對比類圖可以確定,KeyedProcessFunction和ProcessFunction並無直接關係: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201121091101156-1840760553.png) KeyedProcessFunction用於處理KeyedStream的資料集合,相比ProcessFunction類,KeyedProcessFunction擁有更多特性,官方文件如下圖紅框,狀態處理和定時器功能都是KeyedProcessFunction才有的: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201121091101608-1755025123.png) 介紹完畢,接下來通過例項來學習吧; ### 版本資訊 1. 開發環境作業系統:MacBook Pro 13寸, macOS Catalina 10.15.3 2. 開發工具:IDEA ULTIMATE 2018.3 3. JDK:1.8.0_211 4. Maven:3.6.0 5. Flink:1.9.2 ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinkstudy資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201121091102025-1803904405.png) ### 實戰簡介 本次實戰的目標是學習KeyedProcessFunction,內容如下: 1. 監聽本機9999埠,獲取字串; 2. 將每個字串用空格分隔,轉成Tuple2例項,f0是分隔後的單詞,f1等於1; 3. 上述Tuple2例項用f0欄位分割槽,得到KeyedStream; 4. KeyedSteam轉入自定義KeyedProcessFunction處理; 5. 自定義KeyedProcessFunction的作用,是記錄每個單詞最新一次出現的時間,然後建一個十秒的定時器,十秒後如果發現這個單詞沒有再次出現,就把這個單詞和它出現的總次數傳送到下游運算元; ### 編碼 1. 繼續使用[《Flink處理函式實戰之二:ProcessFunction類》](https://blog.csdn.net/boling_cavalry/article/details/106299035)一文中建立的工程flinkstudy; 2. 建立bean類CountWithTimestamp,裡面有三個欄位,為了方便使用直接設為public: ```java package com.bolingcavalry.keyedprocessfunction; public class CountWithTimestamp { public String key; public long count; public long lastModified; } ``` 3. 建立FlatMapFunction的實現類Splitter,作用是將字串分割後生成多個Tuple2例項,f0是分隔後的單詞,f1等於1: ```java package com.bolingcavalry; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; public class Splitter implements FlatMap