消息中間件企業級應用
消息中間件企業級應用
眾所周知,消息中間件是大型分布式系統中不可或缺的重要組件。它使用簡單,卻解決了不少難題,比如異步處理,系統藕合,流量削鋒,分布式事務管理等。實現了一個高性能,高可用,高擴展的系統。本章通過介紹消息中間件的應用場景,消息中間件的傳輸模式,ActiveMQ快速入門 三個方面來對消息中間件進行入門介紹。還在等什麽,趕快來學習吧!
說明:消息中間件非常強大,值得我們認真去學習和使用。完整代碼請異步github。
技術:消息中間件的應用場景,通信模式,ActiveMQ。
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/MQ
文章目錄結構:
消息中間件應用場景
異步處理
異步處理:調用者發起請求後,調用者不會立刻得到結果,也無需等待結果,繼續執行其他業務邏輯。提高了效率但存在異步請求失敗的隱患,適用於非核心業務邏輯處理。
同步處理:調用者發起請求後,調用者必須等待直到返回結果,再根據返回的結果執行其他業務邏輯。效率雖然沒有異步處理高,但能保證業務邏輯可控性,適用於核心業務邏輯處理。
舉一個比較常見的應用場景:為了確保註冊用戶的真實性,一般在註冊成功後會發送驗證郵件或者驗證碼短信,只有認證成功的用戶才能正常使用平臺功能。
如下圖所示:同步處理和異步處理的比較。
用消息中間件實現異步處理的好處:
一、在傳統的系統架構,用戶從註冊到跳轉成功頁面,中間需要等待郵件發送的業務邏輯耗時。這不僅影響系統響應時間,降低了CPU吞吐量,同時還影響了用戶的體驗。
三、消息中間件的讀寫速度非常的快,其中的耗時可以忽略不計。通過消息中間件可以處理更多的請求。
小結:正確使用消息中間件將非核心業務邏輯功能異步處理,可以提高系統的響應效率,提高了CPU的吞吐量,改善用戶的體驗。
系統藕合和事務的最終一致性
分布式系統是若幹個獨立的計算機(系統)集合。每個計算機負責自己的模塊,實現系統的解耦,也避免單點故障對整個系統的影響。每個系統還可以做一個集群,進一步降低故障的發生概率。
在這樣的分布式系統中,消息中間件又扮演著什麽樣的角色呢?
舉一個比較常見的應用場景:訂單系統下單成功後,需要調用倉庫系統接口,選擇最優的發貨倉庫和更新商品庫存。若因為某種原因在調用倉庫系統接口失敗,會直接影響到下單流程。
如下圖所示:感受一下消息中間件扮演的重要角色。
用消息中間件實現系統藕合的好處:
一、消息中間件可以讓各系統之間耦合性降低,不會因為其他系統的異常影響到自身業務邏輯。各盡其職,訂單系統只需負責將訂單數據持久化到數據庫中,倉庫系統只需負責更新庫存,不會因為倉庫系統的原因從而影響到下單的流程。
二、各位看官是否發現了一個問題,下單和庫存減少本應該是一個事務。因為分布式的原因很難保證事務的強一致性。這裏通過消息中間件實現事務的最終一致性效果(後續會詳細介紹)。
小結:事務的一致性固然重要,沒有庫存會導致下單失敗是一個理論上很正常的邏輯。但實際業務中並非如此,我們完全可以利用發貨期通過采購或者借庫的方式來增加庫存。這樣無疑可以增加銷量,還是可以保證事務的最終一致性。
流量削鋒
流量削鋒也稱限流。在秒殺,搶購的活動中,為了不影響整個系統的正常使用,一般會通過消息中間件做限流,避免流量突增壓垮系統,前端頁面可以提示"排隊等待",即便用戶體驗很差,也不能讓系統垮掉。
小結:限流可以在流量突增的情況下保障系統的穩定。系統宕機會被同行抓住笑柄。
消息中間件的傳輸模式
消息中間件除了支持對點對和發布訂閱兩種模式外,在實際開發中還有一種雙向應答模式被廣泛使用。
點對點(p2p)模式
點對點(p2p)模式有三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。發送者將消息發送到一個特定的隊列中,等待接收者從隊列中獲取消息消耗。
P2P的三個特點:
一、每個消息只能被一個接收者消費,且消息被消費後默認從隊列中刪掉(也可以通過其他簽收機制重復消費)。
二、發送者和接收者之間沒有依賴性,生產者發送消息和消費者接收消息並不要求同時運行。
三、接收者在成功接收消息之後需向隊列發送接收成功的確認消息。
發布訂閱(Pub/Sub)模式
發布訂閱(Pub/Sub)模式也有三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber)。發布者將消息發送到主題隊列中,系統再將這些消息傳遞給訂閱者。
Pub/Sub的特點:
一、每個消息可以被多個訂閱者消費。
二、發布者和訂閱者之間存在依賴性。訂閱者必須先訂閱主題後才能接收到信息,在訂閱前發布的消息,訂閱者是接收不到的。
三、非持久化訂閱:如果訂閱者不在線,此時發布的消息訂閱者是也接收不到,即便訂閱者重新上線也接收不到。
四、持久化訂閱:訂閱者訂閱主題後,即便訂閱者不在線,此時發布的消息可以在訂閱者重新上線後接收到的。
雙向應答模式
雙向應答模式並不是消息中間件提供的一種通信模式,它是由於實際生成環境的需要,在原有的基礎上做了改良。即消息的發送者也是消息的接收者。消息的接收者也是消息的發送者。如下圖所示
ActiveMQ 入門
ActiveMQ是Apache出品,簡單好用,能力強大,可以處理大部分的業務的開源消息總線。同時也支持JMS規範。
JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,允許應用程序組件基於JavaEE平臺創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
ActiveMQ 安裝
ActiveMQ 的安裝很簡單,三個步驟:
第一步:下載,官網下載地址:http://activemq.apache.org/download.html。
第二步:運行,壓縮包解壓後,在bin目錄下根據電腦系統位數找到對應的wrapper.exe程序,雙擊運行。
第三步:訪問,瀏覽器訪問http://localhost:8161/admin,賬號密碼都是admin。
ActiveMQ 工作流程
我們通過簡單的P2P模式來了解ActiveMQ的工作流程。
生產者發送消息給MQ主要步驟:
第一步:創建連接工廠實例
第二步:創建連接並啟動
第三步:獲取操作消息的接口
第四步:創建隊列,即Queue或者Topic
第五步:創建消息發送者
第六步:發送消息,關閉資源
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息隊列生產者
* @author itdragon
*/
public class ITDragonProducer {
private static final String QUEUE_NAME = "ITDragon.Queue";
public static void main(String[] args) {
// ConnectionFactory: 連接工廠,JMS 用它創建連接
ConnectionFactory connectionFactory = null;
// Connection: 客戶端和JMS系統之間建立的鏈接
Connection connection = null;
// Session: 一個發送或接收消息的線程 ,操作消息的接口
Session session = null;
// Destination: 消息的目的地,消息發送給誰
Destination destination = null;
// MessageProducer: 消息生產者
MessageProducer producer = null;
try {
// step1 構造ConnectionFactory實例對象,需要填入 用戶名, 密碼 以及要連接的地址,默認端口為"tcp://localhost:61616"
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
// step2 連接工廠創建連接對象
connection = connectionFactory.createConnection();
// step3 啟動
connection.start();
// step4 獲取操作連接
/**
* 第一個參數:是否設置事務 true or false。 如果設置了true,第二個參數忽略,並且需要commit()才執行
* 第二個參數:acknowledge模式
* AUTO_ACKNOWLEDGE:自動確認,客戶端發送和接收消息不需要做額外的工作。不管消息是否被正常處理。 默認
* CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到消息後,必須手動調用acknowledge方法,jms服務器才會刪除消息。
* DUPS_OK_ACKNOWLEDGE:允許重復的確認模式。
*/
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// step5 創建一個隊列到目的地
destination = session.createQueue(QUEUE_NAME);
// step6 在目的地創建一個生產者
producer = session.createProducer(destination);
// step7 生產者設置不持久化,若要設置持久化則使用 PERSISTENT
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// step8 生產者發送信息,具體的業務邏輯
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for(int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
TextMessage message = session.createTextMessage(expression);
// 發送消息到目的地方
producer.send(message);
System.out.println("Queue Sender ---------> " + expression);
}
}
}
消費者從MQ中獲取數據消費步驟和上面類似,只是將發送消息改成了接收消息。
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.itdragon.utils.ITDragonUtil;
/**
* 消息隊列消費者
* @author itdragon
*/
public class ITDragonConsumer {
private static final String QUEUE_NAME = "ITDragon.Queue"; // 要和Sender一致
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
// MessageConsumer: 信息消費者
MessageConsumer consumer = null;
try {
connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(QUEUE_NAME);
consumer = session.createConsumer(destination);
// 不斷地接收信息,直到沒有為止
while (true) {
TextMessage message = (TextMessage) consumer.receive();
if (null != message) {
System.out.print(ITDragonUtil.cal(message.getText()));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
SpringBoot 整合ActiveMQ使用
SpringBoot可以幫助我們快速搭建項目,減少Spring整合第三方配置的精力。SpringBoot整合ActiveMQ也是非常簡單,只需要簡單的兩個步驟:
第一步,在pom.xml文件中添加依賴使其支持ActiveMQ
第二步,在application.properties文件中配置連接ActiveMQ參數
pom.xml是Maven項目的核心配置文件
<dependency> <!-- 支持ActiveMQ依賴 -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency> <!-- 支持使用mq連接池 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
application.properties是SpringBoot項目的核心參數配置文件
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true
spring.activemq.in-memory
默認值為true,表示無需安裝ActiveMQ的服務器,直接使用內存。
spring.activemq.pool.enabled
表示通過連接池的方式連接。
springboot-activemq-producer 項目模擬生產者所在的系統,支持同時發送點對點模式和發布訂閱模式。
兩個核心文件:一個是消息發送類,一個是隊列Bean管理配置類。
三種主要模式:一個是對點對模式,隊列名為"queue.name";一個是發布訂閱模式,主題名為"topic.name";最後一個是雙向應答模式,隊列名為"response.name" 。
import java.util.Random;
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
* 消息隊列生產者
* @author itdragon
*/
@Service
@EnableScheduling
public class ITDragonProducer {
@Autowired
private JmsMessagingTemplate jmsTemplate;
@Autowired
private Queue queue;
@Autowired
private Topic topic;
@Autowired
private Queue responseQueue;
/**
* 點對點(p2p)模式測試
* 包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
* 發送者將消息發送到一個特定的隊列,隊列保留著消息,直到接收者從隊列中獲取消息。
*/
@Scheduled(fixedDelay = 5000)
public void testP2PMQ(){
for(int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.queue, expression);
System.out.println("Queue Sender ---------> " + expression);
}
}
/**
* 訂閱/發布(Pub/Sub)模擬測試
* 包含三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。
* 多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
*/
@Scheduled(fixedDelay = 5000)
public void testPubSubMQ() {
for (int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.topic, expression);
System.out.println("Topic Sender ---------> " + expression);
}
}
/**
* 雙向應答模式測試
* P2P和Pub/Sub是MQ默認提供的兩種模式,而雙向應答模式則是在原有的基礎上做了改進。發送者既是接收者,接收者也是發送者。
*/
@Scheduled(fixedDelay = 5000)
public void testReceiveResponseMQ(){
for (int i = 0; i < 5; i++) {
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
jmsTemplate.convertAndSend(this.responseQueue, expression);
}
}
// 接收P2P模式,消費者返回的數據
@JmsListener(destination = "out.queue")
public void receiveResponse(String message) {
System.out.println("Producer Response Receiver ---------> " + message);
}
}
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* bean配置管理類
* @author itdragon
*/
@Configuration
public class ActiveMQBeansConfig {
@Bean // 定義一個名字為queue.name的點對點列隊
public Queue queue() {
return new ActiveMQQueue("queue.name");
}
@Bean // 定義一個名字為topic.name的主題隊列
public Topic topic() {
return new ActiveMQTopic("topic.name");
}
@Bean // 定義一個名字為response.name的雙向應答隊列
public Queue responseQueue() {
return new ActiveMQQueue("response.name");
}
}
springboot-activemq-consumer 模擬消費者所在的服務器,主要負責監聽隊列消息。
兩個核心文件:一個是消息接收類,一個是兼容點對點模式和發布訂閱模式的鏈接工廠配置類。
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import com.itdragon.utils.ITDragonUtil;
/**
* 消息隊列消費者
* @author itdragon
*/
@Service
public class ITDragonConsumer {
// 1. 監聽名字為"queue.name"的點對點隊列
@JmsListener(destination = "queue.name", containerFactory="queueListenerFactory")
public void receiveQueue(String text) {
System.out.println("Queue Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 2. 監聽名字為"topic.name"的發布訂閱隊列
@JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
public void receiveTopicOne(String text) {
System.out.println(Thread.currentThread().getName() + " No.1 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 2. 監聽名字為"topic.name"的發布訂閱隊列
@JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
public void receiveTopicTwo(String text) {
System.out.println(Thread.currentThread().getName() +" No.2 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));
}
// 3. 監聽名字為"response.name"的接收應答(雙向)隊列
@JmsListener(destination = "response.name")
@SendTo("out.queue")
public String receiveResponse(String text) {
System.out.println("Response Receiver : " + text + " \t 正在返回數據......");
return ITDragonUtil.cal(text).toString();
}
}
import java.util.concurrent.Executors;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Bean // 開啟pub/Sub模式
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean // JMS默認開啟P2P模式
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
總結
1) 消息中間件可以解決異步處理,系統解耦,流量削鋒,分布式系統事務管理等問題。
2) 消息中間件默認支持點對點模式和發布訂閱模式,實際工作中還可以使用雙向應當模式。
3) ActiveMQ是Apache出品,簡單好用,功能強大,可以處理大部分的業務的開源消息總線。
到這裏 消息中間件企業級應用使用 的文章就寫完了。如果文章對你有幫助,可以點個"推薦",也可以"關註"我,獲得更多豐富的知識。後續博客計劃是:RocketMQ和Kafka的使用,Zookeeper和相關集群的搭建。若文中有什麽不對或者不嚴謹的地方,請指正。
消息中間件企業級應用