1. 程式人生 > >JMS監聽Oracle AQ

JMS監聽Oracle AQ

- 該文件中,jdk版本1.8,java專案為maven構建的springboot專案,並使用了定時任務來做AQ監聽的重連功能,解決由於外部原因導致連線斷裂之後,需要手動重啟專案才能恢復連線的問題 - [github原始碼位置](https://github.com/wangqq1217/oracleAQ-Jms.git) - [gitee原始碼位置](https://gitee.com/yanlengtong/oracleAQ-Jms.git) # 一、建立佇列 ## 1.1.管理員登入執行 - 管理員登入,執行授權操作,oracle使用佇列需要單獨的授權,預設未開啟,須手動開啟,授權命令如下,username使用自己的使用者名稱即可 ```sql GRANT EXECUTE ON SYS.DBMS_AQ to 'username'; GRANT EXECUTE ON SYS.DBMS_AQADM to 'username'; GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username'; GRANT EXECUTE ON SYS.DBMS_AQIN to 'username'; GRANT EXECUTE ON SYS.DBMS_JOB to 'username'; ``` ## 1.2.使用者登入執行執行 ### 1.2.1. 建立訊息負荷payload - 建立的此type用來封裝佇列所帶的,根據實際需求進行建立 ```sql CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT ( param_1 VARCHAR2(100), param_2 VARCHAR2(100) ) ``` ### 1.2.2. 建立隊列表 - 建立對列表,並指定佇列資料的型別,隊列表名自定義即可,資料型別使用上面剛建立的type ```sql begin sys.dbms_aqadm.create_queue_table( queue_table => 'QUEUE_TABLE', queue_payload_type => 'TYPE_QUEUE_INFO', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0); end; ``` ### 1.2.3. 建立佇列並啟動 - 建立名稱為QUEUE_TEST的佇列,並指定對列表名【同一個oracle使用者下,可以有多個對列表,同一個對列表中,可以有多個佇列】 ```sql begin sys.dbms_aqadm.create_queue( queue_name => 'QUEUE_TEST', queue_table => 'QUEUE_TABLE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 5, retry_delay => 0, retention_time => 0); end; ``` - 剛建立的佇列的狀態預設是未開啟的,需要手動開啟一下,同理,存在刪除、停止等操作 ```sql begin -- 啟動佇列 sys.dbms_aqadm.start_queue( queue_name => 'QUEUE_TEST' ); -- 暫停佇列 --sys.dbms_aqadm.STOP_QUEUE( -- queue_name => 'QUEUE_TEST' --); -- 刪除佇列 --sys.dbms_aqadm.DROP_QUEUE( -- queue_name => 'QUEUE_TEST' --); -- 刪除對列表 --sys.dbms_aqadm.DROP_QUEUE_TABLE( -- queue_table => 'QUEUE_TABLE' --); end; ``` ### 1.2.4. 建立儲存過程 - 儲存過程的作用為把資料載入到佇列中,生成的新的佇列會自動新增進繫結的對列表中,等待消費者進行消費 ```sql CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload TYPE_QUEUE_INFO; begin -- 封裝最終訊息 o_payload := TYPE_QUEUE_INFO(param_1, param_2); -- 入隊操作,指定佇列 dbms_aq.enqueue(queue_name => 'QUEUE_TEST', enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle); -- 出隊操作 --dbms_aq.enqueue(queue_name => 'QUEUE_TEST', -- dequeue_options => r_dequeue_options, -- message_properties => r_message_properties, -- payload => o_payload, -- msgid => v_message_handle); end pro_queue; ``` # 二、Java中JMS的使用 ## 2.1. 專案配置 ### 2.1.1. maven ```xml com.oracle
jmscommon 1.2
com.oracle orai18n 1.2 com.oracle jta 1.2 com.oracle aqapi_g 1.2 ``` ### 2.1.2. yml ~~~yaml spring: datasource: url: jdbc:oracle:thin:@ip:port/sid username: ** password: ** queue: aq: # 該佇列是否可用,用來控制佇列的載入和重連,不可省略 enable: true # 佇列名稱,不可省略 name: QUEUE_TEST # 佇列重連的定時任務對應的時間表達式,不可省略 cron: 0 */1 * * * ? ~~~ ## 2.2. AQ初始化 - 在專案啟動結束後立即執行此類,會根據所配置的佇列名稱監聽對應的佇列 ```java package com.wangqq.jms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; /** * @Title: MessageAQInit.java * @Description: AQ 初始化 * @author wangqq * @date 2020年6月28日 下午3:45:23 * @version 1.0 */ @Component public class MessageAQInit implements CommandLineRunner { @Autowired private MessageAQConfig aqConfig; @Autowired private MessageAQListener listener; @Override public void run(String... args) throws RuntimeException { // 檢查訊息佇列是否啟用 if (aqConfig.enable) { // 設定AQ的訊息監聽器 MessageAQConnection.setListener(listener); // 初始化AQ連線 if (!MessageAQConnection.initFactory(aqConfig)) { throw new RuntimeException("Message Oracle AQ initialization failed!"); } // 建立連線 if (!MessageAQConnection.establishConnection(aqConfig)) { throw new RuntimeException("Message Oracle AQ connection failed!"); } } } } ``` ## 2.3. 配置資訊類 - 配置類,將yml的配置檔案轉為java物件【時間表達式在程式碼中不會以物件屬性的方式被使用,因此在該類中沒有設定】 ~~~java package com.wangqq.jms; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * @Title: MessageAQConfig.java * @Description: ORACLE 訊息佇列配置 * @author wangqq * @date 2020年6月28日 下午3:36:08 * @version 1.0 */ @Component public class MessageAQConfig { /** 是否開啟MessageAq功能 */ @Value("${queue.aq.enable}") public Boolean enable; /** 資料庫使用者名稱 */ @Value("${spring.datasource.username}") public String userName; /** 資料庫密碼 */ @Value("${spring.datasource.password}") public String password; /** 資料庫地址url */ @Value("${spring.datasource.url}") public String url; /** 佇列名稱 */ @Value("${queue.aq.name}") public String queue; } ~~~ ## 2.4. AQ 連線工廠類 - AQ 連結的核心類,根據配置物件以及注入的監聽物件,動態監聽AQ佇列 ~~~java package com.wangqq.jms; import javax.jms.Queue; import javax.jms.Session; import lombok.extern.slf4j.Slf4j; import oracle.jms.AQjmsConnection; import oracle.jms.AQjmsConnectionFactory; import oracle.jms.AQjmsConsumer; import oracle.jms.AQjmsSession; /** * @Title: MessageAQConnection.java * @Description: AQ 連線 * @author wangqq * @date 2020年6月28日 下午3:50:32 * @version 1.0 */ @Slf4j public class MessageAQConnection { private static AQjmsConnectionFactory aQjmsConnectionFactory; private static AQjmsConsumer aQjmsConsumer; private static AQjmsSession aQjmsSession; private static AQjmsConnection aQjmsConnection; private static MessageAQListener listener; /** * 設定JMS監聽器 * * @param messageAqJmsListener * @author wangqq * @date 2020年7月6日 上午8:33:57 */ public static void setListener(MessageAQListener messageAqJmsListener) { listener = messageAqJmsListener; } /** * 初始化 AQ 連線 Factory * * @param aqConfig 訊息佇列配置 * @return 是否成功 */ public static boolean initFactory(MessageAQConfig aqConfig) { try { aQjmsConnectionFactory = new AQjmsConnectionFactory(); aQjmsConnectionFactory.setJdbcURL(aqConfig.url); aQjmsConnectionFactory.setUsername(aqConfig.userName); aQjmsConnectionFactory.setPassword(aqConfig.password); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } /** * 連線訊息佇列 * * @param aqConfig 訊息佇列配置 * @return 是否成功 */ public static boolean establishConnection(MessageAQConfig aqConfig) { try { aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection(); aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); aQjmsConnection.start(); Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue); aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null, false); aQjmsConsumer.setMessageListener(listener); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } /** * 關閉訊息佇列連線 * * @return 是否成功 */ public static boolean closeConnection() { try { aQjmsConsumer.close(); aQjmsSession.close(); aQjmsConnection.close(); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } } ~~~ ## 2.5. 建立AQ 資料承載類 - 用來接收oracle佇列中所帶的引數,基本保證與資料庫中的type格式相同即可 ~~~java package com.wangqq.bean; import lombok.Builder; import lombok.Data; /** * @Title: Test.java * @Description: AQ 資料承載類 * @author wangqq * @date 2021-01-20 16:19:16 * @version 1.0 */ @Data @Builder public class Test { private String param_1; private String param_2; } ~~~ ## 2.6. 資料型別轉換 - 將oracleAq所承載的資料,轉化為我們自己需要的例項物件,及上述中的Test物件 ~~~java package com.wangqq.jms; import java.sql.Connection; import java.sql.SQLException; import java.sql.Struct; import com.wangqq.bean.Test; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import oracle.sql.Datum; import oracle.sql.ORAData; import oracle.sql.ORADataFactory; /** * @Title: MessageORAData.java * @Description: 資料型別轉換類 * @author synjones * @date 2018年12月3日 上午11:29:50 * @version 1.0 */ @Slf4j @NoArgsConstructor public class MessageORAData implements ORAData, ORADataFactory { private Object[] rawData = new Object[8]; private static final MessageORAData MESSAGE_FACTORY = new MessageORAData(); public static ORADataFactory getFactory() { return MESSAGE_FACTORY; } @Override public ORAData create(Datum datum, int sqlType) throws SQLException { if (datum == null) { return null; } else { try { MessageORAData payOraData = new MessageORAData(); Struct aStruct = (Struct) datum; payOraData.rawData = aStruct.getAttributes(); return payOraData; } catch (Exception e) { log.error(e.getMessage(), e); return null; } } } @Override public Datum toDatum(Connection arg0) throws SQLException { return null; } /** * 訊息內容解析並封裝 * * @return * @author wangqq * @date 2020年7月6日 上午8:38:01 */ public Test getContent() { try { return Test.builder() .param_1(rawData[0] == null ? null : rawData[0].toString()) .param_2(rawData[0] == null ? null : rawData[0].toString()) .build(); } catch (Exception e) { log.error(e.getMessage(), e); return null; } } } ~~~ ## 2.7. AQ 監聽 ~~~java package com.wangqq.jms; import javax.jms.Message; import javax.jms.MessageListener; import org.springframework.stereotype.Component; import com.wangqq.bean.Test; import lombok.extern.slf4j.Slf4j; import oracle.jms.AQjmsAdtMessage; /** * @Title: JMSListener.java * @Description: JMS監聽ORACLEAQ的佇列訊息 * @author wangqq * @date 2020年6月28日 上午11:23:42 * @version 1.0 */ @Slf4j @Component public class MessageAQListener implements MessageListener { @Override public void onMessage(Message message1) { AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1; try { MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload(); // 獲取訊息內容 Test test = payload.getContent(); System.out.println(test.toString()); } catch (Exception e) { log.error(e.getMessage(), e); } } } ~~~ ## 2.8. AQ 監控任務, 在AQ斷開後重連 - 通過定時任務,定時查詢是否有入隊時間在5分鐘之內的佇列未被消費【佇列入隊後,會在對列表中產生一條資料,消費之後該資料會被清除掉】,若存在,則說明監聽異常,需要重新建立連線監聽佇列 - 資料庫對列表中的入隊時間在本次測試中為0時區的時間,故而在程式碼中轉換了一下時區,否則無法根據入隊時間查詢資料 ~~~java package com.wangqq.jms; import java.util.Date; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.wangqq.mapper.MessageAqMapper; import com.wangqq.util.DateUtil; import lombok.extern.slf4j.Slf4j; /** * @Title: MessageAQMonitor.java * @Description: AQ 監控任務, 在AQ斷開後重連 * @author wangqq * @date 2020年6月28日 下午4:35:31 * @version 1.0 */ @Slf4j @Component public class MessageAQMonitor { @Autowired private MessageAQConfig aqConfig; @Autowired private MessageAqMapper aqMapper; @Scheduled(cron = "${queue.aq.cron}") private void monitorJob() { // 檢查訊息佇列是否啟用 if (!aqConfig.enable) { return; } // 獲取當前時間,並向前推5分鐘 String formatDateTime = DateUtil.formatDate(new Date(System.currentTimeMillis() - 300000)); // 將該時間轉為0時區的時間【資料庫中儲存的佇列時間為0時區的時間】 String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss"); // 查詢是否存在5分鐘以前的佇列未被消費 int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime); if (selectCount != 0) { // 若存在,則重新啟動監聽 if (MessageAQConnection.closeConnection()) { log.info("-->
AQ connection has been closed."); if (MessageAQConnection.establishConnection(aqConfig)) { log.info("--> AQ connection has been re-established."); } } } } } ~~~ ## 2.9. 隊列表中佇列數量的查詢 - 根據佇列名稱和入隊時間,查詢在入隊時間之後入對的佇列數量 ~~~java package com.wangqq.mapper; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Select; /** * @Title: MessageAqMapper.java * @Description: oracleAQ的查詢 * @author wangqq * @date 2020年6月28日 下午4:04:50 * @version 1.0 */ @Mapper public interface MessageAqMapper { /** * * 查詢資料庫中的隊列表中符合條件的佇列的條數 * * @param qName 佇列名稱 * @param minDatetime 佇列入隊的最小時間 * @return * @author wangqq * @date 2020-07-10 15:44:43 */ @Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} " + "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}") int selectCount(String qName, String minDatetime); } ~~~ ## 2.10. 日期工具類 ```java package com.wangqq.util; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.TimeZone; /** * @Title: DateUtil.java * @Description: 日期工具類 * @author wangqq * @date 2018年10月29日 下午5:27:21 * @version 1.0 */ public class DateUtil { /** * 字串轉date,預設格式yyyy-MM-dd HH:mm:ss * * @param source * @return */ public static Date parseDate(String source) { return parseDate(source, "yyyy-MM-dd HH:mm:ss"); } /** * 字串轉date * * @param source * @param pattern * 格式 * @return */ public static Date parseDate(String source, String pattern) { if (source == null || source.equals("")) { return null; } SimpleDateFormat sdf = new SimpleDateFormat(pattern); try { return sdf.parse(source); } catch (ParseException e) { e.printStackTrace(); return null; } } /** * 格式化日期,預設格式yyyy-MM-dd HH:mm:ss * * @param date * @return */ public static String formatDate(Date date) { return formatDate(date, "yyyy-MM-dd HH:mm:ss"); } /** * 格式化日期 * * @param date * @param pattern * 格式 * @return */ public static String formatDate(Date date, String pattern) { if (date == null) { return null; } SimpleDateFormat sdf = new SimpleDateFormat(pattern); return sdf.format(date); } /** * 時區 時間轉換方法:將傳入的時間(可能為其他時區)轉化成目標時區對應的時間 * @param sourceTime 時間格式必須為:yyyy-MM-dd HH:mm:ss * @param sourceId 入參的時間的時區id 比如:+08:00 * @param targetId 要轉換成目標時區id 比如:+09:00 * @param reFormat 返回格式 預設:yyyy-MM-dd HH:mm:ss * @return string 轉化時區後的時間 */ public static String timeConvert(String sourceTime, String sourceId, String targetId,String reFormat){ //校驗入參是否合法 if (null == sourceId || "".equals(sourceId) || null == targetId || "".equals(targetId) || null == sourceTime || "".equals(sourceTime)){ return null; } if(reFormat == null || "".equals(reFormat)){ reFormat = "yyyy-MM-dd HH:mm:ss"; } //校驗 時間格式必須為:yyyy-MM-dd HH:mm:ss String reg = "^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}$"; if (!sourceTime.matches(reg)){ return null; } try{ //時間格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //根據入參原時區id,獲取對應的timezone物件 TimeZone sourceTimeZone = TimeZone.getTimeZone("GMT"+sourceId); //設定SimpleDateFormat時區為原時區(否則是本地預設時區),目的:用來將字串sourceTime轉化成原時區對應的date物件 df.setTimeZone(sourceTimeZone); //將字串sourceTime轉化成原時區對應的date物件 java.util.Date sourceDate = df.parse(sourceTime); //開始轉化時區:根據目標時區id設定目標TimeZone TimeZone targetTimeZone = TimeZone.getTimeZone("GMT"+targetId); //設定SimpleDateFormat時區為目標時區(否則是本地預設時區),目的:用來將字串sourceTime轉化成目標時區對應的date物件 df.setTimeZone(targetTimeZone); //得到目標時間字串 String targetTime = df.format(sourceDate); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); java.util.Date date = sdf.parse(targetTime); sdf = new SimpleDateFormat(reFormat); return sdf.format(date); } catch (ParseException e){ e.printStackTrace(); } return null;