1. 程式人生 > >Esper學習之十一:EPL語法(七)

Esper學習之十一:EPL語法(七)

轉載請註明出處:http://blog.csdn.net/luonanqin

       元宵過後回公司上班,換了個部門,換了個領導,做的事也換了,不過Esper還是會繼續搞,所以部落格也會慢慢寫的,大家別急。^_^

       上一篇說到了EPL如何訪問關係型資料庫這種資料來源,實際上別的資料來源,比如:webservice、分散式快取、非關係型資料庫等等,Esper提供了統一的資料訪問介面。然後今天會講解如何建立另外一種事件型別——Schema

1.Joining Method Invocation Results
和執行sql的語法類似,呼叫方法的一種觸發方式也是通過join別的事件的屬性來達到效果,且呼叫方法的句子為from子句。語法如下:

method:class_name.method_name(parameter_expressions)
method是固定關鍵字,class_name為類全名,方法名為返回外部資料的方法名,parameter_expressions為方法的引數列表,對應join的事件屬性,多個屬性之間用逗號分隔,引數整體用圓括號括起來。例如:
select * from AssetMoveEvent, method:MyLookupLib.lookupAsset(assetId) // assetId為AssetMoveEvent的屬性之一
除了簡單join,還可以為join加上where條件過濾一些返回結果。例如:
select assetId, assetDesc from AssetMoveEvent as asset,
	method:MyLookupLib.getAssetDescriptions() as desc    //呼叫的方法無參
		where asset.assetid = desc.assetid
Esper不僅能快取執行sql的查詢結果,也能快取執行方法的查詢結果,並且快取策略也是兩種:LRU和Expire Time。具體可以參考上一篇快取配置章節。若存在返回結果,且快取生效後,Esper會自動為返回結果簡歷索引,加快查詢速度。配置範例如下:
// LRU Cache
<method-reference class-name="MyFromClauseWebServiceLib">
	<lru-cache size="1000"/>
</method-reference>

// Expire Time Cache
<method-reference class-name="com.mycompany.MyFromClauseLookupLib">
	<expiry-time-cache max-age-seconds="10" purge-interval-seconds="10" ref-type="weak"/>
</method-reference>
class-name表示方法所在的類,可以是類全名,可以只有類名,前提是包已經import。其他引數的解釋請參見上一篇快取配置章節

2.Polling Method Invocation Results via Iterator
除了Join別的事件來觸發查詢方法,進而觸發Listener,Esper還支援通過API直接執行方法。例如:

select * from method:MyLookupLib.getAssetDescriptions(category_var)   // category_var為註冊到引擎的變數。

3.Method Definition
為了能夠以一種統一的結構訪問外部資料(RDBMS除外),Esper提供了呼叫靜態方法的形式訪問外部資料。具體解釋如下:
a.返回資料的方法必須是公共靜態方法。方法引數可以有多個也可以沒有。
b.如果返回一條資料或無返回資料,則方法的返回型別可以是Java類或者Map型別資料。如果返回多條資料(包括一條),則方法返回型別必須是Java類的陣列或者Map陣列。
c.如果方法的返回型別是Java類或者Java類陣列,則Java的類定義必須包含針對屬性的get方法。
d.如果方法的返回型別是Map或者Map陣列,則Map的key-value定義固定為<String, Object>。
e.如果返回的資料是Map或者Map陣列,除了定義返回資料的方法外,還要定義返回元資料的方法(這個元資料針對返回的資料)。方法是公共靜態方法,且必須是無引數方法。方法返回型別為Map<String, Class>,String表示返回的資料的名稱,Class表示返回的資料的型別。返回元資料的方法名稱=返回資料的方法名稱+Metadata。

下面舉一個完整的例子總結前面說的三點:
package example;

/**
 * 返回Java型別的外部資料
 *
 * Created by Luonanqin on 2/16/14.
 */

class JavaObject {
	private String name;
	private int size;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getSize() {
		return size;
	}

	public void setSize(int size) {
		this.size = size;
	}

