SODBASE CEP學習(十六):CEP與資料庫互動
一些時候出於專案需求或複用,需要將CEP和資料庫結合起來用。SODBASE CEP可以很好地支援這型別需求。本文將介紹CEP與資料庫互動的兩種常用方式。
1. 示例操作
(1)為示例操作簡單,下載Oracle Express Edition (11g) Windows版,安裝過程中會提示為sys和system使用者設定密碼,設定為123456安裝完畢後從開始選單執行“Oracle Database 11g Express Edition”->"執行 SQL 命令列"
我們就用Oracle自帶的示例表空間users,自帶的使用者HR
SQL>connect system/123456 SQL>ALTER USER "HR" identified by "123456" SQL>ALTER USER HR ACCOUNT UNLOCK; SQL>disconnect SQL> connect HR/123456 已連線。 SQL> select * from regions; REGION_ID REGION_NAME ---------- -------------------------------------------------- 1 Europe 2 Americas 3 Asia 4 Middle East and Africa
方式一:
在畫板上右鍵->檢視程式碼檢視此模型的EPL語句SELECT * FROM T1:randomEventStream
PATTERN T1
WHERE JAVA:f.DB:in(T1.value2,'select distinct region_id from regions','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','HR','123456') WITHIN 0
測試執行,結果如下所示,流的value2欄位的值只有是在REGION_ID裡面才會輸出
方式二:
oracleadaptor01是定時查詢資料庫,oracleadaptor01_output列印輸出,執行結果如下所示
2.工作原理
2.1 方式一:函式呼叫方式
可以把資料庫查詢作為一個函式。比如說一個簡單的需求,我們需要在資料流transacation_flow上查出黑名單使用者的交易記錄。當一筆交易資料請求過來時,我們需要檢查交易賬號是否在黑名單裡,即向資料庫發起查詢 select account from black_list 然後在返回的結果裡做判斷。一般來講這種操作會拉慢CEP引擎的速度,但是如果事件量不大 、tps要求不高的時候,這種方案也是可行的。
SELECT * FROM transacation_flow t PATTERN t WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','oracle.jdbc.OracleDriver','jdbc:oracle:thin:@localhost:1521:xe','user','password')
其中from_account是transacation_flow的欄位,表示轉出賬號。
JAVA:f.DB:in(String x /*被查值*/, String sql, String jdbcDriver, String url, String user, String password) 是SODBASE系統實現的自定義函式。
一共6個引數,如果x在資料庫上執行sql語句返回的資料集中,如果包含x,此函式返回true,否則返回false。
其它資料庫示例
mysql:
SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list','com.mysql.jdbc.Driver','jdbc:mysql://localhost:3306/cep?useUnicode=true&characterEncoding=utf8','username','password')
postgresql:
SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''org.postgresql.Driver','jjdbc:postgresql://ip:port/mydb','username','password')
Microsoft Sql Server:
SELECT * FROM transacation_flow t
PATTERN t
WHERE JAVA:f.DB:in(t.from_account,'select distinct account from black_list',''com.microsoft.sqlserver.jdbc.SQLServerDriver','jdbc:sqlserver://localhost:1433;DatabaseName=mydb','username','password')
使用者也參考原始碼可以進行二次開發,開發出滿足自己需求的函式來。
2.2 方式二:事件觸發方式
流查詢 SELECT * FROM T1:oracleadaptor01 的輸出介面卡中,觸發資料庫查詢'select * from regions;'右鍵點選OUTPUT,檢視配置
如果要使用輸出事件的欄位值作為引數,採用?{},如 'select '?{t_id}','from_account','to_account', from black_list where account = ?{from_account} limit 1'
這種方式可以將資料庫互動納入統一的事件建模中。3. 原始碼
自定義函式JAVA:f.DB:in在sodbase-cep-udfunction-db.jar中,原始碼如下所示,讀者也可以修改二次開發自定義函式,修改完畢後打jar包,放到lib目錄下面即可
/**
*
*/
package f;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
*
*/
public class DB
{
Connection conn = null;
public boolean in(String x, String sql, String jdbcDriver, String url, String user, String password)
{
try
{
Connection conn = getConnection(jdbcDriver, url, user, password);
conn.setAutoCommit(true);
Statement stmt = conn.createStatement();
ResultSet r = stmt.executeQuery(sql );
while(r.next())
{
Object o = r.getObject(1);
if(o!=null&&o.toString().equals(x))
return true;
}
r.close();
stmt.close();
} catch (SQLException e)
{
e.printStackTrace();
}
finally
{
if(conn!=null)
try
{
conn.close();
} catch (SQLException e)
{
e.printStackTrace();
}
conn=null;
}
return false;
}
public Connection getConnection(String jdbcDriver, String dburl,String dbusername,
String dbpassword)
{
String dbclass =jdbcDriver;
try {
Class.forName(dbclass);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
try {
if(conn!=null&&!conn.isClosed())
return conn;
} catch (SQLException e1) {
e1.printStackTrace();
}
try {
conn =DriverManager.getConnection(dburl, dbusername,
dbpassword);
} catch (SQLException e) {
e.printStackTrace();
}
return conn;
}
}