1. 程式人生 > 其它 >Flink使用函式之監控感測器溫度上升提醒

Flink使用函式之監控感測器溫度上升提醒

技術標籤:大資料之Flinkflink大資料實時大資料

使用函式ProcessFunction介紹

Flink的ProcessFunction 函式是低階流處理運算元,可以訪問流應用程式所有(非迴圈)基本構建塊:

  • 事件 (資料流元素)
  • 狀態 (容錯和一致性)
  • 定時器 (事件時間和處理時間)

DataStream API提供了ProcessFunction轉換運算元,可以訪問時間戳、註冊定時事件。輸出特定的一些事件。

ProcessFunction 是一種提供對 KeyedState 和定時器訪問的 FlatMapFunction。每在輸入流中接收到一個事件,就會呼叫來此函式來處理。對於容錯的狀態,ProcessFunction 可以通過 RuntimeContext 訪問 KeyedState。

Timers 定時器可以對處理時間和事件時間的變化做一些處理。每次呼叫 processElement() 都可以獲得一個 Context 物件,通過該物件可以訪問元素的事件時間戳以及 TimerService。TimerService 可以為尚未發生的事件時間/處理時間實註冊回撥。當定時器到達某個時刻時,會呼叫 onTimer() 方法。在呼叫期間,所有狀態再次限定為定時器建立的鍵,允許定時器操作 KeyedState。

ProcessFunction使用

模擬案例場景:感測器狀態資訊(這裡舉例:溫度)會隔幾秒上傳,如果這次上傳的溫度

比上一次上傳的溫度高,溫度10秒以內都是比較高,就進行持續的提示。

1、這裡感測器狀態訊息模擬從socket接收,訊息格式包含二個欄位分別是:感測器標識id,感測器溫度。

2、接收訊息之後對資料流按 感測器標識Id 進行分組,對於溫度比上一次高的訊息,設定定時器10秒以內如果都是對比上一次溫度高,則定時進行提示,如果10秒內對比上一次溫度低,則認為溫度沒有升高,則刪除定時器,取消提示。

感測器訊息類SensorReading,程式碼如下:

public class SensorReading {

    // 感測器 id
    public String sensorId;
    // 時間戳
    public String timeStamp;
    // 溫度
    public Double temperature;

    // 狀態描述
    public  String lowOrhigt;

    // 狀態標識
    public String status;

    public SensorReading() {}
    public  SensorReading(String sensorId,String timeStamp,Double temperature){
        this.sensorId = sensorId;
        this.timeStamp = timeStamp;
        this.temperature = temperature;
    }

    public  SensorReading(String sensorId,Double temperature,String status){
        this.sensorId = sensorId;
        this.temperature = temperature;
        this.status=status;
    }

    public  SensorReading(String sensorId,Double temperature){
        this.sensorId = sensorId;
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "sensorId='" + sensorId + '\'' +
                ", timeStamp=" + timeStamp +
                ", temperature=" + temperature +
                ", lowOrhigt=" + lowOrhigt +
                ", status=" + status +
                '}';
    }
}

自定義TempIncreKeyedProcessFunction類繼承自KeyedProcessFunction,程式碼如下:

可仔細閱讀程式碼註釋

/**
 * @author: Created By yanshien
 * @company ChinaUnicom Software ysn
 * @date: 2020-12-24 15:13
 * @version: v1.0
 * @description: 監控感測器溫度是否上升
 **/
public class TempIncreKeyedProcessFunction extends KeyedProcessFunction<String, SensorReading, String> {

    ValueState<Double> lastTempState;  // 上一次溫度值
    ValueState<Long> timerTsState;    //  註冊定時器的時間戳

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        lastTempState=getRuntimeContext().getState(new ValueStateDescriptor<>("last-temp", Double.class));
        timerTsState=getRuntimeContext().getState(new ValueStateDescriptor<>("timer-ts", Long.class));
    }

    @Override
    public void processElement(SensorReading sensorReading, Context context, Collector<String> collector) throws Exception {
        // 先取出上一次狀態
        Double lastTemp = lastTempState.value();
        Long timerTs = timerTsState.value();

        // 更新溫度值
        lastTempState.update(sensorReading.temperature);

        // 當前溫度值和上次溫度進行比較,如果溫度上升,且沒有定時器,註冊當前時間10s後進行提醒
        // System.out.println( sensorReading.temperature);
        if (lastTemp == null) {
            lastTemp = sensorReading.temperature;
        }
        if (timerTs == null) {
            timerTs = 0L;
        }

        if( sensorReading.temperature > lastTemp && timerTs == 0 ){
            // 註冊當前時間10s後的定時器
            Long ts = context.timerService().currentProcessingTime()  +10000L;
            context.timerService().registerProcessingTimeTimer(ts);
            timerTsState.update(ts);

        // 如果溫度下降,那麼刪除定時器
        } else if( sensorReading.temperature < lastTemp ){
            context.timerService().deleteProcessingTimeTimer(timerTs);
            timerTsState.clear();
        }

    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("感測器" + ctx.getCurrentKey() + "的溫度連續" + 10000L/1000 + "秒連續上升");
        timerTsState.clear();
    }
}

主函式程式碼如下:

/**
 * @author: Created By yanshien
 * @company ChinaUnicom Software ysn
 * @date: 2020-12-24 15:13
 * @version: v1.0
 * @description: 監控感測器溫度是否上升
 **/
public class ProcessFunctionDemo {

    public static void main(String[] args) throws Exception {

        // 建立流處理的執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*設定使用EventTime作為Flink的時間處理標準,不指定預設是ProcessTime*/
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 這裡為了便於理解,設定並行度為1,預設並行度是當前機器的cpu數量
        env.setParallelism(1);

        // 指定資料來源 從socket的9000埠接收資料
        // 在 cmd 開啟 nc -L -p 9000
         DataStream<String> inputStream = env.socketTextStream("localhost", 9000);

        DataStream<String> sourceDS = inputStream.filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String line) throws Exception {
                        if (null == line || "".equals(line)) {
                            return false;
                        }
                        String[] lines = line.split(",");
                        if (lines.length != 2) {
                            return false;
                        }
                        return true;
                    }
                });

        // map轉換,將資料轉換成SensorReading格式,第一個欄位代表是感測器id,第二個欄位的代表的是感測器溫度
        DataStream<String> warningDS = sourceDS.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String line) throws Exception {
                String[] lines = line.split(",");
                return new SensorReading(lines[0], Double.valueOf(lines[1]));
            }
        }).keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading value) throws Exception {
                return value.sensorId;
            }
        }).process(new TempIncreKeyedProcessFunction());

        // 列印提醒資訊
        warningDS.print();

        env.execute();


    }

程式測試

在 cmd 開啟執行命令: nc -L -p 9000,然後執行程式,從socket端傳送資料如下:

sensor_1,29.7
sensor_1,30.9
sensor_1,32

測試10秒內傳送溫度比前一次高,則進行提醒輸出,如下圖所示:

如果覺得文章能幫到您,歡迎關注、轉發。