1. 程式人生 > >ActiveMQ學習總結------原生實戰操作(下)03

ActiveMQ學習總結------原生實戰操作(下)03

本篇將繼續延續上一篇的內容,作為知識補充篇,為接下來我們學習spring整合ActiveMQ打好基礎

本篇主要學習內容:

  1.ActiveMQ 佇列服務監聽

  2.ActiveMQ Topic模型


 

回顧下上一篇ActiveMQ學習總結我們學習到了:

  1.ActiveMQ術語及API介紹

  2.ActiveMQ 文字訊息處理

  3.ActiveMQ 物件訊息處理

相信大現在對ActiveMQ的一些簡單操作已經很輕鬆掌握了

上一篇文章地址:https://www.cnblogs.com/arebirth/p/activemq02.html


 

 

一 ActiveMQ實現佇列服務監聽

在我們上一篇的練習中,所有的消費者都是接收一次訊息即斷開連線,這樣是不是很不方便。

試想一下,如果我們的provider在consumer接收完第一條訊息後又繼續傳送了一條訊息,那麼consumer已經斷開連線了,是不是就不能連線不間斷的實時獲取訊息?

解決方案:

  很容易,用我們的佇列服務監聽即可

 

注*:根據上一章的學習,大家對環境搭建使用配置,肯定都已經相當清楚了,這裡就不過多闡述,直接進行程式碼實戰

 

1 訊息生產者

