1. 程式人生 > 實用技巧 >訊息佇列之activeMQ

訊息佇列之activeMQ

1.activeMQ的主要功能

  1. 實現高可用、高伸縮、高效能、易用和安全的企業級面向訊息服務的系統
  2. 非同步訊息的消費和處理
  3. 控制訊息的消費順序
  4. 可以和Spring/springBoot整合簡化編碼
  5. 配置叢集容錯的MQ叢集

2.activeMQ安裝

下載地址:http://activemq.apache.org/components/classic/download/

這裡筆者是下載的linux版的:

因為activeMQ底層是使用java編寫的,所以需要安裝jdk,這個請移步我之前的部落格:

https://www.cnblogs.com/pluto-charon/p/11746636.html

安裝activeMq:

# 安裝apache
[root@localhost ~]# yum install ttpd
# 下載的apache-activemq並上傳到linux的home下,解壓
[root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz 
# 進入到bin目錄下
[root@localhost home]# cd /apache-activemq-5.16.0/bin
# 啟動
[root@localhost bin]# ./activemq start
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')

# activemq的預設埠是61616,檢視是否啟動的三種方式
# 第一種
[root@localhost bin]# ps -ef |grep activemq
# 第二種
[root@localhost bin]# netstat -ano|grep 61616
tcp6       0      0 :::61616                :::*                    LISTEN      off (0.00/0/0)
# 第三種
[root@localhost bin]# lsof -i:61616
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    7517 root  132u  IPv6  39926      0t0  TCP *:61616 (LISTEN)

# 帶日誌的啟動方式
[root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
[root@localhost bin]# cd ..
# 可以看到,啟動日誌都已經記錄到日誌裡了
[root@localhost apache-activemq-5.16.0]# cat myrunmq.log 
INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
# 關閉activemq
[root@localhost bin]# ./activemq stop

前臺訪問的埠是8161,在檢視前臺時,要關閉linux和windows的防火牆:

# 關閉linux防火牆
[root@localhost apache-activemq-5.16.0]# systemctl stop firewalld

在訪問之前,需要修改conf目錄下的jetty.xml,將下面的host修改成自己的ip,以及修改使用者名稱和密碼。

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="127.0.0.1"/>
    <property name="port" value="8161"/>
</bean>

# 使用者名稱和密碼可修改可不修改,預設為admin/admin
<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
    <property name="name" value="BASIC" />
    <property name="roles" value="user,admin" />
    <!-- set authenticate=false to disable login -->
    <property name="authenticate" value="true" />
</bean>	

修改完成之後重啟activemq

[root@localhost bin]# ./activemq restart

檢視,地址為192.168.189.150:8161

到這裡就說明activemq安裝成功了。

3.JMS

JMS(java message service)是一個用於提供訊息服務的技術規範,他制定了在整個訊息服務提供過程中的所有資料結構和互動流程。當兩個程式使用jms進行通訊時,他們並不是直接相連的,而是通過一個共同的訊息收發服務連線起來的,達到解耦的效果。jms為標準訊息協議和訊息服務提供了一組通用的介面,包括建立、傳送、讀取訊息等。

1 JMS的優勢:

非同步:客戶端不用傳送請求,JMS自動將訊息傳送給客戶端

可靠:JMS保證訊息只傳遞一次

2.JMS的四大元件:

  • JMS provider:實現了jms介面和規範的訊息中介軟體

  • JMS producer:訊息生產者,建立和傳送JMS訊息的客戶端應用

  • JMS consumer:訊息消費者,接受和處理JMS訊息的客戶端應用

  • JMS message:由訊息頭、訊息屬性、訊息體組成

    訊息頭(在send方法之前,通過setXXX()設定):

    JMSDestination:訊息傳送的目的地,主要是指Queue(點對點傳送模型)和Topic(釋出訂閱模型)

    JMSDeliverMode:訊息是否持久

    JMSExpiration:設定訊息過期時間

    JMSPriority:訊息優先順序,0-4被稱為普通訊息,5-9是加急訊息,預設為4

    JMSMessageID:唯一識別每個訊息的標識,由MQ產者或者自己設定

    訊息屬性:除訊息頭以外的值,如識別,去重,重點標註等方法,如textMessage.setStringProperty("c1","VIP");

    訊息體:

    TextMessage:普通字串

    MapMessage:map型別,其中key為String型別,而值為java的基本型別

    BytesMessage:二進位制陣列訊息

    StreamMessage:java資料流訊息,用個標準流來順序填充和讀取

    ObjectMessage:物件訊息,包含一個可序列化的java物件

3.JMS的傳送模型:

  • 點對點訊息傳送模型:應用程式由訊息佇列、傳送者、接收者組成,每個訊息傳送給一個特殊的訊息佇列,該佇列儲存了所有傳送給它的訊息,處理消費掉的和已過期的訊息

    點對點訊息傳送的特性:

    1.每個訊息只有一個接收者

    2.訊息傳送者和接收者沒有時間依賴性

    3.當訊息傳送者傳送訊息時,無論接收者程式在不在執行,都能傳送訊息

    4.當接收者收到訊息時,會發送確認收到通知

  • 釋出訂閱訊息傳遞模型:釋出者釋出一個訊息,該訊息通過topic傳遞給所有訂閱的客戶端,釋出者和訂閱者彼此不知道對方,是匿名的且可以動態釋出和訊息訂閱。

    釋出訂閱訊息傳遞的特性:

    1.一個訊息可以傳遞給多個訂閱者

    2.釋出者和訂閱者有時間依賴性

    3.為了緩和嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱

4.生產者程式碼實現

1.引入jar包

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.0</version>
</dependency>

2.生產者程式碼

package activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * @className: Jmsproducer
 * @description: activemq生產者
 * @author: charon
 * @create: 2020-12-27 22:36
 */
public class JmsProducer {
    
    /** 宣告activemq的地址 */
    private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

    /** 佇列名 */
    private static final String QUEUE_NAME = "queue01";

    /**
     * @param args 引數
     */
    public static void main(String[] args) throws JMSException {
        // 建立連線工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 獲得連線
        Connection conn = activeMQConnectionFactory.createConnection();
        conn.start();
        // 建立會話
        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 建立佇列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 建立訊息的生產者
        MessageProducer messageProducer = session.createProducer(queue);
        // 建立訊息
        for (int i = 0; i < 5; i++) {
            // 訊息體
            TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】條訊息");
            // 訊息頭
            // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
            // 訊息屬性
            // textMessage.setStringProperty("c1","VIP");
            messageProducer.send(textMessage);
        }
        // 關閉資源
        messageProducer.close();
        session.close();
        conn.close();
    }
}

執行程式碼在瀏覽器上檢視,可以看到queue01裡面有5條訊息:

  • Number Of Pending Messages:等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
  • Number Of Consumers:消費者的數量
  • Messages Enqueued:進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
  • Messages Dequeued:出了佇列的訊息 可以理解為是消費這消費掉的數量

5.消費者程式碼實現

package activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;

/**
 * @className: JmsConsumer
 * @description: activeMq的消費者
 * @author: charon
 * @create: 2020-12-28 08:10
 */
public class JmsConsumer {
    /** 宣告activemq的地址 */
    private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";

    /** 佇列名 */
    private static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException {
        // 建立連線工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 獲得連線
        Connection conn = activeMQConnectionFactory.createConnection();
        conn.start();
        // 建立會話
        Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 建立佇列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 建立訊息的生產者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        // 同步方式,生產環境並不適用,這種方式將阻塞知道獲得並返回第一條訊息
//        while (true){
//            TextMessage textMessage  =(TextMessage) messageConsumer.receive();
//            if(null!=textMessage){
//                System.out.println("---消費者收到訊息:"+textMessage.getText());
//            }else{
//                break;
//            }
//        }

        // 非同步方式,建立監聽,在又訊息到達時,呼叫listener的onMessage方法,
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message != null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("--消費者接受到訊息:"+textMessage);
                }
            }
        });
        
        System.in.read();
        // 關閉資源
        messageConsumer.close();
        session.close();
        conn.close();
    }
}

