Esper學習筆記一:介紹
1.CEP(Complex Event Processing複雜事件處理)
事件(Event):一般情況下指的是一個系統中正在發生的事,事件可能發生在系統的各個層面上,它可以是某個動作,例如客戶下單,傳送訊息,提交報告等,也可以是某種狀態的改變,例如溫度的變化,超時等等。事件處理(Event processing)指的是跟蹤系統中發生的事件,分析事件中的資訊並從中得到某種結論。而複雜事件處理,則是結合多個事件源中的事件,從中推斷出更加複雜的情況下的事件。
實現一個CEP引擎我們需要思考的問題:(1)吞吐量;(2)低延遲;(3)複雜邏輯處理。
2.Esper
Esper是一個開源的複雜事件處理引擎,它的目的是讓使用者能夠通過它提供的介面,構建一個用於處理複雜事件的應用程式
從Esper的架構圖中我們可以看出,它包含三個部分:Input adapter,Esper engine,Output adapter。
Input adapter 和 Output adapter
輸入介面卡和輸出介面卡的主要目的是接收來自不同事件源的事件,並向不同的目的地輸出事件。 目前,Esper提供的介面卡包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。這些介面卡提供了一系列介面,可以讓使用者從不同的資料來源讀取資料,並將資料傳送給不同的目的資料來源,使用者可以不用自己單獨編寫客戶端程式碼來連線這些資料來源,感覺相當於對這些資料來源提供了一層封裝。
Esper engine
Esper引擎是處理事件的核心,它允許使用者定義需要接收的事件以及對這些事件的處理方式。
Esper支援的事件表現形式
Esper支援多種事件表現形勢:包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java物件),實現了Map介面的物件,物件陣列,XML文件物件,以及Apache Avro(一個支援JSON和Schema的資料序列化系統,可以將資料結構或物件轉化成便於儲存和傳輸的格式)。 這些事件表現形式的共同之處在於,它們都提供了事件型別的元資料,也就是說能夠表示事件的一系列屬性,例如,一個Java物件可以通過其成員變數來表示其事件屬性,一個Map物件能夠通過鍵值對來表示屬性。由此可見,本質上事件是一系列屬性值的集合,對事件的操作即對事件中的部分或全部屬性的操作。
Esper事件處理模型
Esper事件處理模型主要包含兩部分:(1)Statement 利用Esper的事件處理語言EPL宣告對事件進行的操作,Esper中提供了多種型別的事件操作,包括過濾、加窗、事件聚合等等。EPL是一種類似於SQL的語言,從這一點上來看,Esper恰好與資料庫相反,資料庫時儲存資料,並在資料上執行查詢語句,而Esper是儲存查詢語句,在這些查詢上執行資料,只要事件與查詢條件匹配,Esper就會實時進行處理,而不是隻有在查詢提交的時候才處理。(2)Listener用於監聽事件的處理情況,接收事件處理的結果,通過UpdateListener介面來實現,它相當於一個回撥函式,當事件處理完成之後,可以通過該回調函式向結果傳送到目的地。
3.例項
舉例一個使用者註冊事件:定義事件物件包含name和age屬性實現get方法,實時統計註冊使用者的平均年齡。
定義事件類
public class MyEvent {
private String name;
private int age;
public MyEvent() {
}
public MyEvent(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
測試對事件的操作
public class EsperTest {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.addEventType("myEvent", MyEvent.class);
EPServiceProvider provider = EPServiceProviderManager.getDefaultProvider(conf);
EPAdministrator admin = provider.getEPAdministrator();
EPStatement statement = admin.createEPL("select name,age,avg(age) as avgAge from myEvent");
statement.addListener(new UpdateListener() {
@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
for (EventBean eb : newEvents) {
System.out.println("註冊使用者姓名:"+eb.get("name")+",註冊使用者年齡:"+eb.get("age")+",平均年齡:"+eb.get("avgAge"));
}
}
});
EPRuntime epr = provider.getEPRuntime();
epr.sendEvent(new MyEvent("sean", 30));
epr.sendEvent(new MyEvent("sime", 25));
epr.sendEvent(new MyEvent("jack",28));
epr.sendEvent(new MyEvent("lulcy",26));
epr.sendEvent(new MyEvent("duck", 35));
}
}
執行結果
註冊使用者姓名:sean,註冊使用者年齡:30,平均年齡:30.0
註冊使用者姓名:sime,註冊使用者年齡:25,平均年齡:27.5
註冊使用者姓名:jack,註冊使用者年齡:28,平均年齡:27.666666666666668
註冊使用者姓名:lulcy,註冊使用者年齡:26,平均年齡:27.25
註冊使用者姓名:duck,註冊使用者年齡:35,平均年齡:28.8
轉載:https://www.cnblogs.com/yitudake/p/6747990.html