Apache Flink CEP學習總結
1. 簡介
Apache Flink是一個計算框架,地位和Spark差不多。裡面的API也有與Spark類似的,例如FlinkKafkaConsumer010對應著Spark裡的讀取Kafka形成流的API,DataStream對應著Spark裡的DStream,也有一系列的transform API例如map/fliter等等。
在yarn上提交任務的方式也十分簡潔:
請注意,它的yarn提交模式中只有yarn-cluster;不像spark那樣,有yarn-client和yarn-cluster。
乍一看,簡直不知道為何會有一個與已存在的框架功能相同的重複框架存在。其實,Apache Flink最亮眼的地方,主要在於它的CEP庫(即Complex Event Processing庫)。
CEP庫允許你在流上定義一系列的模式(pattern),最終使得你可以方便的抽取自己需要的重要的事件出來。
2.相關API
2.1 時間
使用Apache Flink主要是用它來進行關聯分析,大量的關聯分析問題可以歸結於followed by問題,而這種時序性的問題又與時間欄位息息相關。Apache Flink是如何來處理時間問題呢?它定義瞭如下3種時間:
- 處理時間(Processing Time): 開始處理此條資料的時間
- 事件時間(Event Time): 資料的時間(此時間欄位應該來源於資料內容本身,使用該型別的時間是需要用時間水印生成器)
- 進入時間(Ingestion Time): 資料進入Flink的時間
可以在定義environment時指定需要用的時間型別:
如上圖所示,即是指定使用事件時間。
既然是事件時間,那麼就需要指定事件時間是用的資料裡的哪個欄位。Flink要求時間都得是毫秒級時間戳。指定Kafka流中的時間的方式如下程式碼所示:
val source = new FlinkKafkaConsumer010[String]("mytopic",new SimpleStringSchema(),properties) //定義kafka source使用的時間欄位 source.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] { override def extractAscendingTimestamp(element: String): Long = { var rtn = -1L try{ rtn = JSON.parseObject(element).getJSONObject("payload").getLong("timestamp")*1000 }catch{ case ex:Exception => { ex.printStackTrace() } } rtn } })
在預設不指定使用的時間型別的情況下,Flink預設是用的處理時間(Processing Time)。如果需要指定是事件時間,如上使用遞增時間生成器(AscendingTimestampExtractor),那麼需要把保證取到的時間欄位的值確實是單調遞增的,否則會觸發Flink的不滿足單調遞增的warning。
鑑於Kafka裡的資料在消費時時間(資料內的時間)已經不是完全有序的了,在使用處理時間的情況下可能followed by模式結果會存在一定的誤差。
2.2 單個模式
型別 | API | 含義 |
量詞API | times() | 模式發生次數 示例: pattern.times(2,4),模式發生2,3,4次 |
timesOrMore() oneOrMore() |
模式發生大於等於N次 示例: pattern.timesOrMore(2),模式發生大於等於2次 |
|
optional() | 模式可以不匹配 示例: pattern.times(2).optional(),模式發生2次或者0次 |
|
greedy() | 模式發生越多越好 示例: pattern.times(2).greedy(),模式發生2次且重複次數越多越好 |
|
條件API | where() | 模式的條件 示例: pattern.where(_.ruleId=43322),模式的條件為ruleId=433322 |
or() | 模式的或條件 示例: pattern.where(_.ruleId=43322).or(_.ruleId=43333),模式條件為ruleId=43322或者43333 |
|
util() | 模式發生直至X條件滿足為止 示例: pattern.oneOrMore().util(condition)模式發生一次或者多次,直至condition滿足為止 |
2.3 聯合模式
API | 含義 |
next() | 嚴格的滿足條件 示例: 模式為begin("first").where(_.name='a').next("second").where(.name='b') 當且僅當資料為a,b時,模式才會被命中。如果資料為a,c,b,由於a的後面跟了c,所以a會被直接丟棄,模式不會命中。 |
followedBy() | 鬆散的滿足條件 示例: 模式為begin("first").where(_.name='a').followedBy("second").where(.name='b') 當且僅當資料為a,b或者為a,c,b,,模式均被命中,中間的c會被忽略掉。 |
followedByAny() | 非確定的鬆散滿足條件 模式為begin("first").where(_.name='a').followedByAny("second").where(.name='b') 當且僅當資料為a,c,b,b時,對於followedBy模式而言命中的為{a,b},對於followedByAny而言會有兩次命中{a,b},{a,b} |
within() | 模式命中的時間間隔限制 |
notNext() notFollowedBy() |
後面的模式不命中(嚴格/非嚴格) |
2.4 忽略策略
忽略策略 | 含義 |
NO_SKIP | 不忽略 在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b') 對於資料:a,a,a,a,b 模式匹配到的是:{a,b},{a,a,b},{a,a,a,b},{a,a,a,a,b} |
SKIP_PAST_LAST_EVENT | 在模式匹配完成之後,忽略掉之前的部分匹配結果 在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b') 對於資料:a,a,a,a,b 模式匹配到的是:{a,a,a,a,b} |
SKIP_TO_FIRST | 在模式匹配完成之後,忽略掉第一個之前的部分匹配結果 |
SKIP_TO_LAST | 在模式匹配完成之後,忽略掉最後一個之前的部分匹配結果 在模式為:begin("start").where(_.name='a').oneOrMore().followedBy("second").where(_.name='b') 對於資料:a,a,a,a,b 模式匹配到的是:{a,b},{a,a,b},{a,a,a,a,b} |
3.示例場景實現
主機被Netcore/Netis漏洞利用成功隨即發起Gafgyt通訊行為。
l 前置條件:主機作為dip觸發41472 Netcore / Netis 路由器後門告警
l 後續條件:主機作為sip觸發41533 Gafgyt僵屍網路通訊通訊告警
兩個條件具備時序關係,且中間的間隔時間不應大於30分鐘。
定義Pattern:
- 在first pattern中,定義的條件是ruleid=41472。
- 在second pattern中,定義的條件是ruleid=41533且此條訊息的源ip==前置條件的目的ip
- within方法,限定了滿足條件必須在30分鐘內
- oneOrMore().greedy,第一個條件滿足一次或者多次,越多次越好
- 忽略策略為AfterMatchSkipStrategy.skipPastLastEvent,在滿足模式之後,忽略掉之前的部分滿足條件
這樣對於輸入的告警是多次sip=1.1,dip=1.2的netcore告警,隨後一條sip=1.2,dip=1.3的gafgyt告警,拿到的命中資料會是{"first":n條netcore告警的list,"second":gafgyt告警}
將Kafka input和pattern組合成為Pattern Stream。然後再在select方法中合併滿足first pattern的告警與滿足second pattern的告警,把他們合併為一條“目的IP被Netcore/Netis攻擊成功並開始進行Gafgypt通訊”告警輸出。
輸出的效果如下圖所示:
需要引入的pom依賴如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
完整示例程式碼如下:
package com.flinklearn.main
import java.util.Properties
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.pattern.conditions.IterativeCondition
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.functions.timestamps.{BoundedOutOfOrdernessTimestampExtractor}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.api.scala._
object Main {
def main(args:Array[String]):Unit = {
val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "bootstrap")
properties.setProperty("group.id","com.flinklearn.main.Main")
val source = new FlinkKafkaConsumer010[String]("mytopic",new SimpleStringSchema(),properties)
val input:DataStream[JSONObject] = env.addSource(source)
.map(line=>{
var rtn:JSONObject = null
try{
rtn = JSON.parseObject(line).getJSONObject("payload")
}catch{
case ex:Exception => {
ex.printStackTrace()
}
}
rtn
}).filter(line=>line!=null)
val pattern = Pattern.begin[JSONObject]("first",AfterMatchSkipStrategy.skipPastLastEvent).where(new IterativeCondition[JSONObject] {
override def filter(value: JSONObject, ctx: IterativeCondition.Context[JSONObject]): Boolean = {
//目標主機被netcore漏洞掃描
//41472 Netcore / Netis 路由器後門
value.getLong("ruleid").equals(41472L)
}
}).oneOrMore.greedy.followedBy("second").where(new IterativeCondition[JSONObject] {
override def filter(value: JSONObject, ctx: IterativeCondition.Context[JSONObject]): Boolean = {
//主機主動發起gafgyt通訊行為
//41533 Gafgyt僵屍網路通訊
val iterator:java.util.Iterator[JSONObject] = ctx.getEventsForPattern("first").iterator()
var tag = false
if(value.getLong("ruleid").equals(41533L)){
while (!tag&&iterator.hasNext){
val curitem = iterator.next()
if(curitem.getString("dip").equals(value.getString("sip")) && value.getLong("timestamp") > curitem.getLong("timestamp")){
tag = true
}
}
}
tag
}
}).within(Time.minutes(30L))
val patternStream = CEP.pattern(input,pattern)
val result = patternStream.select(new PatternSelectFunction[JSONObject,JSONObject] {
override def select(pattern: java.util.Map[String, java.util.List[JSONObject]]): JSONObject = {
val first = pattern.get("first")
val second = pattern.get("second")
var startTime = first.get(0).getLong("timestamp")
var endTime = second.get(0).getLong("timestamp")
for(i <- 1 until first.size()){
if(first.get(i).getLong("timestamp") < startTime){
startTime = first.get(i).getLong("timestamp")
}
}
for(i <- 1 until second.size()){
if(second.get(i).getLong("timestamp") > endTime){
endTime = second.get(i).getLong("timestamp")
}
}
val sip = first.get(0).getString("sip")
val dip = first.get(0).getString("dip")
val info1 = second.get(0).getString("dip")
val msg = "目的IP被Netcore/Netis攻擊成功並開始進行Gafgypt通訊"
val obj:JSONObject = new JSONObject()
obj.put("start_time", startTime)
obj.put("end_time", endTime)
obj.put("sip", sip)
obj.put("dip", dip)
obj.put("info1", info1)
obj.put("msg", msg)
obj
}
})
result.print()
env.execute("Event generating test")
}
}