1. 程式人生 > >ActiveMQ入門以及使用整合spring boot

ActiveMQ入門以及使用整合spring boot

一、 ActiveMQ 訊息佇列的使用

MQ

MQ:Message Queue 訊息佇列,就是用來在系統之間進行訊息傳遞的

這個佇列有一系列具體的實現技術:ActiveMQ、rabbitMQ、kafka、RocketMQ(alibaba)

HttpCilent和MQ的比較

1 HttpClient只能算是兩個系統間呼叫的技術

HttpClient支援跨作業系統跨語言呼叫

2 MQ可以在兩個系統間進行資訊互動,並且支援高併發

ActiveMQ支援跨作業系統的java語言中通訊
rabbitMQ支援跨作業系統跨程式語言間的通訊   

1、ActiveMQ簡介

什麼是ActiveMQ 在這裡插入圖片描述

ActiveMQ工作原理:在本系統中的呼叫

在這裡插入圖片描述

1、 解決服務之間程式碼耦合

2、 使用訊息佇列,增加系統併發處理量

在這裡插入圖片描述

ActiveMQ應用場景分析:

1、 當系統使用簡訊平臺、郵件平臺的時候。

使用者註冊,重點使用使用者資訊資料庫儲存,而發簡訊、發郵件,增加業務處理複雜度,這時候使用MQ, 將發簡訊、發郵箱,通知MQ,由另外服務平臺完成。解決了程式碼的耦合問題。

2、 當系統使用搜索平臺、快取平臺的時候。

查詢資料,建立快取、索引 ,當再次查詢相同資料的時候,不從資料庫查詢,從快取或者索引庫查詢

當增加、修改、刪除資料時,傳送訊息給MQ, 快取平臺、索引平臺 從MQ獲取到這個資訊,更新快取或者索引

總結:使用MQ作為系統間資料呼叫的中轉站。

在這裡插入圖片描述

2、ActiveMQ安裝和使用

下載

在這裡插入圖片描述

在這裡插入圖片描述

下載windows版本

在這裡插入圖片描述

進行apache-activemq-5.14.0\bin\win64目錄 啟動activemq.bat 檔案

使用者名稱和密碼 都是admin 

在這裡插入圖片描述

ActiveMQ使用的是標準生產者和消費者模型

有兩種資料結構 Queue、Topic (詳見4.2)

1、Queue 佇列(P2P訊息模型) ,生產者生產了一個訊息,只能由一個消費者進行消費 :給微信好友發訊息

2、Topic 主題/廣播(Pub/Sub訊息模型),生產者生產了一個訊息,可以由多個消費者進行消費

  微信公眾號給粉絲髮訊息

JMS和ActiveMQ的對應

JMS訊息模型 P2P模式 pub/sub模式
ActiveMQ訊息 Queue佇列 Topic佇列
特點 一對一,一個人傳送,只允許一個人接收 一對多,一個人傳送,允許多個人接收
傳送的人:生產者
接收的人:消費者 

2、使用Java程式操作ActiveMQ

2.1、Queue-HelloWorld

2.1.1、pom

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.14.0</version>
    </dependency>
</dependencies>
2.1.2、編寫生產者

使用JMS原生API編寫測試類,向訊息中介軟體寫入訊息的開發步驟:

1 建立連結工廠
2 從連結工廠中獲取連結
3 啟動連結
4 獲取會話
5 建立Queue佇列
6 建立生產者
7 建立訊息
8 傳送訊息
9 提交請求
10 關閉各種資源

第一步:在test/java包中,建立包activeMQ_helloworld,建立類ActiveMQProducter

public class ActiveMQProducter {

    public static void main(String[] args) throws Exception{
        // 連線工廠
        // 使用預設使用者名稱、密碼、路徑
        // 因為:底層實現:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
        // 所以:路徑 tcp://host:61616
        //1 建立連線工廠
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        //2 建立連線
        Connection connection = connectionFactory.createConnection();
        //3 開啟連線
        connection.start();
        //4 建立會話
        //第一個引數:是否開啟事務
        //第二個引數:訊息是否自動確認
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //建立佇列
        Queue queue = session.createQueue("hello1115");
        //5 建立生產者
        MessageProducer producer = session.createProducer(queue);
        //6 建立訊息
        Message message = session.createTextMessage("hi i am boy");
        //7 傳送訊息
        producer.send(message);

        //8 關閉訊息
        session.commit();
        producer.close();
        session.close();
        connection.close();
        System.out.println("訊息生產成功");
    }
}

