
Using CEP (Complex Event Processing) in Flink

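Add the CEP dependency to the pom as follows. The `_2.11` suffix is the Scala binary version and must match the Scala version of your project: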

<!-- Flink CEP (Scala API) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_2.11</artifactId>
    <version>1.10.2</version>
</dependency>

The case classes used in the example are:

case class Login(userId: String, ip: String, eventType: String, eventTime: String)

case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)
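The `eventTime` field of `Login` stores epoch seconds as a string, while Flink's event-time timestamps are epoch milliseconds, and `assignAscendingTimestamps` assumes the timestamps never decrease. The following is a minimal plain-Scala sketch (no Flink required) of that conversion and check; `TimestampCheck`, `toMillis` and `isAscending` are hypothetical helpers, and `Login` is a local copy of the case class above.

```scala
object TimestampCheck {
  // Local copy of the article's Login case class
  final case class Login(userId: String, ip: String, eventType: String, eventTime: String)

  // Hypothetical helper: epoch seconds (as text) -> epoch milliseconds, the unit Flink uses
  def toMillis(epochSeconds: String): Long = epochSeconds.trim.toLong * 1000L

  // Hypothetical helper: true if every timestamp is >= the one before it,
  // which is what assignAscendingTimestamps expects
  def isAscending(ts: Seq[Long]): Boolean =
    ts.zip(ts.drop(1)).forall { case (a, b) => a <= b }

  def main(args: Array[String]): Unit = {
    val logins = List(
      Login("1", "192.168.0.1", "fail", "1558430842"),
      Login("1", "192.168.0.2", "fail", "1558430843"),
      Login("1", "192.168.0.3", "fail", "1558430844"),
      Login("2", "192.168.10.10", "success", "1558430845")
    )
    val millis = logins.map(l => toMillis(l.eventTime))
    println(millis)              // List(1558430842000, 1558430843000, 1558430844000, 1558430845000)
    println(isAscending(millis)) // true
  }
}
```

Without the `* 1000L` conversion, a 10-second `within()` window would effectively span 10 000 seconds of this data.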

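The code for the main method is shown below. It relies on the following imports:

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time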

// Create the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

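// Generate the data, wrap it in the Login case class, and assign event-time timestamps.
// eventTime holds epoch seconds, but Flink event-time timestamps are milliseconds, hence * 1000L.
val loginEventStream: DataStream[Login] = env
  .fromCollection(List(
    Login("1", "192.168.0.1", "fail", "1558430842"),
    Login("1", "192.168.0.2", "fail", "1558430843"),
    Login("1", "192.168.0.3", "fail", "1558430844"),
    Login("2", "192.168.10.10", "success", "1558430845")
  ))
  .assignAscendingTimestamps(_.eventTime.toLong * 1000L)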
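/**
 * Functions used to build a pattern in Flink CEP:
 * 1. .begin[]()     starts a pattern sequence
 * 2. .next()        the next event must directly follow the previous one (strict contiguity: nothing in between)
 * 3. .followedBy()  other events may occur in between (relaxed contiguity)
 * 4. .where()       condition for the current step (the argument is the filter predicate)
 * 5. .within()      time limit for the whole match (a Flink Time value)
 */
// Define the pattern: two consecutive failed logins within 10 seconds
val loginFailPatten: Pattern[Login, Login] = Pattern
  .begin[Login]("begin").where(_.eventType == "fail")
  .next("next").where(_.eventType == "fail")
  .within(Time.seconds(10))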
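To illustrate the difference between `.next()` and `.followedBy()`, here is a simplified, Flink-free simulation of the two-step pattern "fail, then fail" over one user's events in time order. It is only an approximation of Flink's semantics: it ignores `.within()` and does not model Flink's NFA. `ContiguityDemo`, `strictPairs` and `relaxedPairs` are hypothetical names. Following Flink's documented behaviour for relaxed contiguity, each start event is paired only with the first later matching event (pairing it with every later match is what `.followedByAny()` does).

```scala
object ContiguityDemo {
  // Strict contiguity (.next): the second event must be the very next event.
  def strictPairs(types: Vector[String], p: String => Boolean): Vector[(Int, Int)] =
    (0 until types.length - 1).toVector.collect {
      case i if p(types(i)) && p(types(i + 1)) => (i, i + 1)
    }

  // Relaxed contiguity (.followedBy): non-matching events in between are skipped,
  // and each start is paired with the FIRST later event that matches.
  def relaxedPairs(types: Vector[String], p: String => Boolean): Vector[(Int, Int)] =
    types.indices.toVector.flatMap { i =>
      if (!p(types(i))) None
      else ((i + 1) until types.length).find(j => p(types(j))).map(j => (i, j))
    }

  def main(args: Array[String]): Unit = {
    val isFail: String => Boolean = _ == "fail"
    val events = Vector("fail", "success", "fail")
    println(strictPairs(events, isFail))  // Vector()
    println(relaxedPairs(events, isFail)) // Vector((0,2))
  }
}
```

For `fail, success, fail`, strict contiguity finds no pair because the success event sits between the failures, while relaxed contiguity skips it and pairs the two failures.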
// Apply the pattern to the stream, keyed by userId
val patternStream: PatternStream[Login] = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPatten)

// Use select to extract each match from patternStream and wrap it in the desired type
val loginFailDataStream: DataStream[Warning] = patternStream.select(new PatternSelectFunction[Login, Warning] {
  override def select(pattern: util.Map[String, util.List[Login]]): Warning = {
    val first: Login = pattern.get("begin").get(0)
    val last: Login = pattern.get("next").get(0)
    Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail")
  }
})

// Get timed-out data, i.e. partial matches for which no "next" event has arrived when the within() interval expires.
// Pass a side-output tag to select; timed-out partial matches are emitted to that side output.
val overtimeTag: OutputTag[String] = new OutputTag[String]("overtime")
val overtimeStream: DataStream[Warning] = patternStream.select(
  overtimeTag,
  new PatternTimeoutFunction[Login, String] {
    // pattern holds only the events matched so far (here, just "begin")
    override def timeout(pattern: util.Map[String, util.List[Login]], timeoutTimestamp: Long): String = {
      "this is timed-out data"
    }
  },
  new PatternSelectFunction[Login, Warning] {
    override def select(pattern: util.Map[String, util.List[Login]]): Warning = {
      val first: Login = pattern.get("begin").get(0)
      val last: Login = pattern.get("next").get(0)
      Warning(first.userId.toLong, first.eventTime.toLong, last.eventTime.toLong, "login fail")
    }
  }
)

// Print the results
loginFailDataStream.print("loginFailDataStream")
overtimeStream.print("overtimeStream")
overtimeStream.getSideOutput(overtimeTag).print("overtimeTag")

// Start the job
env.execute("CEPDemo")
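The sketch below approximates, in plain Scala, what happens to each partial match of `begin(fail).next(fail).within(10 s)` for a single key: it completes into a `Warning`, it times out (and would be sent to the `overtimeTag` side output), or it is dropped because a non-failure event broke strict contiguity. It is not Flink code. It assumes the events are already in event-time order, treats the end of a bounded input as the window elapsing (for a bounded source such as `fromCollection`, Flink emits a final `Long.MaxValue` watermark), and simplifies the exact window boundary. `TimeoutDemo`, `Outcome` and `evaluate` are hypothetical names.

```scala
object TimeoutDemo {
  // Local copies of the article's case classes
  final case class Login(userId: String, ip: String, eventType: String, eventTime: String)
  final case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

  // What becomes of one partial match started by a "fail" event
  sealed trait Outcome extends Product with Serializable
  final case class Matched(warning: Warning) extends Outcome // produced by the select function
  final case class TimedOut(start: Login) extends Outcome    // would go to the side output
  final case class Discarded(start: Login) extends Outcome   // strict contiguity was broken

  // Approximates begin(fail).next(fail).within(withinSeconds) for ONE key.
  // events must be sorted by eventTime (epoch seconds).
  def evaluate(events: Vector[Login], withinSeconds: Long): Vector[Outcome] =
    events.indices.toVector.collect {
      case i if events(i).eventType == "fail" =>
        val first = events(i)
        val start = first.eventTime.toLong
        if (i + 1 >= events.length) {
          TimedOut(first) // no later event for this key: the window eventually expires
        } else {
          val next = events(i + 1)
          val nextTime = next.eventTime.toLong
          if (nextTime - start >= withinSeconds) TimedOut(first) // window expired first
          else if (next.eventType == "fail")
            Matched(Warning(first.userId.toLong, start, nextTime, "login fail"))
          else Discarded(first) // a non-fail event came directly after the start
        }
    }

  def main(args: Array[String]): Unit = {
    val user1 = Vector(
      Login("1", "192.168.0.1", "fail", "1558430842"),
      Login("1", "192.168.0.2", "fail", "1558430843"),
      Login("1", "192.168.0.3", "fail", "1558430844")
    )
    evaluate(user1, withinSeconds = 10).foreach(println)
  }
}
```

With the article's data for user 1, the sketch reports two matches, (1558430842, 1558430843) and (1558430843, 1558430844), plus a timeout for the partial match started by the last failure, since no further event for that user arrives.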