1. 程式人生 > >ActiveMQ(一)初步接觸-編寫Demo

ActiveMQ(一)初步接觸-編寫Demo

宣告 轉載請註明出處! Reprint please indicate the source!

MessageQueue是分散式的系統裡經常要用到的元件。

什麼是JMS

JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支援。

(引用自百度百科)

JMS 在其中扮演的角色與JDBC 很相似,正如JDBC 提供了一套用於訪問各種不同關係資料庫的公共API,JMS 也提供了獨立於特定廠商的企業訊息系統訪問方式。JMS 的程式設計過程很簡單,概括為:應用程式A 傳送一條訊息到訊息伺服器(也就是JMS Provider)的某個目得地(Destination),然後訊息伺服器把訊息轉發給應用程式B。因為應用程式A 和應用程式B 沒有直接的程式碼關連,所以兩者實現瞭解偶。

JMS訊息解耦

(引用自部落格)

JMS的用途

  • 解耦
  • 資料的可靠傳輸
  • 保證資料不重發,不丟失
  • 能夠實現跨平臺操作,能夠為不同作業系統上的軟體整合資料傳送服務。

訊息的傳遞模型

JMS支援兩種訊息傳遞模型:

點對點(point-to-point,簡稱PTP)和釋出/訂閱(publish/subscribe,簡稱pub/sub)。這兩種訊息傳遞模型非常相似,但有以下區別:

  • a. PTP訊息傳遞模型規定了一條訊息之恩能夠傳遞費一個接收方。
  • b. Pub/sub訊息傳遞模型允許一條訊息傳遞給多個接收方 每個模型都通過擴充套件公用基類來實現。例如:javax.jms.Queue和Javax.jms.Topic都擴充套件自javax.jms.Destination類。

上面兩種訊息傳遞模型裡,我們都需要定義訊息生產者和消費者,生產者吧訊息傳送到JMS Provider的某個目標地址(Destination),訊息從該目標地址傳送至消費者。消費者可以同步或非同步接收訊息,一般而言,非同步訊息消費者的執行和伸縮性都優於同步訊息接收者,體現在:

  1. 非同步訊息接收者建立的網路流量比較小。單向對東訊息,並使之通過管道進入訊息監聽器。管道操作支援將多條訊息聚合為一個網路呼叫。
  2. 非同步訊息接收者使用執行緒比較少。非同步訊息接收者在不活動期間不使用執行緒。同步訊息接收者在接收呼叫期間內使用執行緒,結果執行緒可能會長時間保持空閒,尤其是如果該呼叫中指定了阻塞超時。
  3. 對於伺服器上執行的應用程式程式碼,使用非同步訊息接收者幾乎總是最佳選擇,尤其是通過訊息驅動Bean。使用非同步訊息接收者可以防止應用程式程式碼在伺服器上執行阻塞操作。而阻塞操作會是伺服器端執行緒空閒,甚至會導致死鎖。阻塞操作使用所有執行緒時則發生死鎖。如果沒有空餘的執行緒可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。

(引用自部落格)

什麼是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。

(引用自百度百科)

官方主頁

權威書籍

環境配置

Maven依賴

引入核心包

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>

TIPS:如果你引入的是下面的activemq-all.jar,且工程中已經引入了SLF4J,會與activemq-all.jar中的SLF4J發生衝突。

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.14.4</version>
</dependency>

啟動broker

要執行起來demo得先,啟動broker。我是在虛擬機器上測試的。ip:192.168.235.100

進入到 執行:

activemq start

測試

broker管理頁面

預設使用者名稱/密碼:admin/admin

介面介紹

API文件

由於歷史原因,JMS提供有四套API介面。

  • JMS1.0 定義了兩個域相關的API,queue、topic。
  • JMS1.1 引入了一組新API,也叫做傳統API。
  • JMS2.0 引入了一組簡化API,擁有傳統API所有的特性,同時介面更少、使用更方便。