	public String toString() {
		return "JavaObject{" + "name='" + name + '\'' + ", size=" + size + '}';
	}
}

public class InvocationMethodJava {

	public static JavaObject[] getJavaObject(int times) {
		JavaObject[] javaObjects = new JavaObject[2];
		JavaObject javaObject1 = new JavaObject();
		javaObject1.setName("javaObject1");
		javaObject1.setSize(1 * times);
		JavaObject javaObject2 = new JavaObject();
		javaObject2.setName("javaObject2");
		javaObject2.setSize(2 * times);

		javaObjects[0] = javaObject1;
		javaObjects[1] = javaObject2;

		return javaObjects;
	}
}

package example;

import java.util.HashMap;
import java.util.Map;

/**
 * 返回Map型別的外部資料
 *
 * Created by Luonanqin on 2/16/14.
 */
public class InvocationMethodMap {

	public static Map<String, Object> getMapObject() {
		Map<String, Object> map = new HashMap<String, Object>();
		map.put("name", "mapObject1");
		map.put("size", 1);

		return map;
	}

	public static Map<String, Class> getMapObjectMetadata() {
		Map<String, Class> map = new HashMap<String, Class>();
		map.put("name", String.class);
		map.put("size", int.class);

		return map;
	}
}

package example;

import java.util.Iterator;

import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Created by Luonanqin on 2/16/14.
 */

class Times {
	private int times;

	public int getTimes() {
		return times;
	}

	public void setTimes(int times) {
		this.times = times;
	}
}

class InvocationMethodListener implements UpdateListener {

	public void update(EventBean[] newEvents, EventBean[] oldEvents) {
		if (newEvents != null) {
			System.out.println(newEvents[0].getUnderlying());
			System.out.println(newEvents[1].getUnderlying());
		}
	}
}

public class InvocationMethodTest {
	public static void main(String arg[]) {
		EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
		EPRuntime runtime = epService.getEPRuntime();
		EPAdministrator admin = epService.getEPAdministrator();

		/**
		 * 呼叫外部方法返回Java型別資料
		 */
		String timesName = Times.class.getName();
		String ijName = InvocationMethodJava.class.getName();
		String epl1 = "select ij.* from " + timesName + " as t, method:" + ijName + ".getJavaObject(times) as ij";
		System.out.println(epl1+"\n");

		EPStatement state1 = admin.createEPL(epl1);
		state1.addListener(new InvocationMethodListener());

		Times times = new Times();
		times.setTimes(2);

		runtime.sendEvent(times);

		System.out.println("");

		/**
		 * 呼叫外部方法返回Map型別資料
		 */
		String imName = InvocationMethodMap.class.getName();
		String epl2 = "select * from method:" + imName + ".getMapObject()";
		System.out.println(epl2+"\n");

		EPStatement state2 = admin.createEPL(epl2);
		Iterator<EventBean> iter = state2.iterator();
		while (iter.hasNext()) {
			EventBean event = iter.next();
			System.out.println(event.getUnderlying());
		}
	}
}

4.Declare an Event Type by Providing Names and Types
       我曾經在《Esper學習之二:事件型別》裡說過,事件型別的定義可以是POJO,陣列,Map,或者XML。實際上還有另一種定義事件型別的方法,那就是schema。這個schema可不是資料庫中的schema,而是用EPL定義的事件型別,所以說此類事件型別是針對Esper設計的,並不能拿出來通用。

我們先從語法開始說起。

