Esper學習和原理分析
publicclassOrderEvent{ privateString itemName; privatedouble price; publicOrderEvent(String itemName,double price){ this.itemName = itemName; this.price = price
2.EPL: EPL是ESPER的核心,它類似於SQL,但是和SQL的執行方式不同。 SQL是資料在那裡,你每次執行SQL就會觸發一次查詢;而EPL是查詢在這裡,資料輸入達到一定條件即可觸發查詢。 這個條件可以有多種: a).每個event物件來就觸發一次查詢,並只處理當前物件
select*fromOrderEvent
這個EPL語句會在每個OrderEvent物件到達後,並將該event交給後續的Listener(後面會降到)來進行處理。但是這種用法不多見,意義不大。 b).視窗處理模式: EPL最大的特色就是這個視窗處理模式,有兩種視窗,時間視窗和長度視窗。 時間視窗 : 大家想一下,如果有一個場景,要獲取最近3秒內OrderEvent的price的平均值,那該怎麼做呢?一般的做法需要做個後臺執行緒來做3秒的時間統計,時間到了再做後續處理,雖然不復雜,但是也挺繁瑣的。 看看EPL是怎麼做的
select avg(price)from test.OrderEvent.win:time(3 sec)
win:time(3 sec)就是定義了3秒的時間視窗,avg(price)就是統計了3秒內的OrderEvent物件的price的平均值 長度視窗: 長度視窗和時間視窗比較類似select avg(price)from test.OrderEvent.win:length(100)
win:length(10)就是定義了10個Event的,avg(price)就是統計了最近10個的OrderEvent物件的price的平均值 以上這些都比較容易理解,雖然知道了處理方法,也比較好用,我還是比較喜歡鑽研一下他的內部實現方式。先來看一張時間視窗模式的圖 他僅保留最近時間視窗的物件內容,但是每個Event到來都會觸發一次UpdateListener的操作 EPL語句會作為一個Statement來監聽事件的到來,當New Events有新事件時就會觸發UpdateListener的操作,下面是一個updateListener的簡單例子,event.get("avg(price))就可以獲得EPL查詢所獲得的price平均值,然後就可以加入自己的程式碼進行處理,比如將結果寫入本地檔案 而New Events和Old Events就是他的輸入,而ave(price)操作所計算的物件就是Length Window中的內容。publicclassMyListenerimplementsUpdateListener{ publicvoid update(EventBean[] newEvents,EventBean[] oldEvents){ EventBeanevent= newEvents[0]; System.out.println("avg="+event.get("avg(price)")); } }
事件視窗也基本類似。 EPL的時間視窗的計時是怎麼實現的呢?我們來看下他的原始碼ScheduledThreadPoolExecutor timer;//省略構造
timerTask = new EPLTimerTask(timerCallback);
ScheduledFuture<?> future = timer.scheduleAtFixedRate(timerTask, 0, msecTimerResolution, TimeUnit.MILLISECONDS);//估計每100毫秒執行一次
...
_lastDrift = Math.abs(future.getDelay(TimeUnit.MILLISECONDS));//計算延遲
...
CurrentTimeEvent currentTimeEvent = new CurrentTimeEvent(msec); sendEvent(currentTimeEvent);//傳送時間控制Event
貼得比較亂 簡單解釋一下,ScheduledThreadPoolExecutor.scheduleAtFixedRate固定每100ms(可配置)執行一次,並且通過計算延遲future.getDelay來確保計時精確,接下來通過傳送一個CurrentTimeEvent來推送時間前進100+delay(ms),也就是說ESPER中的時間不是完全受機器時間控制的,而是通過傳送TimeEvent由應用來進行控制的,這方便做很多的擴充套件。 c)批量視窗處理模式 視窗模式是會在每個Event來都觸發一次UpdateListener操作,如果每秒Event數量達到很大的話這種方式明顯是不行的 CPU消耗會很厲害 批量視窗處理模式正好可以解決這個問題 批量時間視窗模式select avg(price)from test.OrderEvent.win:time_batch(3 sec)
批量長度視窗模式select avg(price)from test.OrderEvent.win:length_batch(10)
時間批量模式的操作圖如下 上圖的時間視窗大小為4s,他會在4s的視窗時間到達以後才將視窗中的內容一起扔給UpdateListener來進行處理,效能相對節約很多,特別是大資料量的情況下。長度批量視窗的處理模式也是類似。 上述視窗模式下記憶體使用情況又是如何呢?經過本人測試和研究程式碼發現,它會保留兩個視窗的記憶體使用量,一個儲存當前視窗的Events,一個儲存上一個視窗的Events,因此在估算一個數據分析程式佔用多少記憶體要看上面監聽的EPL語句開的視窗的大小以及資料的TPS,防止記憶體OOM。 掌握了上面的視窗的概念,後面其他的內容都很好理解了 d) 過濾 where過濾select avg(price)from test.OrderEvent.win:time_batch(3 sec)where price>10
having過濾select avg(price)from test.OrderEvent.win:time_batch(3 sec) having price>10
似曾相識啊,執行方式也基本和SQL裡的where 和 having差不多。 在EPL裡where 是在incoming Events到window之間進行過濾,having是在window到New Eventing之間進行過濾 e)聚合 countselect count(price)from test.OrderEvent.win:time_batch(3 sec)where price>10
sum
select sum(price)from test.OrderEvent.win:time_batch(3 sec)where price>10
group byselectitemName,sum(price)from test.OrderEvent.win:time_batch(3 sec)where price>10groupby itemName
都很簡單,瞭解SQL的都狠容易上手 f) 函式 ESPER預設載入 java.lang.* java.math.* java.text.* java.util.* 支援這些包下的函式方法,例如
selectMath.round(sum(price))from test.OrderEvent.win:time_batch(3 sec)where price>10
它還支援自定義函式,舉個例子,做個計算百分比的函式publicclassUtil{ publicstaticdouble computePercent(double amount,double total){ return amount / total *100; } }
配置一下<plugin-singlerow-functionname="percent" function-class="mycompany.MyUtilityClass"function-method="computePercent"/>
OK了,可以用了select percent(price,total)fromOrderEvent
總體來說,ESPER的EPL功能非常強大,而且基本和SQL類似,入門容易,構造一個實時資料分析系統比較簡單,且維護成本低,新應用進來只需要簡單配置一下EPL語句就可以了,方便快捷,對大部分的系統還是比較適合的。