每組API挺一組不同的介面集合,擁有連線到JMS提供者、傳送和接收訊息。它們共享一組代表訊息、訊息目的地和其他各方面功能特性的通用介面。所有的介面都在javax.jms下。

傳統API介面

傳統API介面模型圖 傳統API介面模型圖

Destination 介面是Queue和Topic的頂層介面。

介面 用處
ConnectionFactory 使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。
Connection 連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。
Session 表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。
Destination 目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。
MessageConsumer 由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。
MessageProducer 由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。
Message 是在消費者和生產者之間傳送的物件,也就是說從一個應用程式創送到另一個應用程式。一個訊息有三個主要部分:訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。

一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。訊息介面非常靈活,並提供了許多方式來定製訊息的內容。

簡化API接

簡化API介面模型圖 簡化版API介面模型圖

demo注意:ActiveMQ是沒有實現簡化版介面的。不僅ActiveMQ,很多廠商也沒有支援簡化版API介面。

點對點模式

點對點模式,有點類似關係資料庫。從程式設計角度,它裡面的Acknowledge,就類似於資料庫的commit。Connection連線、Session會話、工廠模式等,在設計上與資料庫很像。

ActiveMQ中Queue實現了點對點模型。

JMSProducer.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: JMS ActiveMQ Demo測試 訊息生產者<br>
 * 執行前,需要開啟本地的activemq。
 * 如果需要更改broker地址,要提前執行相應的broker。
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 11:06<br>
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
//    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)
    private static final int SENDNUM = 10; // 傳送的訊息數量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageProducer messageProducer; // 訊息生產者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 建立連線
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 啟動連線

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認

            destination = session.createQueue("FirstQueue"); // 建立訊息佇列
            messageProducer = session.createProducer(destination); // 建立訊息傳送者

            sendMessage(session, messageProducer); // 傳送訊息
            session.commit(); // 提交事務
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }

    /**
     * 傳送訊息
     * @param session 會話
     * @param messageProducer 訊息生產者
     */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i=0; i<JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 傳送的訊息 "+i);
            System.out.println("傳送訊息: ActiveMQ 傳送的訊息 "+i);
            messageProducer.send(message);
        }
    }
}

執行一下JMSProudcer,生產10條訊息。

