1. 程式人生 > >ActiveMQ訊息傳遞的兩種方式

ActiveMQ訊息傳遞的兩種方式

1.什麼是ActiveMQ?

  ActiveMQ是apache提供的開源的,實現訊息傳遞的一箇中間外掛,可以和spring整合,是目前最流行的開源訊息匯流排,ActiveMQ是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現。較相似的還有rabbitMQ和kafka等,都是最為訊息傳遞的外掛

2.ActiveMQ傳遞訊息的兩種方式

前提:需要引入activemq的jar包

點對點方式(PTP):一個消費者對應一個生產者

釋出/訂閱模式(Publish/Sub):一個生產者產生訊息傳送後,可以被多個消費者進行接收。

JMS定義了五種訊息正文格式,以及訊息的呼叫型別,允許傳送和接收一些不同型別的資料,提供現有訊息格式的一些級別的相容性。

StreamMessage:--JAVA原始的資料流

TextMessage:一個字串物件

ObjectMessage:一個系列化的java物件

BytesMessage:一個位元組物件

MapMessage:key/value方式的鍵值對

(1)點對點的方式(PTP)

  即:一個訊息的生產者對應一個消費者

生產者(Producer)實現步驟:

第一步:建立一個ConnectionFactory物件,將服務端activemq的 ip 和 port 作為構造引數傳遞

第二步:通過第一步建立的工廠物件獲得連線物件Connection

第三步:開啟連線,直接呼叫connection物件的start方法即可

第四步:建立一個Session物件,通過connection物件建立

第五步:通過Session物件建立一個Destination物件(該物件有兩種方式:topic和quene),這裡使用quene

第六步:通過Session物件建立一個生產者Producer物件

第七步:建立Message物件,這裡使用TextMessage物件,設定訊息內容

第八步:使用建立的生產者物件Producer傳送訊息

第九步:關閉資源(Producer物件,Connection物件,Session物件)

複製程式碼
@Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        
//brokerURL伺服器的ip及埠號 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://ip地址:61616"); // 第二步:使用ConnectionFactory物件建立一個Connection物件。 Connection connection = connectionFactory.createConnection(); // 第三步:開啟連線,呼叫Connection物件的start方法。 connection.start(); // 第四步:使用Connection物件建立一個Session物件。 //第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。 //第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。 //引數:佇列的名稱。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session物件建立一個Producer物件。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一個Message物件,建立一個TextMessage物件。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer物件傳送訊息。 producer.send(textMessage); // 第九步:關閉資源。 producer.close(); session.close(); connection.close(); }
複製程式碼

消費者實現:

第一步:建立一個ConnectionFactory物件,將服務端activemq的 ip 和 port 作為構造引數傳遞

第二步:通過第一步建立的工廠物件獲得連線物件Connection

第三步:開啟連線,直接呼叫connection物件的start方法即可

第四步:建立一個Session物件,通過connection物件建立

第五步:建立一個Destination物件,使用quene,需要和生產者的quene一致

第六步:建立一個消費者物件

第七步:接收訊息

第八步:列印接收的訊息

第九步:關閉資源

消費者的程式碼:

複製程式碼
@Test
    public void testQueueConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory物件。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:從ConnectionFactory物件中獲得一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線。呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session物件建立一個Consumer物件。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收訊息。
        consumer.setMessageListener(new MessageListener() {
            
            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取訊息的內容
                    text = textMessage.getText();
                    // 第八步:列印訊息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
複製程式碼

(2)訂閱釋出方式傳遞訊息:Topic  

補充:由於topic傳遞訊息的特點是,一個生產者可以有多個消費者,生產者生產的訊息在沒有被消費者消費之前,並不會將訊息持久化到activemq的服務端,傳送的訊息會自動消失。所以 測試的時候需要先建立消費者物件,然後在傳送訊息,防止訊息丟失。

生產者實現步驟:

步驟和PTP的方式完全一樣,不同的是在建立Destination物件的時候,需要建立topic物件

直接上程式碼:

複製程式碼
@Test
    public void testTopicProducer() throws Exception {
        // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        // brokerURL伺服器的ip及埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");
        // 第二步:使用ConnectionFactory物件建立一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線,呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        // 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
        // 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
        // 引數:話題的名稱。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session物件建立一個Producer物件。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:建立一個Message物件,建立一個TextMessage物件。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer物件傳送訊息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }
複製程式碼

消費者實現的步驟:

步驟和PTP消費者實現的步驟一樣,唯一不同的是在建立Destination物件的時候,建立topic物件,同時要和釋出訂閱的生產者的topic一致

消費者程式碼:

複製程式碼
@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory物件。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://IP地址:61616");
        // 第二步:從ConnectionFactory物件中獲得一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線。呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session物件建立一個Consumer物件。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收訊息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取訊息的內容
                    text = textMessage.getText();
                    // 第八步:列印訊息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消費端03。。。。。");
        // 等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }
複製程式碼

總結:兩種傳遞訊息的方式的異同

相同點:實現步驟基本一樣,大同小異

不同點:PTP傳遞訊息的方法,訊息的生產者傳送以後,訊息會持久化在activemq的服務端,如果該訊息給消費者消費,在服務端持久化的訊息也就同時被刪除。

釋出訂閱傳遞訊息的方法:訊息的生產者傳送訊息以後,如果沒有消費者消費,訊息不會持久化在activemq的客戶端,會立即消失。如果建立的訊息被消費,會的activemq的服務端顯示訊息相關內容。這一點和PTP剛好相反。

注意:釋出訂閱傳遞訊息的方式:也是可以實現訊息持久化在服務端的,需要消費者首先在activemq的服務端訂閱訊息(註冊),將消費者客戶端的ID(作為唯一標識,因為可以有多個消費者)和訊息的ID傳遞給服務端即可。