Flink使用函式之監控感測器溫度上升提醒
使用函式ProcessFunction介紹
Flink的ProcessFunction 函式是低階流處理運算元,可以訪問流應用程式所有(非迴圈)基本構建塊:
- 事件 (資料流元素)
- 狀態 (容錯和一致性)
- 定時器 (事件時間和處理時間)
DataStream API提供了ProcessFunction轉換運算元,可以訪問時間戳、註冊定時事件。輸出特定的一些事件。
ProcessFunction 是一種提供對 KeyedState 和定時器訪問的 FlatMapFunction。每在輸入流中接收到一個事件,就會呼叫來此函式來處理。對於容錯的狀態,ProcessFunction 可以通過 RuntimeContext 訪問 KeyedState。
Timers 定時器可以對處理時間和事件時間的變化做一些處理。每次呼叫 processElement() 都可以獲得一個 Context 物件,通過該物件可以訪問元素的事件時間戳以及 TimerService。TimerService 可以為尚未發生的事件時間/處理時間實註冊回撥。當定時器到達某個時刻時,會呼叫 onTimer() 方法。在呼叫期間,所有狀態再次限定為定時器建立的鍵,允許定時器操作 KeyedState。
ProcessFunction使用
模擬案例場景:感測器狀態資訊(這裡舉例:溫度)會隔幾秒上傳,如果這次上傳的溫度
比上一次上傳的溫度高,溫度10秒以內都是比較高,就進行持續的提示。
1、這裡感測器狀態訊息模擬從socket接收,訊息格式包含二個欄位分別是:感測器標識id,感測器溫度。
2、接收訊息之後對資料流按 感測器標識Id 進行分組,對於溫度比上一次高的訊息,設定定時器10秒以內如果都是對比上一次溫度高,則定時進行提示,如果10秒內對比上一次溫度低,則認為溫度沒有升高,則刪除定時器,取消提示。
感測器訊息類SensorReading,程式碼如下:
public class SensorReading { // 感測器 id public String sensorId; // 時間戳 public String timeStamp; // 溫度 public Double temperature; // 狀態描述 public String lowOrhigt; // 狀態標識 public String status; public SensorReading() {} public SensorReading(String sensorId,String timeStamp,Double temperature){ this.sensorId = sensorId; this.timeStamp = timeStamp; this.temperature = temperature; } public SensorReading(String sensorId,Double temperature,String status){ this.sensorId = sensorId; this.temperature = temperature; this.status=status; } public SensorReading(String sensorId,Double temperature){ this.sensorId = sensorId; this.temperature = temperature; } @Override public String toString() { return "SensorReading{" + "sensorId='" + sensorId + '\'' + ", timeStamp=" + timeStamp + ", temperature=" + temperature + ", lowOrhigt=" + lowOrhigt + ", status=" + status + '}'; } }
自定義TempIncreKeyedProcessFunction類繼承自KeyedProcessFunction,程式碼如下:
可仔細閱讀程式碼註釋
/**
* @author: Created By yanshien
* @company ChinaUnicom Software ysn
* @date: 2020-12-24 15:13
* @version: v1.0
* @description: 監控感測器溫度是否上升
**/
public class TempIncreKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {
ValueState<Double> lastTempState; // 上一次溫度值
ValueState<Long> timerTsState; // 註冊定時器的時間戳
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastTempState=getRuntimeContext().getState(new ValueStateDescriptor<>("last-temp", Double.class));
timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("timer-ts", Long.class));
}
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception {
// 先取出上一次狀態
Double lastTemp = lastTempState.value();
Long timerTs = timerTsState.value();
// 更新溫度值
lastTempState.update(sensorReading.temperature);
// 當前溫度值和上次溫度進行比較,如果溫度上升,且沒有定時器,註冊當前時間10s後進行提醒
// System.out.println( sensorReading.temperature);
if (lastTemp == null) {
lastTemp = sensorReading.temperature;
}
if (timerTs == null) {
timerTs = 0L;
}
if( sensorReading.temperature > lastTemp && timerTs == 0 ){
// 註冊當前時間10s後的定時器
Long ts = context.timerService().currentProcessingTime() +10000L;
context.timerService().registerProcessingTimeTimer(ts);
timerTsState.update(ts);
// 如果溫度下降,那麼刪除定時器
} else if( sensorReading.temperature < lastTemp ){
context.timerService().deleteProcessingTimeTimer(timerTs);
timerTsState.clear();
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect("感測器" + ctx.getCurrentKey() + "的溫度連續" + 10000L/1000 + "秒連續上升");
timerTsState.clear();
}
}
主函式程式碼如下:
/**
* @author: Created By yanshien
* @company ChinaUnicom Software ysn
* @date: 2020-12-24 15:13
* @version: v1.0
* @description: 監控感測器溫度是否上升
**/
public class ProcessFunctionDemo {
public static void main(String[] args) throws Exception {
// 建立流處理的執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*設定使用EventTime作為Flink的時間處理標準,不指定預設是ProcessTime*/
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 這裡為了便於理解,設定並行度為1,預設並行度是當前機器的cpu數量
env.setParallelism(1);
// 指定資料來源 從socket的9000埠接收資料
// 在 cmd 開啟 nc -L -p 9000
DataStream<String> inputStream = env.socketTextStream("localhost", 9000);
DataStream<String> sourceDS = inputStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String line) throws Exception {
if (null == line || "".equals(line)) {
return false;
}
String[] lines = line.split(",");
if (lines.length != 2) {
return false;
}
return true;
}
});
// map轉換,將資料轉換成SensorReading格式,第一個欄位代表是感測器id,第二個欄位的代表的是感測器溫度
DataStream<String> warningDS = sourceDS.map(new MapFunction<String, SensorReading>() {
@Override
public SensorReading map(String line) throws Exception {
String[] lines = line.split(",");
return new SensorReading(lines[0], Double.valueOf(lines[1]));
}
}).keyBy(new KeySelector<SensorReading, String>() {
@Override
public String getKey(SensorReading value) throws Exception {
return value.sensorId;
}
}).process(new TempIncreKeyedProcessFunction());
// 列印提醒資訊
warningDS.print();
env.execute();
}
程式測試
在 cmd 開啟執行命令: nc -L -p 9000,然後執行程式,從socket端傳送資料如下:
sensor_1,29.7
sensor_1,30.9
sensor_1,32
測試10秒內傳送溫度比前一次高,則進行提醒輸出,如下圖所示:
如果覺得文章能幫到您,歡迎關注、轉發。