第二步:執行程式碼,在控制檯提示:

在這裡插入圖片描述

第三步:檢視頁面效果,預設tcp連線activeMQ埠 61616 !!!

在這裡插入圖片描述

2.1.3、編寫消費者

使用JMS原生API編寫測試類,向訊息中介軟體消費訊息的開發步驟:

1 建立連結工廠
2 建立連結
3 啟動連結
4 獲取會話
5 建立佇列
6 建立消費者
7 消費訊息
8 提交
9 關閉資源

第一步:使用MessageConsumer完成消費ActiveMQConsumer.java

public class ActiveMQConsumer {
    public static void main(String[] args) throws Exception {
        //建立連線工廠
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        //建立連線
        Connection connection = connectionFactory.createConnection();
        //開啟連線
        connection.start();
        //建立會話
        /** 第一個引數,是否使用事務
         如果設定true,操作訊息佇列後,必須使用 session.commit();
         如果設定false,操作訊息佇列後,不使用session.commit();
         */
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //建立佇列
        Queue queue = session.createQueue("hello1115");
        //建立消費者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true){
            //失效時間,如果10秒內沒有收到新的訊息,說明沒有訊息存在,此時可以退出當前迴圈
            TextMessage message = (TextMessage) consumer.receive(10000);
            if(message!=null){
                System.out.println(message.getText());
            }else {
                break;
            }
        }

        //關閉連線
        session.commit();
        session.close();
        connection.close();

        System.out.println("消費結束0");
    }
}

第二步:檢視控制檯,發現資訊已經被消費

在這裡插入圖片描述

第三步:檢視頁面效果

使用者名稱和密碼 都是admin 

消費前:表示沒有消費

在這裡插入圖片描述

消費後:表示已經消費

在這裡插入圖片描述

