1. 程式人生 > >ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決

ActiveMQ 持久訂閱者,執行結果與初衷相違背,驗證離線訂閱者無效,問題解決

導讀

  最新在接觸ActiveMQ,裡面有個持久訂閱者模組,功能是怎麼樣也演示不出來效果。配置引數比較簡單(配置沒幾個引數),消費者第一次執行時,需要指定ClientID(此時Broker已經記錄離線訂閱者資訊),在啟動提供者,此時訊息佇列存在一條記錄,然後在啟動消費者,但是怎麼樣也獲取不到訊息,阿西吧~~~什麼鬼,百度上一大堆,都是這樣步驟,消費者端,指定以下ClientID就好了,可,想要的效果死活不出來。。。。。。

採坑之路

廢話不多說,先上程式碼,後面再分析

消費者端程式碼

    public void testTopicConsumer2() throws Exception {
        //第一步:建立ConnectionFactory
        String brokerURL = "tcp://192.168.31.215:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        //第二步:通過工廠,建立Connection
        Connection connection = connectionFactory.createConnection();
        //設定持久訂閱的客戶端ID
        String clientId = "10086";
        connection.setClientID(clientId);
        //第三步:開啟連結
        connection.start();
        //第四步:通過Connection建立session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //第五步:通過session建立Consumer
        Topic topic = session.createTopic("cyb-topic");

        //建立持久訂閱的消費者客戶端
        //第一個引數是指定Topic
        //第二個引數是自定義的ClientId
        MessageConsumer consumer = session.createDurableSubscriber(topic, clientId);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //第七步:處理資訊
                if (message instanceof TextMessage){
                    TextMessage tm=(TextMessage)message;
                    try{
                        System.out.println(tm.getText());
                    }
                    catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        //session.commit();
        //第八步:關閉資源
        consumer.close();
        session.close();
        connection.close();

    }

  只需要制定ClientID和建立持久客戶端即可

提供者端程式碼

   public void testTopicProducer() throws Exception {
        Connection connection = null;
        MessageProducer producer = null;
        Session session = null;
        try {
            //第一步:建立ConnectionFactory,用於連線broker
            String brokerURL = "tcp://192.168.31.215:61616";
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            //設定
            //((ActiveMQConnectionFactory) connectionFactory).setProducerWindowSize(1000);
            //第二步:通過工廠,建立Connection
            connection = connectionFactory.createConnection();
            //第三步:連線啟動
            connection.start();
            //第四步:通過連接獲取session會話
            //第一個引數:是否啟用ActiveMQ事務,如果為true,第二個引數無用
            //第二個引數:應答模式,AUTO_ACKNOWLEDGE為自動應答
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //第五步:通過session建立destination,兩種目的地:Queue、Topic
            //引數:訊息佇列的名稱,在後臺管理系統中可以看到
            Topic topic = session.createTopic("cyb-topic");
            //第六步:通過session建立MessageProducer
            producer = session.createProducer(topic);
            //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //第七步:建立Message
            //方式一
            //TextMessage message=new ActiveMQTextMessage();
            //message.setText("queue test");
            //方式二
            TextMessage message1 = session.createTextMessage("topic->部落格園地址:https://www.cnblogs.com/chenyanbin/");
            //第八步:通過producer傳送訊息
            producer.send(message1);
            //session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //第九步:關閉資源
            producer.close();
            session.close();
            connection.close();
        }
    }

驗證離線訂閱者功能

失敗的驗證

正確的驗證方式

  首先明確一點,上面的程式碼是沒有一點問題的。為了節省時間,驗證步驟和上面的差不多,不啟動前兩步了,直接啟動第三步,也就是:

  1. 先啟動消費者(記錄持久訂閱者ClientID);
  2. 在啟動提供者;
  3. 啟動消費者(在下面加個死迴圈)

問題剖析

 

  第一次執行消費者時,此時Broker已經記錄訂閱者ClientID,然後程式一閃而過,進入到藍色框中的,離線訂閱者中,然後在執行提供者,此時,Topic中,已經入隊一次,再次執行消費者時,執行是非同步獲取的,執行一閃而過(鄙人猜測,可能是ActiveMQ機制問題,內部邏輯大概是,先遍歷非持久訂閱者,然後在檢視持久訂閱者,問題出在,程式執行太快,還沒到檢視持久訂閱者時,程式就執行完了,所以第二次執行消費者時,加了個死迴圈,不停監聽佇列訊息,具體ActiveMQ底層程式碼沒看過,有興趣的可以研究下,底層程式碼找到相應位置後,記得告訴我哦~~~)

 

  這個小問題,搗鼓一下午,百度上也說,就這2步驟配置即可,執行結果與初衷相違背,大半夜的都打算洗洗睡了,頭腦風暴想出來這個方法,在下面寫個死迴圈,不停監聽佇列訊息,這才有了這篇部落格,好啦...好啦,時間不早了,馬上都快凌晨1點鐘了,明個還得上班,洗洗睡了zZZZZZZZZZ