JMSConsumer.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訊息消費者1-點對點模式<br>
 *     實現方式1 迴圈檢測<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageConsumer messageConsumer; // 訊息消費者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費訊息不需要事務,自動確認
            destination = session.createQueue("FirstQueue"); // 建立訊息佇列

            messageConsumer = session.createConsumer(destination); // 建立訊息消費者

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);// 設定延時為100s
                if (textMessage!=null) { // 接收到訊息
                    System.out.println("接收的訊息:"+textMessage.getText());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

執行一下JMSConsumer,消費10條訊息。

image

這種方式消費訊息,通過迴圈檢查,顯然是不高明的。

下面,通過設定監聽的方式,實現訊息消費。

再次執行一下JMSProudcer,生產10條訊息。

又生產了10條訊息

首先實現一下監聽器

Listenr.java

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訊息監聽者<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:30<br>
 */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到訊息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer2.java

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訊息消費者2-點對點模式<br>
 *     實現方式2 設定監聽<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageConsumer messageConsumer; // 訊息消費者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費訊息不需要事務,自動確認
            destination = session.createQueue("FirstQueue"); // 建立訊息佇列

            messageConsumer = session.createConsumer(destination); // 建立訊息消費者

            messageConsumer.setMessageListener(new Listener());// 註冊訊息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

執行一下JMSConsumer2,新產生的訊息被消費了。

新產生的訊息被消費了

訊息釋出/訂閱模式

釋出/訂閱模式是一對多的關係。

注意:釋出/訂閱要先執行訂閱,再執行釋出才能收到訊息。

釋出者和訂閱者之間有時間上的依賴性。針對某個主題Topic的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息,而且為了消費訊息,訂閱者必須保持執行的狀態。

Topic 實現了釋出/訂閱模型。

JMSConsumer.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訊息消費者-釋出訂閱模式 訊息訂閱者<br>
 *     實現方式 設定監聽<br>
 *     訊息訂閱者1<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageConsumer messageConsumer; // 訊息消費者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費訊息不需要事務,自動確認
//            destination = session.createQueue("FirstQueue"); // 建立訊息佇列
            destination = session.createTopic("FirstTopic"); // 建立訊息訂閱者
            messageConsumer = session.createConsumer(destination); // 建立訊息消費者

            messageConsumer.setMessageListener(new Listener());// 註冊訊息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

Listener.java

package com.jahentao.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訂閱者1訊息監聽器<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:46:52<br>
 */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一 收到訊息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMSConsumer2.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訊息消費者-釋出訂閱模式 訊息訂閱者<br>
 *     實現方式 設定監聽<br>
 *     訊息訂閱者2<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 13:44<br>
 */
public class JMSConsumer2 {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
    //    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)

    public static void main(String args[]) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageConsumer messageConsumer; // 訊息消費者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消費訊息不需要事務,自動確認
//            destination = session.createQueue("FirstQueue"); // 建立訊息佇列
            destination = session.createTopic("FirstTopic"); // 建立訊息訂閱者
            messageConsumer = session.createConsumer(destination); // 建立訊息消費者

            messageConsumer.setMessageListener(new Listener2());// 註冊訊息監聽
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

Listener2.java

package com.jahentao.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: 訂閱者2訊息監聽器<br>
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:46:52<br>
 */
public class Listener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者二 收到訊息:" + ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

首先,分別執行JMSConsumer、JMSConsumer2進行訂閱。

2個訂閱者

JMSProducer.java

package com.jahentao.activemq;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Created with IntelliJ IDEA.<br>
 * Description: JMS ActiveMQ Demo測試 釋出訂閱模式 訊息釋出者<br>
 * 執行前,需要開啟本地的activemq。
 * 如果需要更改broker地址,要提前執行相應的broker。
 * User: jahen<br>
 * Date: 2017-04-02<br>
 * Time: 14:42:59<br>
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 預設連線
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 預設密碼
//    private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL; // 預設連線地址 為 failover://tcp://localhost:61616
    private static final String BROKER_URL = "failover://tcp://192.168.235.100:61616"; // 指定連線地址 (my VM)
    private static final int SENDNUM = 10; // 傳送的訊息數量

    public static void main(String[] args) {
        ConnectionFactory connectionFactory; // 連線工程,生產Connection
        Connection connection = null; // 連線
        Session session; // 會話 接受或者傳送訊息的執行緒
        Destination destination; // 訊息的目的地
        MessageProducer messageProducer; // 訊息生產者

        // 例項化連線工廠
        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        // 建立連線
        try {
            connection = connectionFactory.createConnection();
            connection.start(); // 啟動連線

            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 提交事務,自動確認

//            destination = session.createQueue("FirstQueue"); // 建立訊息佇列
            destination = session.createTopic("FirstTopic"); // 建立主題
            messageProducer = session.createProducer(destination); // 建立訊息傳送者

            sendMessage(session, messageProducer); // 傳送訊息
            session.commit(); // 提交事務
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection!=null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }

    /**
     * 傳送訊息
     * @param session 會話
     * @param messageProducer 訊息生產者
     */
    private static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i = 0; i< JMSProducer.SENDNUM; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ 傳送的訊息 "+i);
            System.out.println("傳送訊息: ActiveMQ 傳送的訊息 "+i);
            messageProducer.send(message);
        }
    }
}

然後執行JMSProducer。

釋出訊息訂閱者收到訊息

參考

這裡學習的原始碼,託管在碼雲