執行消費者的程式碼,應該我上面生產者的程式碼運行了兩次,所以訊息有10條。

6.activeMQ叢集搭建

在這裡,筆者使用的基於Zookeeper+levelDb搭建的activeMq叢集,為了避免單點故障,使用一主兩從的架構。使用Zookeeper叢集註冊所有的ActiveMQ Broker但只有其中一個Broker可以提供服務,它被視為master,也就是說如果master因為故障而不能提供服務,Zookeeper會從SLave中選舉出一個Broker充當master。

我這邊的zookeeper叢集已經搭建好了,150和151是follower,152是leader。

# 每臺伺服器上安裝activeMq,同時在叢集環境下,activemq的jetty.xml檔案重的host要改成0.0.0.0
# 修改activeMq.xml,註釋掉kahadb這個配置,actviemq預設的是kahadb,並且新增leveldb
[root@localhost conf]# vi activemq.xml
<!--        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter> -->
<persistenceAdapter>
   <replicatedLevelDB
      directory="${activemq.data}/leveldb"
      replicas="3"
      <!--例項間的通訊地址-->
      bind="tcp://0.0.0.0:62222"
      <!--zookeeper的地址-->
      zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
      <!--修改為每個伺服器的節點的ip-->
      hostname="192.168.189.152"
      sync="local_disk"
      zkPath="/activemq/leveldb-stores"/>