相比之下,我麼你的生產者照之前是沒有任何變化的,主要的變化還是在cosumer身上

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQQueueListenerProducer {
    public static void sendTextActiveMq(String txt) {
        //定義連結工廠
        ConnectionFactory connectionFactory = null;

        //定義連結物件
        Connection connection = null;

        //定義會話
        Session session = null;

        //目的地
        Destination destination = null;

        //定義訊息的傳送者
        MessageProducer producer = null;

        //定義訊息
        Message message = null;

        try {
            //建立連結工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //建立連結誒物件
            connection = connectionFactory.createConnection();

            //啟動連結
            connection.start();

            //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //建立目的地
            destination = session.createQueue("queue-listener");

            //建立訊息生產者
            producer = session.createProducer(destination);

            //建立訊息物件
            message = session.createTextMessage(txt);

            //傳送訊息
            producer.send(message);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //回收資源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

2 訊息消費者

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQQueueListenerConsumer {
    public static void receiveTextActiveMq() {
        // 定義連結工廠
        ConnectionFactory connectionFactory = null;
        // 定義連結物件
        Connection connection = null;
        // 定義會話
        Session session = null;
        // 目的地
        Destination destination = null;
        // 定義訊息的傳送者
        MessageConsumer consumer = null;
        // 定義訊息
        Message message = null;

        try {
            //建立連結工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //建立連結物件
            connection = connectionFactory.createConnection();

            //啟動連結
            connection.start();

            //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //建立目的地
            destination = session.createQueue("queue-listener");

            //建立訊息消費者
            consumer = session.createConsumer(destination);

            //佇列服務監聽
            consumer.setMessageListener(new MessageListener() {
                //ActiveMQ回撥方法。通過該方法將訊息傳遞到consumer
                @Override
                public void onMessage(Message message) {
                    //處理訊息
                    String msg = null;
                    try {
                        msg = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Producer say:" + msg);
                }
            });
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

 

3 測試

3.1 provider測試

package cn.arebirth.mq;

public class ProducerTest {
    public static void main(String[] args) {
        ActiveMQQueueListenerProducer.sendTextActiveMq("Hello,consumer!");
    }
}

觀察我們的控制檯可以發現已經成功釋出到佇列

 

 

 

3.2 consumer測試

package cn.arebirth.mq;

public class ConsumerTest {
    public static void main(String[] args) {
        ActiveMQQueueListenerConsumer.receiveTextActiveMq();
    }
}

我們執行後可以發現,它接收到了訊息,但是它的程序並沒有關閉,

 

 

我們用provider繼續釋出一條訊息,看看consumer能不能接收到

 

 

可以看到,consumer持續在後臺監聽我們釋出的訊息,

 

 

 

 

 

 

通過上面程式碼,不難發現,provider沒有任何改動,只是consumer修改了一部分

通過呼叫匿名內部類的方法來實現持續監聽

 consumer.setMessageListener(new MessageListener() {
    @Override
                public void onMessage(Message message) {

        }
    
}

注意:因為涉及到佇列持續監聽,所以我們不能在finally處給資源回收,否則還在監聽狀態,資源都回收沒了,也就無從監聽啦。

 


 

 

二 Topic模型

在本系列文章第一篇也有介紹過一些Topic模型的概念,那麼這裡我們將以原理+實戰的方式來帶領大家掌握

 

1 Publish/Subscribe處理模式(Topic)

訊息生產者(釋出)訊息到topic中,同時有多個訊息消費者(訂閱)消費該訊息。

和點對點方式不同,釋出到Topic的訊息會被所有的訂閱者消費,而點對點的只能是指定的消費者去消費

當生產者釋出訊息,不管是否有消費者,都不會儲存訊息,也就是說它是發完就啥也不管了那種,

所以要注意:一定要有消費者,然後在有生產者,否則生產者不發完訊息什麼也不管了,你消費者在生產者之後才有,那麼你是接收不到訊息的。

 

接下來我們就以實戰的方式鼓搗下。

 

2 建立生產者

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQTopicProducer {
    public static void sendTextActiveMQ(String txt){
        //定義連結工廠
        ConnectionFactory connectionFactory = null;

        //定義連結物件
        Connection connection = null;

        //定義會話
        Session session = null;

        //目的地
        Destination destination = null;

        //定義訊息的傳送者
        MessageProducer producer = null;

        //定義訊息
        Message message = null;

        try {
            //建立連結工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //建立連結誒物件
            connection = connectionFactory.createConnection();

            //啟動連結
            connection.start();

            //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //建立目的地
            destination = session.createTopic("topic-test");

            //建立訊息生產者
            producer = session.createProducer(destination);

            //建立訊息物件
            message = session.createTextMessage(txt);

            //傳送訊息
            producer.send(message);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            //回收資源
            if (producer != null) {
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

我們可以發現,在建立目的地destination的時候程式碼有了變動

destination = session.createTopic("topic-test");

變成了createTopic,對這就是topic模式了。

 

3 建立消費者

package cn.arebirth.mq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class ActiveMQTopicConsumer implements Runnable {


    public static void receiveTextActiveMQ(String threadName) {
        // 定義連結工廠
        ConnectionFactory connectionFactory = null;
        // 定義連結物件
        Connection connection = null;
        // 定義會話
        Session session = null;
        // 目的地
        Destination destination = null;
        // 定義訊息的傳送者
        MessageConsumer consumer = null;
        // 定義訊息
        Message message = null;

        try {
            //建立連結工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");

            //建立連結物件
            connection = connectionFactory.createConnection();

            //啟動連結
            connection.start();

            //建立會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            //建立目的地
            destination = session.createTopic("topic-test");

            //建立訊息的消費者
            consumer = session.createConsumer(destination);

            //服務監聽
            consumer.setMessageListener(new MessageListener() {
                //ActiveMQ回撥方法。通過該方法將訊息傳遞到consumer
                @Override
                public void onMessage(Message message) {
                    //處理訊息
                    String msg = null;
                    try {
                        msg = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println(threadName + "--Producer say:" + msg);
                }
            });
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void run() {
        receiveTextActiveMQ(Thread.currentThread().getName());
    }
}

 

我們可以發現,在建立目的地destination的時候程式碼有了變動

destination = session.createTopic("topic-test");

還有實現了Runnable這個是為了一會測試的時候,多執行緒啟動,看效果,是否多個都會接受到,(如果看著糊塗的話,你也可以去掉執行緒的部分,單獨複製多個物件,並啟動,效果也是一樣的)

 

4 測試(要先啟動消費者,否則消費者是接收不到訊息的!當然,你自己可以試一下)

4.1 測試消費者

package cn.arebirth.mq;

public class ConsumerTest {
    public static void main(String[] args) {
        ActiveMQTopicConsumer a1 = new ActiveMQTopicConsumer();
        Thread t1 = new Thread(a1,"a1");

        ActiveMQTopicConsumer a2 = new ActiveMQTopicConsumer();
        Thread t2 = new Thread(a2,"a2");

        ActiveMQTopicConsumer a3 = new ActiveMQTopicConsumer();
        Thread t3 = new Thread(a3,"a3");

        t1.start();
        t2.start();
        t3.start();
    }
}

 

可以看到,我們的消費者已經啟動了,三個執行緒。並以監聽服務的方式啟動

 

 

4.2 測試生產者

package cn.arebirth.mq;

public class ProducerTest {
    public static void main(String[] args) {
        ActiveMQTopicProducer.sendTextActiveMQ("hello,topic");
    }
}

 

可以看到,在topics下面,我們釋出的內容已經有記錄了

 

 

 

然後我們在看下,我們的consumer

 

 

 

可以發現,三個consumer都已經接收到了

 

ps:

  如果你對ActiveMQ原理性的東西感到困惑,可以看下我們前面的文章:https://www.cnblogs.com/arebirth/p/activemq01.html

&n