ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決
阿新 • • 發佈:2020-05-11
導讀
最新在接觸ActiveMQ,裡面有個持久訂閱者模組,功能是怎麼樣也演示不出來效果。配置引數比較簡單(配置沒幾個引數),消費者第一次執行時,需要指定ClientID(此時Broker已經記錄離線訂閱者資訊),在啟動提供者,此時訊息佇列存在一條記錄,然後在啟動消費者,但是怎麼樣也獲取不到訊息,阿西吧~~~什麼鬼,百度上一大堆,都是這樣步驟,消費者端,指定以下ClientID就好了,可,想要的效果死活不出來。。。。。。
採坑之路
廢話不多說,先上程式碼,後面再分析
消費者端程式碼
public void testTopicConsumer2() throws Exception { //第一步:建立ConnectionFactory String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //第二步:通過工廠,建立Connection Connection connection = connectionFactory.createConnection(); //設定持久訂閱的客戶端ID String clientId = "10086"; connection.setClientID(clientId); //第三步:開啟連結 connection.start(); //第四步:通過Connection建立session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立Consumer Topic topic = session.createTopic("cyb-topic"); //建立持久訂閱的消費者客戶端 //第一個引數是指定Topic //第二個引數是自定義的ClientId MessageConsumer consumer = session.createDurableSubscriber(topic, clientId); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //第七步:處理資訊 if (message instanceof TextMessage){ TextMessage tm=(TextMessage)message; try{ System.out.println(tm.getText()); } catch (Exception e){ e.printStackTrace(); } } } }); //session.commit(); //第八步:關閉資源 consumer.close(); session.close(); connection.close(); }
只需要制定ClientID和建立持久客戶端即可
提供者端程式碼
public void testTopicProducer() throws Exception { Connection connection = null; MessageProducer producer = null; Session session = null; try { //第一步:建立ConnectionFactory,用於連線broker String brokerURL = "tcp://192.168.31.215:61616"; ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); //設定 //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000); //第二步:通過工廠,建立Connection connection = connectionFactory.createConnection(); //第三步:連線啟動 connection.start(); //第四步:通過連接獲取session會話 //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用 //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第五步:通過session建立destination,兩種目的地:Queue、Topic //引數:訊息佇列的名稱,在後臺管理系統中可以看到 Topic topic = session.createTopic("cyb-topic"); //第六步:通過session建立MessageProducer producer = session.createProducer(topic); //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //第七步:建立Message //方式一 //TextMessage message=new ActiveMQTextMessage(); //message.setText("queue test"); //方式二 TextMessage message1 = session.createTextMessage("topic->部落格園地址:https://www.cnblogs.com/chenyanbin/"); //第八步:通過producer傳送訊息 producer.send(message1); //session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { //第九步:關閉資源 producer.close(); session.close(); connection.close(); } }
驗證離線訂閱者功能
失敗的驗證
正確的驗證方式
首先明確一點,上面的程式碼是沒有一點問題的。為了節省時間,驗證步驟和上面的差不多,不啟動前兩步了,直接啟動第三步,也就是:
- 先啟動消費者(記錄持久訂閱者ClientID);
- 在啟動提供者;
- 啟動消費者(在下面加個死迴圈)
問題剖析
第一次執行消費者時,此時Broker已經記錄訂閱者ClientID,然後程式一閃而過,進入到藍色框中的,離線訂閱者中,然後在執行提供者,此時,Topic中,已經入隊一次,再次執行消費者時,執行是非同步獲取的,執行一閃而過(鄙人猜測,可能是ActiveMQ機制問題,內部邏輯大概是,先遍歷非持久訂閱者,然後在檢視持久訂閱者,問題出在,程式執行太快,還沒到檢視持久訂閱者時,程式就執行完了,所以第二次執行消費者時,加了個死迴圈,不停監聽佇列訊息,具體ActiveMQ底層程式碼沒看過,有興趣的可以研究下,底層程式碼找到相應位置後,記得告訴我哦~~~)
這個小問題,搗鼓一下午,百度上也說,就這2步驟配置即可,執行結果與初衷相違背,大半夜的都打算洗洗睡了,頭腦風暴想出來這個方法,在下面寫個死迴圈,不停監聽佇列訊息,這才有了這篇部落格,好啦...好啦,時間不早了,馬上都快凌晨1點鐘了,明個還得上班,洗洗睡了zZZZZZZZZZ