</persistenceAdapter>
# 啟動三個節點的activemq
[root@localhost bin]# ./activemq restart

# 檢視 連線zookeeper客戶端
[root@localhost bin]# zkCli.sh
[zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
[00000000022, 00000000020, 00000000021]
# 訪問
[zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
{"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
[zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
[zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
{"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}

從上面可以看到,只有00000000020這個幾點的elected裡面有值,表明它被選舉為master節點了。

在瀏覽器上依次訪問:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

只有192.168.189.150:8161可以訪問成功,因為只有master節點可以對外提供訪問,所以只有一個節點能訪問到,那麼它就是master節點。

第二種檢視的方式:

檢視activemq的日誌,最後一行,可以看到,MasterLevelDBStore即為master節點,SlaveLevelDBStore即為slave節點。

第三種檢視的方式為使用zookeeper的視覺化工具。

由於activeMq叢集是基於zookeeper叢集實現的,所以要注意一下三點:

  1. activeMQ的客戶端只能訪問master的Broker,其它處於Slave的Broker不能訪問,所以客戶端連線的Broker應該使用failover協議
  2. 當一個activeMQ節點掛掉或者一個Zookeeper節點掛掉,activeMQ服務正常運轉,但是如果僅剩一個activeMQ節點,由於不能選舉Master,所以activeMQ不能正常執行;(一個就不成叢集了)
  3. 同理,如果Zookeeper僅剩一個節點是活動的,不管activeMQ是都存活或者說不管activeMQ個節點是否存活,activeMQ不能正常提供服務,必須依賴於Zookeeper叢集服務。

7.叢集程式碼實現

叢集的程式碼和上面單機的程式碼大致是一直的,就只需要修改一個activemq的地址。

 /** 宣告叢集中activemq的地址,使用failover協議,隨機 */
    private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";

8.activemq的高階特性

1.訊息傳送方式

預設情況下,非持久化的訊息是非同步傳送的,持久化的訊息是同步傳送的。但是在開啟事務的情況下,訊息都是非同步傳送的,效率會有2個數量級的提升,所以在傳送持久化訊息時,請開啟事務模式。

2.儲存機制

在通常情況下,非持久化的訊息時儲存在記憶體中的,持久化訊息時儲存在檔案中的,他們的最大限制在配置檔案中的節點配置的,但是在非持久化訊息堆積到一定程度(記憶體告急)時,actviemq會將記憶體中的非持久化訊息寫入臨時檔案中,以騰出記憶體。但是它和持久化訊息的區別在於,重啟後持久化訊息會從檔案中恢復,非持久化訊息的臨時檔案會刪除。

所以儘量不要用非持久化檔案,如果非要用的化,可以將臨時檔案的限制調大。同時,非持久化的訊息要及時處理,不要堆積,或者啟動事務。啟動事務後,commit()會等待伺服器的訊息返回,也不會導致訊息丟失了。

3.死信佇列

一條訊息在被重發多次後(預設是6次),將會被ActiveMQ移入死信佇列;說白了就是異常訊息的歸併處理的集合,主要是處理失敗的訊息。可以在activeMQ.DLQ這個佇列中檢視。

4.重複訊息,冪等性呼叫

在網路延遲的情況洗啊,可能會造成MQ重試,可能會造成重複消費。如果訊息是做資料庫的插入操作,給這個訊息做一個唯一主鍵,那麼就算出現重複消費的情況,因為唯一主鍵,會造成主鍵衝突,避免資料庫出現髒資料。如果是第三方消費,可以在每條資料裡面加一個全域性唯一的id,如果訊息消費了,就將訊息存在redis中,在消費訊息之前將id到redis中查詢一下,判斷是否消費過,如果沒有消費過,就處理,如果消費過了,就不處理了。

參考網址:

https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242