Esper系列(二)時間視窗、長度視窗、cast、註解、自定義函式、靜態方法
上圖長度視窗為5,事件W1至W5進入引擎後屬於NewEvents佇列,事件W6進入引擎後,W2至W6就屬於NewEvents佇列,而事件W1就屬於OldEvents隊列了。NewEvents為先進先出佇列,佇列長度為EPL語句中制定的長度視窗大小,OldEvent佇列為過期資料的存放佇列。
EPL長度視窗示例
1 | selectcount(*)asresultfromorderEvent.win:time_batch(3sec) |
時間視窗實現原理圖
說明:
從此圖可以看出,隨著時間推移,每個進入到引擎的W事件都是newEvents,即Insert Stream。W後括號裡的值為屬性值(可忽略),超過EPL語句設定的時間視窗值的事件將進入OldEvent佇列.
時間週期格式
year-part : (number|variable_name) ("years" | "year")
month-part : (number|variable_name) ("months" | "month")
week-part : (number|variable_name) ("weeks" | "week")
day-part : (number|variable_name) ("days" | "day")
hour-part : (number|variable_name) ("hours" | "hour")
minute-part : (number|variable_name) ("minutes" | "minute" | "min")
seconds-part : (number|variable_name) ("seconds" | "second" | "sec")
milliseconds-part : (number|variable_name) ("milliseconds" | "millisecond" | "msec")
EPL時間視窗示例
1 | //統計最近三秒內獲取的事件中salary的平均值 |
2 | Stringepsql="selectavg(salary)asresultfromorderEvent.win:time(3sec)"; |
資料型別轉換(cast)
示例:
1 | //avg(salary)預設返回為Double型別,cast(avg(salary),int)轉換為int型別; |
2 | //30秒內salary的平均值 |
3 | String |
Annotation(註解)
- @Name 指定EPL的名稱,引數只有一個。例如:@Name("MyEPL")
- @Description 對EPL進行描述,引數只有一個。例如:@Description("Hello World")
- @Tag 對EPL進行額外的說明,引數有兩個,分別為Tag的名稱和Tag的值,用逗號分隔。例如:@Tag(name="author",value="luonanqin")
- @Priority 指定EPL的優先順序,引數只有一個,並且整數(可負可正)。例如:@Priority(10)
- @Drop 指定事件經過此EPL後不再參與其他的EPL計算,該註解無引數。
- @Hint 為EPL加上某些標記,讓引擎對此EPL產生其它的操作,會改變EPL例項的記憶體佔用,但通常不會改變輸出。其引數固定,由Esper提供。
- @Audit EPL新增此註解後,可以額外輸出EPL執行情況,有點類似日誌的感覺(當然沒有日誌的功能全啦)。
- @Hook 與SQL相關。
- @EventRepresentation 這是用來指定EPL產生的計算結果事件包含的資料形式。引數只有一個,即array=true或array=false。false為預設值,代表資料形式為Map,若為true,則資料形式為陣列。
示例:
1 | Stringepsql="@Name(\"EsperEvent\")selectavg(salary)fromOrderEventMap.win:length_batch(2)"; |
2 | |
3 | EPStatementepstate=epAdmin.createEPL(epsql); |
4 | epstate.addListener(neworderListener()); |
5 | System.out.println("Nameis["+epstate.getName()+"]"); |
Expression(自定義函式)
格式:
Expression expression_name { expression_body }
expression_name為自定義的Expression名,expression_body為Expression的具體內容。
expression_body表現形式為:(input_param[,…] ]) => expression
input_param為事件流別名(不能和事件流同名)
示例:
1 | //建立轉換函式add |
2 | StringexpSql="createexpressionadd{x=>x.salary+500}"; |
3 | //將oe(orderEvent)事件流中的salary屬性值加500 |
4 | Stringepsql="selectadd(oe)assfromorderEvent.win:length_batch(1)asoe"; |
5 | |
6 | EPStatementexpstate=epAdmin.createEPL(expSql); |
7 | expstate.addListener(neworderListener()); |
8 | EPStatementepstate=epAdmin.createEPL(epsql); |
9 | epstate.addListener(neworderListener()); |
自定義靜態方法的應用
示例(事件流過濾)
1 | //判斷總數是否等於0 |
2 | publicclassIsZero |
3 | { |
4 | publicstaticbooleanisZero(intsum) |
5 | { |
6 | returnsum==0; |
7 | } |
8 | } |
9 | |
10 | //載入 |
11 | epService.getEPAdministrator().getConfiguration().addImport(IsZero.class); |
12 | |
13 | //查詢沒有錢的使用者的name值(User包含name和money屬性) |
14 | selectnamefromorderEvent(IsZero.isZero(salary)) |
注意:
1、要過濾的屬性只能是數字和字串。
2、過濾表示式中不能使用聚合函式。
示例(事件流轉換輸出)
通過自定義類的靜態方法轉換事件流的輸出屬性。
BaseUntil.java(靜態方法實現類)
1 | publicclassBaseUntil{ |
2 | |
3 | publicstaticintAdd(intn){ |
4 | returnn+100; |
5 | } |
6 | |
7 | publicstaticStringUpdataText(Stringstr){ |
8 | returnstr+",你好!"; |
9 | } |
10 | } |
11 | |
12 | //位元組碼載入 |
13 | epService.getEPAdministrator().getConfiguration().addImport(BaseUntil.class); |
14 | |
15 | Stringepsql="selectBaseUntil.UpdataText(name)asresultfromorderEvent"; |