大資料實戰(六):flink(六) 電商使用者行為分析(六)惡意登入監控
阿新 • • 發佈:2020-08-12
1 模組建立和資料準備
繼續在 UserBehaviorAnalysis 下新建一個 maven module 作為子專案,命名為 LoginFailDetect。在這個子模組中,我們將會用到 flink 的 CEP 庫來實現事件流的模式匹配,所以需要在 pom 檔案中引入 CEP 的相關依賴:
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
package com.atguigu

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Input and output record types.

/** One login attempt parsed from a CSV line: `userId,ip,eventType,eventTime`.
  *
  * `eventTime` is multiplied by 1000 before being handed to Flink as the event
  * timestamp, so it is presumably expressed in epoch seconds.
  */
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

/** Alert emitted for a user; the two time fields carry the `eventTime` of the
  * first and last buffered failure, in the same unit as [[LoginEvent.eventTime]].
  */
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

/** Job entry point: reads login events from a CSV file, keys them by user and
  * prints a [[Warning]] whenever [[LoginFailWarning]] detects repeated failures.
  */
object LoginFail {

  /** Path used by the original tutorial; kept only as a last-resort fallback. */
  private val DefaultInputPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /** Picks the input file: first CLI argument, else the `/LoginLog.csv`
    * classpath resource, else the original hard-coded path.
    */
  private def resolveInputPath(args: Array[String]): String =
    args.headOption
      .orElse(Option(getClass.getResource("/LoginLog.csv")).map(_.getPath))
      .getOrElse(DefaultInputPath)

  /** Builds and runs the streaming job.
    *
    * @param args optional; `args(0)` overrides the input CSV path
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val loginEventStream: DataStream[LoginEvent] = env
      .readTextFile(resolveInputPath(args))
      .map { line =>
        val fields = line.split(",")
        LoginEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      // Tolerate events arriving up to 3 seconds out of order.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
          override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
        })

    val loginWarningStream: DataStream[Warning] = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWarning(2))

    loginWarningStream.print()
    env.execute("login fail job")
  }
}

/** Custom keyed process function that raises a [[Warning]] when a user has
  * accumulated at least `maxFailTime` failed logins by the time a timer set
  * 2 seconds after the first buffered failure fires.
  *
  * A non-"fail" event cancels the pending timer and discards the buffered
  * failures, so a success inside the 2-second window suppresses the warning.
  *
  * @param maxFailTime minimum number of buffered failures required to alert
  */
class LoginFailWarning(maxFailTime: Int) extends KeyedProcessFunction[Long, LoginEvent, Warning] {

  // List state holding the login-failure events buffered since the timer was registered.
  lazy val LoginFailListState: ListState[LoginEvent] =
    getRuntimeContext.getListState(
      new ListStateDescriptor[LoginEvent]("saved-logingfail", classOf[LoginEvent]))

  // Value state holding the timestamp of the currently registered timer.
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  /** Buffers failures and arms a timer on the first one; any other event type
    * cancels the timer and resets both states.
    */
  override def processElement(value: LoginEvent,
                              context: KeyedProcessFunction[Long, LoginEvent, Warning]#Context,
                              collector: Collector[Warning]): Unit = {
    if (value.eventType == "fail") {
      LoginFailListState.add(value)
      // An unset ValueState[Long] reads as 0 in Scala (null unboxes to 0L),
      // so 0 means "no timer registered yet".
      if (timerTsState.value() == 0) {
        val ts = value.eventTime * 1000L + 2000L
        context.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
      }
    } else {
      context.timerService().deleteEventTimeTimer(timerTsState.value())
      LoginFailListState.clear()
      timerTsState.clear()
    }
  }

  /** Fires 2 seconds (event time) after the first buffered failure: emits a
    * warning if enough failures were collected, then resets both states.
    */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#OnTimerContext,
                       out: Collector[Warning]): Unit = {
    // Guard against a null state result before converting to a Scala list.
    val failures: List[LoginEvent] =
      Option(LoginFailListState.get()).map(_.asScala.toList).getOrElse(Nil)

    // nonEmpty protects head/last when maxFailTime is configured as 0 or less.
    if (failures.nonEmpty && failures.length >= maxFailTime) {
      out.collect(
        Warning(
          ctx.getCurrentKey,
          failures.head.eventTime,
          failures.last.eventTime,
          s"login fall in 2s for ${failures.length} times."))
    }

    LoginFailListState.clear()
    timerTsState.clear()
  }
}
CEP
package com.atguigu.loginfail_detect

import java.util

// NOTE(review): this import looks like an IDE auto-import artifact; inside
// LoginFailCEP the object's own getClass is the one in scope. Kept as-is.
import com.atguigu.LoginFail.getClass
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Input and output record types.

/** One login attempt parsed from a CSV line: `userId,ip,eventType,eventTime`.
  *
  * `eventTime` is multiplied by 1000 before being handed to Flink as the event
  * timestamp, so it is presumably expressed in epoch seconds.
  */
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

/** Alert emitted for a user; the two time fields carry the `eventTime` of the
  * first and second matched failure.
  */
case class Warning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)

/** Job entry point: detects two consecutive failed logins per user within
  * 2 seconds using a Flink CEP pattern and prints a [[Warning]] for each match.
  */
object LoginFailCEP {

  /** Path used by the original tutorial; kept only as a last-resort fallback. */
  private val DefaultInputPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /** Picks the input file: first CLI argument, else the `/LoginLog.csv`
    * classpath resource, else the original hard-coded path.
    */
  private def resolveInputPath(args: Array[String]): String =
    args.headOption
      .orElse(Option(getClass.getResource("/LoginLog.csv")).map(_.getPath))
      .getOrElse(DefaultInputPath)

  /** Builds and runs the streaming job.
    *
    * @param args optional; `args(0)` overrides the input CSV path
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val loginEventStream: DataStream[LoginEvent] = env
      .readTextFile(resolveInputPath(args))
      .map { line =>
        val fields = line.split(",")
        LoginEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      // Tolerate events arriving up to 3 seconds out of order.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
          override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
        })

    // 1. Define the pattern: a failure immediately followed by another failure, within 2 seconds.
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2))

    // 2. Apply the pattern to the per-user keyed stream to obtain a PatternStream.
    val keyedEvents = loginEventStream.keyBy(_.userId)
    val patternStream = CEP.pattern(keyedEvents, loginFailPattern)

    // 3. Turn each matched event sequence into a warning.
    val loginFailStream: DataStream[Warning] = patternStream.select(new LoginFailDetect())

    // 4. Print the result and run the job.
    loginFailStream.print()
    env.execute("login fail job")
  }
}

/** Custom PatternSelectFunction that wraps a matched pair of consecutive login
  * failures into a [[Warning]].
  */
class LoginFailDetect extends PatternSelectFunction[LoginEvent, Warning] {

  /** @param map the matched events, keyed by the pattern step names
    *            ("firstFail" and "secondFail") defined where the pattern is built
    * @return a warning carrying the user id and the two failure times
    */
  override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
    val firstLoginFail = map.get("firstFail").get(0)
    val secondLoginFail = map.get("secondFail").get(0)
    Warning(
      firstLoginFail.userId,
      firstLoginFail.eventTime,
      secondLoginFail.eventTime,
      "login fail")
  }
}