create [map | objectarray] schema schema_name [as]
 (property_name property_type [,property_name property_type [,...])
 [inherits inherited_event_type[, inherited_event_type] [,...]] [starttimestamp timestamp_property_name]
 [endtimestamp timestamp_property_name]
 [copyfrom copy_type_name [, copy_type_name] [,...]]
解釋如下:

a.map objectarray分別表示當前定義的schema是map結構還是陣列結構。
b.schema_name表示schema的名字,全域性唯一。as為可選內容。
c.property_name表示schema所包含的屬性名稱,property_type為屬性的型別,多個屬性用逗號分隔,所有屬性用圓括號括起來可以使用的型別有:int、String等已經內建的Java型別;已經通過Configuration介面註冊的事件型別,比如Map,陣列,schema等;可以使用POJO類,如果沒有import就要寫全名,否則寫類名即可。
d.inherits表示繼承別的事件型別,後面跟著的是要繼承的事件型別。如果使用了繼承,則當前定義的schema包含了繼承的事件型別的所有屬性。並且可以繼承多個,用逗號分隔。
e.starttimestamp和endtimestamp是兩個特殊的關鍵字。單獨使用starttimestamp時,表示為schema記一個時間戳。後面跟著已經宣告的屬性並且能夠返回一個data-time值。如果聯合endtimestamp使用,則表示這個schema只能在某段事件內使用。後面跟著的同樣也是已經宣告的屬性並且能夠返回一個data-time值。注意endtimestamp不能單獨使用。
f.copyfrom表示複製別的事件的所有屬性到當前定義的schema中,並且可以copy多個事件,用逗號分隔。


下面用幾個例子來一一展示如何使用上面的語法:

// 宣告SecurityEvent
create schema SecurityEvent as (ipAddress string, userId String, numAttempts int)

// 宣告AuthorizationEvent,並且包含com.mycompany.HostNameInfo型別的hostinfo屬性
create schema AuthorizationEvent(group String, roles String[], hostinfo com.mycompany.HostNameInfo)

// 宣告CompositeEvent,並且包含了SecurityEvent陣列作為innerEvents屬性的型別
create schema CompositeEvent(group String, innerEvents SecurityEvent[])

// 宣告WebPageVisitEvent,自己定義了userId屬性,並且繼承了PageHitEvent的所有屬性
create schema WebPageVisitEvent(userId String) inherits PageHitEvent

// 宣告RoboticArmMovement,並且開始於startts,結束於endts
create schema RoboticArmMovement (robotId string, startts long, endts long) starttimestamp startts endtimestamp endts

// 宣告ExtendedSecurityEvent,並且複製SecurityEvent事件的所有屬性到ExtendedSecurityEvent
create schema ExtendedSecurityEvent (userName string) copyfrom SecurityEvent

// 宣告WebSecurityEvent,並且複製SecurityEvent和WebPageVisitEvent事件的所有屬性到WebSecurityEvent
create schema WebSecurityEvent (userName string) copyfrom SecurityEvent, WebPageVisitEvent
這裡要額外說一下,繼承不僅僅繼承了事件的屬性,如果被繼承的事件定義了starttimestamp或者endtimestamp,同樣也會被繼承下來。但是copyfrom是不會吧starttimestamp和endtimestamp複製的。這點一定要注意。

對於map和objectarray這兩個關鍵字,可以用註解達到同樣的效果。例如:
// 宣告陣列型別的schema
create objectarray schema SchemaTest1 as (prop1 string);
... equals ...
@EventRepresentation(array=true)create schema SchemaTest1 as (prop1 string);

// 宣告Map型別的schema
create map schema SchemaTest2 as (prop1 string);
... equals ...
@EventRepresentation(array=false)create schema SchemaTest2 as (prop1 string);

5.Declare Variant Stream
Variant Stream簡單來說就是包含了各種不同的事件型別的事件型別。所以語法也很明瞭:

create variant schema schema_name [as] eventtype_name|* [, eventtype_name|*] [,...]
variant為關鍵字,表明這是Variant Stream,eventtype_name為事件的定義名,多個事件定義用逗號分隔開。*表示接收任何的事件型別,不過一般來說沒有需求會到這種程度。舉例如下:
// 宣告SecurityVariant,包含了LoginEvent和LogoutEvent
create variant schema SecurityVariant as LoginEvent, LogoutEvent
// 宣告AnyEvent,包含任何型別的事件
create variant schema AnyEvent as *

以上就是呼叫外部方法以及schema的建立講解,尤其是schema的建立,可能大家會用的更多一些,最好能記住。