2.1.4、監聽器消費訊息
// 使用監聽器消費
public static void main(String[] args) throws Exception {
    // 連線工廠
    // 使用預設使用者名稱、密碼、路徑
    // 路徑 tcp://host:61616
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    // 獲取一個連線
    Connection connection = connectionFactory.createConnection();
    // 開啟連線
    connection.start();
    // 建立會話
    // 第一個引數,是否使用事務,如果設定true,操作訊息佇列後,必須使用 session.commit();
    Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
    // 建立佇列或者話題物件
    Queue queue = session.createQueue("Hello1115");
    // 建立消費者
    MessageConsumer messageConsumer = session.createConsumer(queue);

    messageConsumer.setMessageListener(new MessageListener() {
        // 每次接收訊息,自動呼叫 onMessage
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //此時,不能讓程式結束,如果結束,監聽就結束了
    while (true) {
        // 目的:不能讓程式死掉
    }
} 

重複測試生成和消費的過程。實現一邊生產,一邊消費的系統。

2.2、多消費者模式-queue

在這裡插入圖片描述

P2P訊息模型中的多消費者模式,得出結論如下:

  • 一個訊息只能被一個消費者消費,不可重複消費
  • 多個消費者均分訊息(負載均衡策略)
  • 當消費者在消費某個訊息的時候,mq一定要等到它的成功回執,才會分發下一個訊息

在這裡插入圖片描述

注意:測試的時候一定先啟動消費者,然後再啟動生產者

###3.3、Topic-HelloWorld

Topic:主題模式、廣播模式、pus/sub模式、

//4 獲取會話
// 第一個引數:是否開啟事務  true 開啟事務,後面一定要提交commit
// 第二個引數:是否自動確認訊息已經被消費
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//5 建立Topic主題模式
//Queue queue = session.createQueue("java1.0913");
Topic topic = session.createTopic("java1.0319");
//6 建立生產者
MessageProducer producer = session.createProducer(topic);

2.4、多消費者模式-topic

在這裡插入圖片描述

2.5 queue和topic模式的比較

相同點:

1、都只有一個生產者

2、都可以有多個消費者

不同點:

1、queue佇列模式,一個訊息只能被一個消費者消費,不能重複消費

當消費者消費某個訊息的時候,一定要得到這個訊息被成功消費的回執,才會分發下一個訊息 queue入隊之後,無論等待多久,訊息都會一直等待消費者來處理

2、topic廣播模式,一個訊息可以被多個消費者消費

這個訊息無法被成功消費與否,都無所謂

topic要求時間要一致,我正好發,你正好收

3、SpringBoot整合ActiveMQ

在Spring Boot中整合ActiveMQ相對還是比較簡單的,都不需要安裝什麼服務,預設使用記憶體的activeMQ,當然配合外接ActiveMQ Server會更好。

我們採用外接ActiveMQ。

3.1、建立工程maven子模組

1、在common_parent中匯入activeMQ的啟動器

<!-- ActiveMQ的啟動器 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2、建立bos-mq子模組

在這裡插入圖片描述

在這裡插入圖片描述

在這裡插入圖片描述

3、新增專案依賴

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- ActiveMQ的啟動器 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
</dependencies>

4、建立程式入口

在這裡插入圖片描述

程式碼如下:

@SpringBootApplication
public class BosMqApplication {

    public static void main(String[] args) {
        SpringApplication.run(BosMqApplication.class, args);
    }
}

5、建立application.properties,具體配置如下:

在這裡插入圖片描述

# MQ所在的伺服器的地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 是否使用內建的MQ, true  使用; fale  不使用
spring.activemq.in-memory=false 
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 使用者名稱
spring.activemq.password=admin
# 密碼
spring.activemq.user=admin

配置的具體意思

spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考慮結束之前等待的時間
#spring.activemq.close-timeout=15s 
# 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
spring.activemq.in-memory=true 
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 密碼
spring.activemq.password=admin
# 等待訊息傳送響應的時間。設定為0等待永遠。
spring.activemq.user=admin
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗號分隔列表(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連線請求和池滿時是否阻塞。設定false會拋“JMSException異常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在丟擲異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時建立連線。可以在啟動時用於加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false 
# 連線過期超時。
#spring.activemq.pool.expiry-timeout=0ms
# 連線空閒超時
#spring.activemq.pool.idle-timeout=30s
# 連線池最大連線數
#spring.activemq.pool.max-connections=1
# 每個連線的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連線
#spring.activemq.pool.reconnect-on-exception=true
# 在空閒連線清除執行緒之間執行的時間。當為負數時,沒有空閒連線驅逐執行緒執行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true

6、建立config

在這裡插入圖片描述

程式碼如下:

package com.czxy.config;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMQConfig {
	@Bean
	public Queue queue() {
		 return new ActiveMQQueue("Armyman.queue");
	}
	@Bean
	public Topic topic() {
		return new ActiveMQTopic("Armyman.topic");
	}
}

3.2、編寫消費者和生產者

1、編寫生產者QueueProducer

在這裡插入圖片描述

/**
  * 訊息的生產者 
  * @author Administrator
  *
  */
@Componet
@EnableScheduling
public class QueueProducer {  
	/*
	 * @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
	 * private JmsMessagingTemplate jmsTemplate; //
	 * 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
	 * 
	 * @Scheduled(fixedDelay=3000)//每3s執行1次 
	   public void sendMessage(Destination destination, final String message){
	      jmsTemplate.convertAndSend(destination, message); 
	   }
	 */

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    private Queue queue;

    @Scheduled(fixedDelay=3000)//每3s執行1次
    public void send() {
       try {
		
		   MapMessage mapMessage = new ActiveMQMapMessage();
		   mapMessage.setString("info", "你還在睡覺");
		   
		   this.jmsMessagingTemplate.convertAndSend(this.queue, mapMessage);
		   
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
    }
}  

2、編寫消費者QueueConsumer

在這裡插入圖片描述

/**
 * 訊息的消費者
 * @author Administrator
 */
@Component  
public class QueueConsumer {  
    //使用JmsListener配置消費者監聽的佇列,其中Message是接收到的訊息  
	@JmsListener(destination = "Armyman.queue")  
    public void receiveQueue(Message message) {
		try {
			MapMessage mapMessage = (MapMessage) message;
			String info = mapMessage.getString("info");
			System.out.println(info);
		} catch (Exception e) {
			e.printStackTrace();
		}
    } 
}

3、啟動,測試,OK

3.3 使用內建ActiveMQ

只需要改變properties配置檔案,即可執行

# MQ所在的伺服器的地址
# spring.activemq.broker-url=tcp://127.0.0.1:61616
# 是否使用SpringBoot內建的MQ, true  使用; fale  不使用
spring.activemq.in-memory=true 
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 使用者名稱
# 密碼
#spring.activemq.user=admin
# spring.activemq.password=admin

7 應用場景:

  • 用在高併發的請求中
    • 註冊簡訊的傳送
    • 註冊郵件的傳送
    • 秒殺
    • 兩個系統間進行訊息傳遞

4.1、訊息佇列應用場景

以下介紹訊息佇列在實際應用中常用的使用場景。

非同步處理,應用解耦,流量削鋒和訊息通訊四個場景

4.1.1非同步處理

場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種:

1.序列方式;2.並行方式

(1)序列方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端

在這裡插入圖片描述

(2)並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間

在這裡插入圖片描述

假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則序列方式的時間是150毫秒,並行的時間可能是100毫秒。

因為CPU在單位時間內處理的請求數是一定的,假設CPU在1秒內吞吐量是100次。則序列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)

小結:如以上案例描述,傳統的方式系統的效能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

引入訊息佇列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:

在這裡插入圖片描述

按照以上約定,使用者的響應時間相當於是註冊資訊寫入資料庫的時間,也就是50毫秒。註冊郵件,傳送簡訊寫入訊息佇列後,直接返回,因此寫入訊息佇列的速度很快,基本可以忽略,因此使用者的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比序列提高了3倍,比並行提高了2倍

4.1.2應用解耦

場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面。如下圖

在這裡插入圖片描述

傳統模式的缺點:

l 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗

l 訂單系統與庫存系統耦合

如何解決以上問題呢?引入應用訊息佇列後的方案,如下圖:

在這裡插入圖片描述

  • 訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功

  • 庫存系統:訂閱下單的訊息,採用pub/sub(釋出/訂閱)的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作

  • 假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦

4.1.3流量削鋒

流量削鋒也是訊息佇列中的常用場景,一般在秒殺或團搶活動中使用廣泛

應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。

  • 可以控制活動的人數

  • 可以緩解短時間內高流量壓垮應用

在這裡插入圖片描述

  • 使用者的請求,伺服器接收後,首先寫入訊息佇列。假如訊息佇列長度超過最大數量,則直接拋棄使用者請求或跳轉到錯誤頁面

  • 秒殺業務根據訊息佇列中的請求資訊,再做後續處理

4.1.4日誌處理

日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下

在這裡插入圖片描述

  • 日誌採集客戶端,負責日誌資料採集,定時寫受寫入Kafka佇列

  • Kafka訊息佇列,負責日誌資料的接收,儲存和轉發

  • 日誌處理應用:訂閱並消費kafka佇列中的日誌資料img

(1)Kafka:接收使用者日誌的訊息佇列

(2)Logstash:做日誌解析,統一成JSON輸出給Elasticsearch

(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的資料儲存服務,通過index組織資料,兼具強大的搜尋和統計功能

(4)Kibana:基於Elasticsearch的資料視覺化元件,超強的資料視覺化能力是眾多公司選擇ELK stack的重要原因

4.1.5訊息通訊

訊息通訊是指,訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列,或者聊天室等

點對點通訊:

在這裡插入圖片描述

客戶端A和客戶端B使用同一佇列,進行訊息通訊。

聊天室通訊:

在這裡插入圖片描述

客戶端A,客戶端B,客戶端N訂閱同一主題,進行訊息釋出和接收。實現類似聊天室效果。

以上實際是訊息佇列的兩種訊息模式,點對點或釋出訂閱模式。模型為示意圖,供參考。

4.2、JMS訊息服務

訊息佇列的JAVAEE規範JMS 。JMS(Java Message Service,java訊息服務)API是一個訊息服務的標準/規範,允許應用程式元件基於JavaEE平臺建立、傳送、接收和讀取訊息。它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性。

4.2.1 訊息模型

在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

4.2.2 P2P模式-佇列模式

在這裡插入圖片描述

P2P模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。

P2P的特點

  • 每個訊息只能被一個消費者(Consumer)消費(即一旦被消費,訊息就不再存在於訊息佇列中)

  • 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列

  • 接收者在成功接收訊息之後需向佇列應答成功

如果希望傳送的每個訊息都會被成功處理的話,那麼需要P2P模式。

4.2.3 Pub/Sub模式–廣播/主題模式

在這裡插入圖片描述

包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。

Pub/Sub的特點

  • 每個訊息可以有多個消費者

  • 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息

  • 為了消費訊息,訂閱者必須保持執行的狀態

為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。

如果希望傳送的訊息可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

4.3.訊息消費方式

在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。

(1)同步

訂閱者或接收者通過receive方法來接收訊息,receive方法在接收到訊息之前(或超時之前)將一直阻塞;

(2)非同步

訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。