ActiveMQ入門以及使用整合spring boot
一、 ActiveMQ 訊息佇列的使用
MQ
MQ:Message Queue 訊息佇列,就是用來在系統之間進行訊息傳遞的
這個佇列有一系列具體的實現技術:ActiveMQ、rabbitMQ、kafka、RocketMQ(alibaba)
HttpCilent和MQ的比較
1 HttpClient只能算是兩個系統間呼叫的技術
HttpClient支援跨作業系統跨語言呼叫
2 MQ可以在兩個系統間進行資訊互動,並且支援高併發
ActiveMQ支援跨作業系統的java語言中通訊
rabbitMQ支援跨作業系統跨程式語言間的通訊
1、ActiveMQ簡介
什麼是ActiveMQ
ActiveMQ工作原理:在本系統中的呼叫
1、 解決服務之間程式碼耦合
2、 使用訊息佇列,增加系統併發處理量
ActiveMQ應用場景分析:
1、 當系統使用簡訊平臺、郵件平臺的時候。
使用者註冊,重點使用使用者資訊資料庫儲存,而發簡訊、發郵件,增加業務處理複雜度,這時候使用MQ, 將發簡訊、發郵箱,通知MQ,由另外服務平臺完成。解決了程式碼的耦合問題。
2、 當系統使用搜索平臺、快取平臺的時候。
查詢資料,建立快取、索引 ,當再次查詢相同資料的時候,不從資料庫查詢,從快取或者索引庫查詢
當增加、修改、刪除資料時,傳送訊息給MQ, 快取平臺、索引平臺 從MQ獲取到這個資訊,更新快取或者索引
總結:使用MQ作為系統間資料呼叫的中轉站。
2、ActiveMQ安裝和使用
下載
下載windows版本
進行apache-activemq-5.14.0\bin\win64目錄 啟動activemq.bat 檔案
使用者名稱和密碼 都是admin
ActiveMQ使用的是標準生產者和消費者模型
有兩種資料結構 Queue、Topic (詳見4.2)
1、Queue 佇列(P2P訊息模型) ,生產者生產了一個訊息,只能由一個消費者進行消費 :給微信好友發訊息
2、Topic 主題/廣播(Pub/Sub訊息模型),生產者生產了一個訊息,可以由多個消費者進行消費
微信公眾號給粉絲髮訊息
JMS和ActiveMQ的對應
JMS訊息模型 | P2P模式 | pub/sub模式 |
---|---|---|
ActiveMQ訊息 | Queue佇列 | Topic佇列 |
特點 | 一對一,一個人傳送,只允許一個人接收 | 一對多,一個人傳送,允許多個人接收 |
傳送的人:生產者
接收的人:消費者
2、使用Java程式操作ActiveMQ
2.1、Queue-HelloWorld
2.1.1、pom
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>
</dependencies>
2.1.2、編寫生產者
使用JMS原生API編寫測試類,向訊息中介軟體寫入訊息的開發步驟:
1 建立連結工廠
2 從連結工廠中獲取連結
3 啟動連結
4 獲取會話
5 建立Queue佇列
6 建立生產者
7 建立訊息
8 傳送訊息
9 提交請求
10 關閉各種資源
第一步:在test/java包中,建立包activeMQ_helloworld,建立類ActiveMQProducter
public class ActiveMQProducter {
public static void main(String[] args) throws Exception{
// 連線工廠
// 使用預設使用者名稱、密碼、路徑
// 因為:底層實現:final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
// 所以:路徑 tcp://host:61616
//1 建立連線工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//2 建立連線
Connection connection = connectionFactory.createConnection();
//3 開啟連線
connection.start();
//4 建立會話
//第一個引數:是否開啟事務
//第二個引數:訊息是否自動確認
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//建立佇列
Queue queue = session.createQueue("hello1115");
//5 建立生產者
MessageProducer producer = session.createProducer(queue);
//6 建立訊息
Message message = session.createTextMessage("hi i am boy");
//7 傳送訊息
producer.send(message);
//8 關閉訊息
session.commit();
producer.close();
session.close();
connection.close();
System.out.println("訊息生產成功");
}
}
第二步:執行程式碼,在控制檯提示:
第三步:檢視頁面效果,預設tcp連線activeMQ埠 61616 !!!
2.1.3、編寫消費者
使用JMS原生API編寫測試類,向訊息中介軟體消費訊息的開發步驟:
1 建立連結工廠
2 建立連結
3 啟動連結
4 獲取會話
5 建立佇列
6 建立消費者
7 消費訊息
8 提交
9 關閉資源
第一步:使用MessageConsumer完成消費ActiveMQConsumer.java
public class ActiveMQConsumer {
public static void main(String[] args) throws Exception {
//建立連線工廠
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
//建立連線
Connection connection = connectionFactory.createConnection();
//開啟連線
connection.start();
//建立會話
/** 第一個引數,是否使用事務
如果設定true,操作訊息佇列後,必須使用 session.commit();
如果設定false,操作訊息佇列後,不使用session.commit();
*/
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//建立佇列
Queue queue = session.createQueue("hello1115");
//建立消費者
MessageConsumer consumer = session.createConsumer(queue);
while(true){
//失效時間,如果10秒內沒有收到新的訊息,說明沒有訊息存在,此時可以退出當前迴圈
TextMessage message = (TextMessage) consumer.receive(10000);
if(message!=null){
System.out.println(message.getText());
}else {
break;
}
}
//關閉連線
session.commit();
session.close();
connection.close();
System.out.println("消費結束0");
}
}
第二步:檢視控制檯,發現資訊已經被消費
第三步:檢視頁面效果
使用者名稱和密碼 都是admin
消費前:表示沒有消費
消費後:表示已經消費
2.1.4、監聽器消費訊息
// 使用監聽器消費
public static void main(String[] args) throws Exception {
// 連線工廠
// 使用預設使用者名稱、密碼、路徑
// 路徑 tcp://host:61616
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
// 獲取一個連線
Connection connection = connectionFactory.createConnection();
// 開啟連線
connection.start();
// 建立會話
// 第一個引數,是否使用事務,如果設定true,操作訊息佇列後,必須使用 session.commit();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 建立佇列或者話題物件
Queue queue = session.createQueue("Hello1115");
// 建立消費者
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
// 每次接收訊息,自動呼叫 onMessage
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//此時,不能讓程式結束,如果結束,監聽就結束了
while (true) {
// 目的:不能讓程式死掉
}
}
重複測試生成和消費的過程。實現一邊生產,一邊消費的系統。
2.2、多消費者模式-queue
P2P訊息模型中的多消費者模式,得出結論如下:
- 一個訊息只能被一個消費者消費,不可重複消費
- 多個消費者均分訊息(負載均衡策略)
- 當消費者在消費某個訊息的時候,mq一定要等到它的成功回執,才會分發下一個訊息
注意:測試的時候一定先啟動消費者,然後再啟動生產者
###3.3、Topic-HelloWorld
Topic:主題模式、廣播模式、pus/sub模式、
//4 獲取會話
// 第一個引數:是否開啟事務 true 開啟事務,後面一定要提交commit
// 第二個引數:是否自動確認訊息已經被消費
Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//5 建立Topic主題模式
//Queue queue = session.createQueue("java1.0913");
Topic topic = session.createTopic("java1.0319");
//6 建立生產者
MessageProducer producer = session.createProducer(topic);
2.4、多消費者模式-topic
2.5 queue和topic模式的比較
相同點:
1、都只有一個生產者
2、都可以有多個消費者
不同點:
1、queue佇列模式,一個訊息只能被一個消費者消費,不能重複消費
當消費者消費某個訊息的時候,一定要得到這個訊息被成功消費的回執,才會分發下一個訊息 queue入隊之後,無論等待多久,訊息都會一直等待消費者來處理
2、topic廣播模式,一個訊息可以被多個消費者消費
這個訊息無法被成功消費與否,都無所謂
topic要求時間要一致,我正好發,你正好收
3、SpringBoot整合ActiveMQ
在Spring Boot中整合ActiveMQ相對還是比較簡單的,都不需要安裝什麼服務,預設使用記憶體的activeMQ,當然配合外接ActiveMQ Server會更好。
我們採用外接ActiveMQ。
3.1、建立工程maven子模組
1、在common_parent中匯入activeMQ的啟動器
<!-- ActiveMQ的啟動器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2、建立bos-mq子模組
3、新增專案依賴
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- ActiveMQ的啟動器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
</dependencies>
4、建立程式入口
程式碼如下:
@SpringBootApplication
public class BosMqApplication {
public static void main(String[] args) {
SpringApplication.run(BosMqApplication.class, args);
}
}
5、建立application.properties,具體配置如下:
# MQ所在的伺服器的地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 是否使用內建的MQ, true 使用; fale 不使用
spring.activemq.in-memory=false
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 使用者名稱
spring.activemq.password=admin
# 密碼
spring.activemq.user=admin
配置的具體意思
spring.activemq.broker-url=tcp://127.0.0.1:61616
# 在考慮結束之前等待的時間
#spring.activemq.close-timeout=15s
# 預設代理URL是否應該在記憶體中。如果指定了顯式代理,則忽略此值。
spring.activemq.in-memory=true
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 密碼
spring.activemq.password=admin
# 等待訊息傳送響應的時間。設定為0等待永遠。
spring.activemq.user=admin
# 是否信任所有包
#spring.activemq.packages.trust-all=
# 要信任的特定包的逗號分隔列表(當不信任所有包時)
#spring.activemq.packages.trusted=
# 當連線請求和池滿時是否阻塞。設定false會拋“JMSException異常”。
#spring.activemq.pool.block-if-full=true
# 如果池仍然滿,則在丟擲異常前阻塞時間。
#spring.activemq.pool.block-if-full-timeout=-1ms
# 是否在啟動時建立連線。可以在啟動時用於加熱池。
#spring.activemq.pool.create-connection-on-startup=true
# 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
#spring.activemq.pool.enabled=false
# 連線過期超時。
#spring.activemq.pool.expiry-timeout=0ms
# 連線空閒超時
#spring.activemq.pool.idle-timeout=30s
# 連線池最大連線數
#spring.activemq.pool.max-connections=1
# 每個連線的有效會話的最大數目。
#spring.activemq.pool.maximum-active-session-per-connection=500
# 當有"JMSException"時嘗試重新連線
#spring.activemq.pool.reconnect-on-exception=true
# 在空閒連線清除執行緒之間執行的時間。當為負數時,沒有空閒連線驅逐執行緒執行。
#spring.activemq.pool.time-between-expiration-check=-1ms
# 是否只使用一個MessageProducer
#spring.activemq.pool.use-anonymous-producers=true
6、建立config
程式碼如下:
package com.czxy.config;
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMQConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue("Armyman.queue");
}
@Bean
public Topic topic() {
return new ActiveMQTopic("Armyman.topic");
}
}
3.2、編寫消費者和生產者
1、編寫生產者QueueProducer
/**
* 訊息的生產者
* @author Administrator
*
*/
@Componet
@EnableScheduling
public class QueueProducer {
/*
* @Autowired // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
* private JmsMessagingTemplate jmsTemplate; //
* 傳送訊息,destination是傳送到的佇列,message是待發送的訊息
*
* @Scheduled(fixedDelay=3000)//每3s執行1次
public void sendMessage(Destination destination, final String message){
jmsTemplate.convertAndSend(destination, message);
}
*/
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
@Scheduled(fixedDelay=3000)//每3s執行1次
public void send() {
try {
MapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("info", "你還在睡覺");
this.jmsMessagingTemplate.convertAndSend(this.queue, mapMessage);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
2、編寫消費者QueueConsumer
/**
* 訊息的消費者
* @author Administrator
*/
@Component
public class QueueConsumer {
//使用JmsListener配置消費者監聽的佇列,其中Message是接收到的訊息
@JmsListener(destination = "Armyman.queue")
public void receiveQueue(Message message) {
try {
MapMessage mapMessage = (MapMessage) message;
String info = mapMessage.getString("info");
System.out.println(info);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3、啟動,測試,OK
3.3 使用內建ActiveMQ
只需要改變properties配置檔案,即可執行
# MQ所在的伺服器的地址
# spring.activemq.broker-url=tcp://127.0.0.1:61616
# 是否使用SpringBoot內建的MQ, true 使用; fale 不使用
spring.activemq.in-memory=true
# 是否在回滾回滾訊息之前停止訊息傳遞。這意味著當啟用此命令時,訊息順序不會被保留。
spring.activemq.non-blocking-redelivery=false
# 使用者名稱
# 密碼
#spring.activemq.user=admin
# spring.activemq.password=admin
7 應用場景:
- 用在高併發的請求中
- 註冊簡訊的傳送
- 註冊郵件的傳送
- 秒殺
- 兩個系統間進行訊息傳遞
4.1、訊息佇列應用場景
以下介紹訊息佇列在實際應用中常用的使用場景。
非同步處理,應用解耦,流量削鋒和訊息通訊四個場景
4.1.1非同步處理
場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種:
1.序列方式;2.並行方式
(1)序列方式:將註冊資訊寫入資料庫
成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端
(2)並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間
假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則序列方式的時間是150毫秒,並行的時間可能是100毫秒。
因為CPU在單位時間內處理的請求數是一定的,假設CPU在1秒內吞吐量是100次。則序列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)
小結:如以上案例描述,傳統的方式系統的效能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?
引入訊息佇列,將不是必須的業務邏輯,非同步處理。改造後的架構如下:
按照以上約定,使用者的響應時間相當於是註冊資訊寫入資料庫的時間,也就是50毫秒。註冊郵件,傳送簡訊寫入訊息佇列後,直接返回,因此寫入訊息佇列的速度很快,基本可以忽略,因此使用者的響應時間可能是50毫秒。因此架構改變後,系統的吞吐量提高到每秒20 QPS。比序列提高了3倍,比並行提高了2倍
4.1.2應用解耦
場景說明:使用者下單後,訂單系統需要通知庫存系統。傳統的做法是,訂單系統呼叫庫存系統的介面。如下圖
傳統模式的缺點:
l 假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗
l 訂單系統與庫存系統耦合
如何解決以上問題呢?引入應用訊息佇列後的方案,如下圖:
-
訂單系統:使用者下單後,訂單系統完成持久化處理,將訊息寫入訊息佇列,返回使用者訂單下單成功
-
庫存系統:訂閱下單的訊息,採用pub/sub(釋出/訂閱)的方式,獲取下單資訊,庫存系統根據下單資訊,進行庫存操作
-
假如:在下單時庫存系統不能正常使用。也不影響正常下單,因為下單後,訂單系統寫入訊息佇列就不再關心其他的後續操作了。實現訂單系統與庫存系統的應用解耦
4.1.3流量削鋒
流量削鋒也是訊息佇列中的常用場景,一般在秒殺或團搶活動中使用廣泛
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入訊息佇列。
-
可以控制活動的人數
-
可以緩解短時間內高流量壓垮應用
-
使用者的請求,伺服器接收後,首先寫入訊息佇列。假如訊息佇列長度超過最大數量,則直接拋棄使用者請求或跳轉到錯誤頁面
-
秒殺業務根據訊息佇列中的請求資訊,再做後續處理
4.1.4日誌處理
日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。架構簡化如下
-
日誌採集客戶端,負責日誌資料採集,定時寫受寫入Kafka佇列
-
Kafka訊息佇列,負責日誌資料的接收,儲存和轉發
-
日誌處理應用:訂閱並消費kafka佇列中的日誌資料
(1)Kafka:接收使用者日誌的訊息佇列
(2)Logstash:做日誌解析,統一成JSON輸出給Elasticsearch
(3)Elasticsearch:實時日誌分析服務的核心技術,一個schemaless,實時的資料儲存服務,通過index組織資料,兼具強大的搜尋和統計功能
(4)Kibana:基於Elasticsearch的資料視覺化元件,超強的資料視覺化能力是眾多公司選擇ELK stack的重要原因
4.1.5訊息通訊
訊息通訊是指,訊息佇列一般都內建了高效的通訊機制,因此也可以用在純的訊息通訊。比如實現點對點訊息佇列,或者聊天室等
點對點通訊:
客戶端A和客戶端B使用同一佇列,進行訊息通訊。
聊天室通訊:
客戶端A,客戶端B,客戶端N訂閱同一主題,進行訊息釋出和接收。實現類似聊天室效果。
以上實際是訊息佇列的兩種訊息模式,點對點或釋出訂閱模式。模型為示意圖,供參考。
4.2、JMS訊息服務
訊息佇列的JAVAEE規範JMS 。JMS(Java Message Service,java訊息服務)API是一個訊息服務的標準/規範,允許應用程式元件基於JavaEE平臺建立、傳送、接收和讀取訊息。它使分散式通訊耦合度更低,訊息服務更加可靠以及非同步性。
4.2.1 訊息模型
在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
4.2.2 P2P模式-佇列模式
P2P模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
P2P的特點
-
每個訊息只能被一個消費者(Consumer)消費(即一旦被消費,訊息就不再存在於訊息佇列中)
-
傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
-
接收者在成功接收訊息之後需向佇列應答成功
如果希望傳送的每個訊息都會被成功處理的話,那麼需要P2P模式。
4.2.3 Pub/Sub模式–廣播/主題模式
包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
Pub/Sub的特點
-
每個訊息可以有多個消費者
-
釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息
-
為了消費訊息,訂閱者必須保持執行的狀態
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果希望傳送的訊息可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。
4.3.訊息消費方式
在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。
(1)同步
訂閱者或接收者通過receive方法來接收訊息,receive方法在接收到訊息之前(或超時之前)將一直阻塞;
(2)非同步
訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。