通過JMS監聽Oracle AQ,在資料苦表變化時觸發並執行Java程式
環境說明
本實驗環境基於Oracle 12C和JDK1.8,其中Oracle 12C支援多租戶特性,相較於之前的Oracle版本,使用‘C##使用者名稱‘表示使用者,例如如果資料庫使用者叫kevin,則登陸時使用C##kevin進行登陸。
一、Oracle高階訊息佇列AQ
Oracle AQ是Oracle中的訊息佇列,是Oracle中的一種高階應用,每個版本都在不斷的加強,使用DBMS_AQ系統包進行相應的操作,是Oracle的預設元件,只要安裝了Oracle資料庫就可以使用。使用AQ可以在多個Oracle資料庫、Oracle與Java、C等系統中進行資料傳輸。
下面分步驟說明如何建立Oracle AQ
1. 建立訊息負荷payload
Oracle AQ中傳遞的訊息被稱為有效負荷(payloads),格式可以是使用者自定義物件或XMLType或ANYDATA。本例中我們建立一個簡單的物件型別用於傳遞訊息。
create type demo_queue_payload_type as object (message varchar2(4000));
2. 建立隊列表
隊列表用於儲存訊息,在入隊時自動存入表中,出隊時自動刪除。使用DBMS_AQADM包進行資料表的建立,只需要寫表名,同時設定相應的屬性。對於佇列需要設定multiple_consumers為false,如果使用釋出/訂閱模式需要設定為true。
begin
dbms_aqadm.create_queue_table(
queue_table => 'demo_queue_table',
queue_payload_type => 'demo_queue_payload_type',
multiple_consumers => false
);
end;
執行完後可以檢視oracle表中自動生成了demo_queue_table表,可以檢視影響子段(含義比較清晰)。
3. 建立佇列並啟動
建立佇列並啟動佇列:
begin dbms_aqadm.create_queue ( queue_name => 'demo_queue', queue_table => 'demo_queue_table' ); dbms_aqadm.start_queue( queue_name => 'demo_queue' ); end;
至此,我們已經建立了佇列有效負荷,隊列表和佇列。可以檢視以下系統建立了哪些相關的物件:
SELECT object_name, object_type FROM user_objects WHERE object_name != 'DEMO_QUEUE_PAYLOAD_TYPE';
OBJECT_NAME OBJECT_TYPE
------------------------------ ---------------
DEMO_QUEUE_TABLE TABLE
SYS_C009392 INDEX
SYS_LOB0000060502C00030$$ LOB
AQ$_DEMO_QUEUE_TABLE_T INDEX
AQ$_DEMO_QUEUE_TABLE_I INDEX
AQ$_DEMO_QUEUE_TABLE_E QUEUE
AQ$DEMO_QUEUE_TABLE VIEW
DEMO_QUEUE QUEUE
我們看到一個佇列帶出了一系列自動生成物件,有些是被後面直接用到的。不過有趣的是,建立了第二個佇列。這就是所謂的異常佇列(exception queue)。如果AQ無法從我們的佇列接收訊息,將記錄在該異常佇列中。
訊息多次處理出錯等情況會自動轉移到異常的佇列,對於異常佇列如何處理目前筆者還沒有找到相應的寫法,因為我使用的場景並不要求訊息必須一對一的被處理,只要起到通知的作用即可。所以如果訊息轉移到異常佇列,可以執行清空隊列表中的資料
delete from demo_queue_table;
4. 佇列的停止和刪除
如果需要刪除或重建可以使用下面的方法進行操作:
BEGIN
DBMS_AQADM.STOP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE(
queue_name => 'demo_queue'
);
DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'demo_queue_table'
);
END;
5. 入隊訊息
入列操作是一個基本的事務操作(就像往隊列表Insert),因此我們需要提交。
declare
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
o_payload := demo_queue_payload_type('what is you name ?');
dbms_aq.enqueue(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end;
通過SQL語句檢視訊息是否正常入隊:
select * from aq$demo_queue_table;
select user_data from aq$demo_queue_table;
6. 出隊訊息
使用Oracle進行出隊操作
declare
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
DBMS_AQ.DEQUEUE(
queue_name => 'demo_queue',
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
DBMS_OUTPUT.PUT_LINE(
'***** Browse message is [' || o_payload.message || ']****'
);
end;
二、Java使用JMS監聽並處理Oracle AQ佇列
Java使用JMS進行相應的處理,需要使用Oracle提供的jar,在Oracle安裝目錄可以找到:在linux中可以使用find命令進行查詢,例如
find `pwd` -name 'jmscommon.jar'
需要的jar為:
- app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/jmscommon.jar
- app/oracle/product/12.1.0/dbhome_1/jdbc/lib/ojdbc7.jar
- app/oracle/product/12.1.0/dbhome_1/jlib/orai18n.jar
- app/oracle/product/12.1.0/dbhome_1/jlib/jta.jar
- app/oracle/product/12.1.0/dbhome_1/rdbms/jlib/aqapi_g.jar
1. 建立連線引數類
實際使用時可以把引數資訊配置在properties檔案中,使用Spring進行注入。
package org.kevin.jms;
/**
*
* 連線引數資訊
*
*/
public class JmsConfig {
public String username = "c##kevin";
public String password = "a111111111";
public String jdbcUrl = "jdbc:oracle:thin:@127.0.0.1:1521:orcl";
public String queueName = "demo_queue";
}
2. 建立訊息轉換類
因為訊息載荷是Oracle資料型別,需要提供一個轉換工廠類將Oracle型別轉換為Java型別。
package org.kevin.jms;
import java.sql.SQLException;
import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;
/**
*
* 資料型別轉換類
*
*/
@SuppressWarnings("deprecation")
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
MutableStruct _struct;
// 12表示字串
static int[] _sqlType = { 12 };
static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();
public static CustomDatumFactory getFactory() {
return _MessageFactory;
}
public QUEUE_MESSAGE_TYPE() {
_struct = new MutableStruct(new Object[1], _sqlType, _factory);
}
public Datum toDatum(OracleConnection c) throws SQLException {
return _struct.toDatum(c, _SQL_NAME);
}
public CustomDatum create(Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getContent() throws SQLException {
return (String) _struct.getAttribute(0);
}
}
3. 主類進行訊息處理
package org.kevin.jms;
import java.util.Properties;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
/**
*
* 訊息處理類
*
*/
public class Main {
public static void main(String[] args) throws Exception {
JmsConfig config = new JmsConfig();
QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.jdbcUrl,
new Properties());
QueueConnection conn = queueConnectionFactory.createQueueConnection(config.username, config.password);
AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Queue queue = (AQjmsDestination) session.getQueue(config.username, config.queueName);
MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println("ok");
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
System.out.println(payload.getContent());
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(1000000);
}
}
使用Oracle程式塊進行入隊操作,在沒有啟動Java時看到隊列表中存在資料。啟動Java後,控制檯正確的輸出的訊息;通過Oracle程式塊再次寫入訊息,發現控制檯正確處理訊息。Java的JMS監聽不是立刻進行處理,可能存在幾秒中的時間差,時間不等。
三、監控表記錄變化通知Java
下面的例子建立一個數據表,然後在表中新增觸發器,當資料變化後觸發器呼叫儲存過程給Oracle AQ傳送訊息,然後使用Java JMS對訊息進行處理。
1. 建立表
建立student表,包含username和age兩個子段,其中username時varchar2型別,age時number型別。
2. 建立儲存過程
建立send_aq_msg儲存過程,因為儲存過程中呼叫dbms資料包,系統包在儲存過程中執行需要進行授權(使用sys使用者進行授權):
grant execute on dbms_aq to c##kevin;
- 1
注意儲存過程中包含commit語句。
create or replace
PROCEDURE send_aq_msg (info IN VARCHAR2) as
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload demo_queue_payload_type;
begin
o_payload := demo_queue_payload_type(info);
dbms_aq.enqueue(
queue_name => 'demo_queue',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
commit;
end send_aq_msg;
3. 建立觸發器
在student表中建立觸發器,當資料寫入或更新時,如果age=18,則進行入隊操作。需要呼叫儲存過程傳送訊息,但觸發器中不能包含事物提交語句,因此需要使用pragma autonomous_transaction;宣告自由事物:
CREATE OR REPLACE TRIGGER STUDENT_TR
AFTER INSERT OR UPDATE OF AGE ON STUDENT FOR EACH ROW
DECLARE
pragma autonomous_transaction;
BEGIN
if :new.age = 18 then
send_aq_msg(:new.username);
end if;
END;
相關推薦
通過JMS監聽Oracle AQ,在資料苦表變化時觸發並執行Java程式
環境說明 本實驗環境基於Oracle 12C和JDK1.8,其中Oracle 12C支援多租戶特性,相較於之前的Oracle版本,使用‘C##使用者名稱‘表示使用者,例如如果資料庫使用者叫kevin,則登陸時使用C##kevin進行登陸。 一、Oracle高階訊息
JMS監聽Oracle AQ
- 該文件中,jdk版本1.8,java專案為maven構建的springboot專案,並使用了定時任務來做AQ監聽的重連功能,解決由於外部原因導致連線斷裂之後,需要手動重啟專案才能恢復連線的問題 - [github原始碼位置](https://github.com/wangqq1217/oracleAQ-
oracle 百萬條資料 update所有記錄時的 sql 執行效率問題
需求: 有一張臨時表 , 資料總數100w條, 其中 50w條 , state = 1 50w條 , state = 0 因為資料無用 ,
vue中通過watch監聽資料變化,帶來的效能優化
問題背景 為什麼要用 vuex? 在使用 Vue 進行元件化開發時,元件通訊是一個十分重要的部分。在 Vue 中,父子元件的關係可以總結為 父子元件通訊:父元件通過 props 向下傳遞資料給子元件 子父元件通訊:子元件通過 events 給父元件傳送訊息 使
Oracle資料庫監聽非常慢,基本hang住故障處理
測試人員郵件反饋: 訂購資料庫的連線非常慢,甚至是無法連線,想要我檢檢視看。 經檢視: [email protected]:~/app/admin/wdadb/adump> lsnrctl status LSNRCTL for Linux: Version
Android 通過註冊廣播,實時監聽網路連線與斷開狀態的變化
很多時候我們都需要實時監聽網路狀態,當網路狀態發生變化之後立即通知程式進行不同的操作。 監聽廣播的兩種方式: (1)在AndroidManifest.xml配置檔案中宣告 <receiver android:name=".NetworkConn
實戰Android:通過BroadcastReceiver監聽Home,電源Power,和音量變化Volume鍵
上一個例子是採用AccessibilityService來實現按鍵的監聽。這次我們採用BroadcastReceiver來完成按鍵的監聽。 缺點:我嘗試了一下,暫時還不知道如何停止按 鍵的預設行為,比如我確實監聽到了電源按鍵,但卻沒法阻止此刻螢幕變黑的行為。先在這記下。以後
電話狀態的監聽。響鈴,靜止,接起
tel mis one iss cal list ack res extend package com.sharpcj.telephonestatelistenerdemo; import android.content.Context; import android.
觀察者模式實際應用:監聽線程,意外退出線程後自動重啟
lee text 實時 之間 最終 ren tap instance and 摘要: 觀察者模式,定義對象之間的一種一對多的依賴關系,當對象的狀態發生改變時,所有依賴於它的對象都得到通知並且被自動更新。觀察者模式在JDK中有現成的實現,java.util.Obsera
ionic 監聽頁面滾動,點擊停止滾動
ret 有一個 state start lin 開始 sta 點擊 需要 類似今日頭條,頁面上有很多card,點擊每個card跳轉該card的詳情頁面。這裏有一個問題,當我滾動頁面時,會先後觸發touchstart、touchmove、touchend,但是當touchen
(二)僅僅通過Application監聽用戶行為及App的在線狀態和在線時長
活躍 and rem HR 再看 andro void put 初始 先要實現功能,還是先從API去找。看看有沒有你想要的。這裏其實就是監聽App內activity的狀態。怎麽辦? 給個API所在地址:http://www.android-doc.com/refer
監聽瀏覽器返回,pushState,popstate 事件,window.history對象
當前 rep www 簡單介紹 itl 成了 stat 直接 clas 在WebApp或瀏覽器中,會有點擊返回、後退、上一頁等按鈕實現自己的關閉頁面、調整到指定頁面、確認離開頁面或執行一些其它操作的需求。可以使用 popstate 事件進行監聽返回、後退、上一頁操作。 一、
list-server監聽資源修改,自動重新整理瀏覽器
名稱:list-server 參考地址: http://blog.csdn.net/alabadazi/article/details/53334161 作用:輕量級的僅適用於開發 的 node 伺服器, 它僅支援 web app, 它能夠為你開啟瀏覽器, 當你的html或是Java
數據庫安全檢查監聽是重點,設置監聽密碼
quit proto change rod 方法 1.2 uri tro oca Oracle 數據庫監聽的安全管理是比較容易忽略的一個問題,做一個測試禁用監聽的本地驗證功能,設置監聽密碼,數據庫版本為11.2.0.4 1、默認配置listener.ora LISTENER
Nginx同時監聽IPV6+IPV4,實現正向和反向
nginx.conf 配置如下 user root; worker_processes 8; #error_log /opt/server/department/nginx/logs/error.log warn; error_log /opt/server/department
vue 監聽滑動事件,是否滑動到dom元素的區域
//監聽滾動事件 window.addEventListener('scroll',that.handleScroll) //垂直滾動的值相容問題 let scrollTopE = window.pageYOffset || document.documentElement.scrollTop |
ReactNative Android監聽返回鍵,在某個頁面返回鍵退出應用
在之前專案中,在進行返回鍵退出應用時,應用的程式碼如下: componentWillMount() { if (Platform.OS === 'android') { BackHandler.addEventListener('hardwa
ionic 監聽頁面滾動,點選停止滾動
原文出處:https://www.cnblogs.com/lee-xiumei/p/7449021.html 類似今日頭條,頁面上有很多card,點選每個card跳轉該card的詳情頁面。這裡有一個問題,當我滾動頁面時,會先後觸發touchstart、touchmove、touchend,但是當
Python pygame,事件,監聽使用者事件,pygame.event.get()
demo.py(事件,監聽事件): import pygame pygame.init() # 初始化所有pygame模組 # 建立遊戲主視窗 480 * 700 screen = pygame.display.set_mode((480, 700)) # 建立
nginx監聽相同埠,根據域名請求不同的server
#同時監聽相同埠,可以通過匹配server_name 來決定最終匹配哪個server #server1 server { listen 80; #server_name localhost;