1. 程式人生 > >消息中間件企業級應用

消息中間件企業級應用

ber HA blog msl fin 生產 接收 客戶 out

消息中間件企業級應用

眾所周知,消息中間件是大型分布式系統中不可或缺的重要組件。它使用簡單,卻解決了不少難題,比如異步處理,系統藕合,流量削鋒,分布式事務管理等。實現了一個高性能,高可用,高擴展的系統。本章通過介紹消息中間件的應用場景消息中間件的傳輸模式ActiveMQ快速入門 三個方面來對消息中間件進行入門介紹。還在等什麽,趕快來學習吧!

說明:消息中間件非常強大,值得我們認真去學習和使用。完整代碼請異步github。
技術:消息中間件的應用場景,通信模式,ActiveMQ。
源碼:https://github.com/ITDragonBlog/daydayup/tree/master/MQ
文章目錄結構:

技術分享圖片

消息中間件應用場景

異步處理

異步處理:調用者發起請求後,調用者不會立刻得到結果,也無需等待結果,繼續執行其他業務邏輯。提高了效率但存在異步請求失敗的隱患,適用於非核心業務邏輯處理。
同步處理:調用者發起請求後,調用者必須等待直到返回結果,再根據返回的結果執行其他業務邏輯。效率雖然沒有異步處理高,但能保證業務邏輯可控性,適用於核心業務邏輯處理。

舉一個比較常見的應用場景:為了確保註冊用戶的真實性,一般在註冊成功後會發送驗證郵件或者驗證碼短信,只有認證成功的用戶才能正常使用平臺功能。
如下圖所示:同步處理和異步處理的比較。

技術分享圖片

用消息中間件實現異步處理的好處:
一、在傳統的系統架構,用戶從註冊到跳轉成功頁面,中間需要等待郵件發送的業務邏輯耗時。這不僅影響系統響應時間,降低了CPU吞吐量,同時還影響了用戶的體驗。

二、通過消息中間件將郵件發送的業務邏輯異步處理,用戶註冊成功後發送數據到消息中間件,再跳轉成功頁面,郵件發送的邏輯再由訂閱該消息中間件的其他系統負責處理,
三、消息中間件的讀寫速度非常的快,其中的耗時可以忽略不計。通過消息中間件可以處理更多的請求。

小結:正確使用消息中間件將非核心業務邏輯功能異步處理,可以提高系統的響應效率,提高了CPU的吞吐量,改善用戶的體驗。

系統藕合和事務的最終一致性

分布式系統是若幹個獨立的計算機(系統)集合。每個計算機負責自己的模塊,實現系統的解耦,也避免單點故障對整個系統的影響。每個系統還可以做一個集群,進一步降低故障的發生概率。
在這樣的分布式系統中,消息中間件又扮演著什麽樣的角色呢?

舉一個比較常見的應用場景:訂單系統下單成功後,需要調用倉庫系統接口,選擇最優的發貨倉庫和更新商品庫存。若因為某種原因在調用倉庫系統接口失敗,會直接影響到下單流程。
如下圖所示:感受一下消息中間件扮演的重要角色。

技術分享圖片

用消息中間件實現系統藕合的好處:
一、消息中間件可以讓各系統之間耦合性降低,不會因為其他系統的異常影響到自身業務邏輯。各盡其職,訂單系統只需負責將訂單數據持久化到數據庫中,倉庫系統只需負責更新庫存,不會因為倉庫系統的原因從而影響到下單的流程。
二、各位看官是否發現了一個問題,下單和庫存減少本應該是一個事務。因為分布式的原因很難保證事務的強一致性。這裏通過消息中間件實現事務的最終一致性效果(後續會詳細介紹)。

小結:事務的一致性固然重要,沒有庫存會導致下單失敗是一個理論上很正常的邏輯。但實際業務中並非如此,我們完全可以利用發貨期通過采購或者借庫的方式來增加庫存。這樣無疑可以增加銷量,還是可以保證事務的最終一致性。

流量削鋒

流量削鋒也稱限流。在秒殺,搶購的活動中,為了不影響整個系統的正常使用,一般會通過消息中間件做限流,避免流量突增壓垮系統,前端頁面可以提示"排隊等待",即便用戶體驗很差,也不能讓系統垮掉。

技術分享圖片

小結:限流可以在流量突增的情況下保障系統的穩定。系統宕機會被同行抓住笑柄。

消息中間件的傳輸模式

消息中間件除了支持對點對和發布訂閱兩種模式外,在實際開發中還有一種雙向應答模式被廣泛使用。

點對點(p2p)模式

點對點(p2p)模式有三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。發送者將消息發送到一個特定的隊列中,等待接收者從隊列中獲取消息消耗。
P2P的三個特點:
一、每個消息只能被一個接收者消費,且消息被消費後默認從隊列中刪掉(也可以通過其他簽收機制重復消費)。
二、發送者和接收者之間沒有依賴性,生產者發送消息和消費者接收消息並不要求同時運行。
三、接收者在成功接收消息之後需向隊列發送接收成功的確認消息。

技術分享圖片

發布訂閱(Pub/Sub)模式

發布訂閱(Pub/Sub)模式也有三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber)。發布者將消息發送到主題隊列中,系統再將這些消息傳遞給訂閱者。
Pub/Sub的特點:
一、每個消息可以被多個訂閱者消費。
二、發布者和訂閱者之間存在依賴性。訂閱者必須先訂閱主題後才能接收到信息,在訂閱前發布的消息,訂閱者是接收不到的。
三、非持久化訂閱:如果訂閱者不在線,此時發布的消息訂閱者是也接收不到,即便訂閱者重新上線也接收不到。
四、持久化訂閱:訂閱者訂閱主題後,即便訂閱者不在線,此時發布的消息可以在訂閱者重新上線後接收到的。

技術分享圖片

雙向應答模式

雙向應答模式並不是消息中間件提供的一種通信模式,它是由於實際生成環境的需要,在原有的基礎上做了改良。即消息的發送者也是消息的接收者。消息的接收者也是消息的發送者。如下圖所示

技術分享圖片

ActiveMQ 入門

ActiveMQ是Apache出品,簡單好用,能力強大,可以處理大部分的業務的開源消息總線。同時也支持JMS規範。

JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,允許應用程序組件基於JavaEE平臺創建、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。

ActiveMQ 安裝

ActiveMQ 的安裝很簡單,三個步驟:
第一步:下載,官網下載地址:http://activemq.apache.org/download.html。
第二步:運行,壓縮包解壓後,在bin目錄下根據電腦系統位數找到對應的wrapper.exe程序,雙擊運行。
第三步:訪問,瀏覽器訪問http://localhost:8161/admin,賬號密碼都是admin。

ActiveMQ 工作流程

我們通過簡單的P2P模式來了解ActiveMQ的工作流程。
生產者發送消息給MQ主要步驟:
第一步:創建連接工廠實例
第二步:創建連接並啟動
第三步:獲取操作消息的接口
第四步:創建隊列,即Queue或者Topic
第五步:創建消息發送者
第六步:發送消息,關閉資源

import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息隊列生產者
 * @author itdragon
 */
public class ITDragonProducer {
    
    private static final String QUEUE_NAME = "ITDragon.Queue";  
      
    public static void main(String[] args) {  
        // ConnectionFactory: 連接工廠,JMS 用它創建連接  
        ConnectionFactory connectionFactory = null;  
        // Connection: 客戶端和JMS系統之間建立的鏈接
        Connection connection = null;  
        // Session: 一個發送或接收消息的線程 ,操作消息的接口
        Session session = null;  
        // Destination: 消息的目的地,消息發送給誰  
        Destination destination = null;  
        // MessageProducer: 消息生產者  
        MessageProducer producer = null;  
        try {  
            // step1 構造ConnectionFactory實例對象,需要填入 用戶名, 密碼 以及要連接的地址,默認端口為"tcp://localhost:61616"  
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);   
            // step2 連接工廠創建連接對象  
            connection = connectionFactory.createConnection();  
            // step3 啟動  
            connection.start();  
            // step4 獲取操作連接  
            /** 
             * 第一個參數:是否設置事務 true or false。 如果設置了true,第二個參數忽略,並且需要commit()才執行 
             * 第二個參數:acknowledge模式 
             * AUTO_ACKNOWLEDGE:自動確認,客戶端發送和接收消息不需要做額外的工作。不管消息是否被正常處理。 默認
             * CLIENT_ACKNOWLEDGE:客戶端確認。客戶端接收到消息後,必須手動調用acknowledge方法,jms服務器才會刪除消息。
             * DUPS_OK_ACKNOWLEDGE:允許重復的確認模式。
             */  
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
            // step5 創建一個隊列到目的地  
            destination = session.createQueue(QUEUE_NAME);  
            // step6 在目的地創建一個生產者  
            producer = session.createProducer(destination);  
            // step7 生產者設置不持久化,若要設置持久化則使用 PERSISTENT
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
            // step8 生產者發送信息,具體的業務邏輯  
            sendMessage(session, producer);  
            session.commit();  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection) {  
                    connection.close();  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {  
        for(int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            TextMessage message = session.createTextMessage(expression);  
            // 發送消息到目的地方  
            producer.send(message);  
            System.out.println("Queue Sender ---------> " + expression);
        }
    }  

}

消費者從MQ中獲取數據消費步驟和上面類似,只是將發送消息改成了接收消息。

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.MessageConsumer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;
import com.itdragon.utils.ITDragonUtil;

/**
 * 消息隊列消費者
 * @author itdragon
 */
public class ITDragonConsumer {
    
    private static final String QUEUE_NAME = "ITDragon.Queue"; // 要和Sender一致  
    
    public static void main(String[] args) {  
        ConnectionFactory connectionFactory = null;  
        Connection connection = null;  
        Session session = null;  
        Destination destination = null;  
        // MessageConsumer: 信息消費者  
        MessageConsumer consumer = null;  
        try {  
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,  
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);  
            connection = connectionFactory.createConnection();  
            connection.start();  
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
            destination = session.createQueue(QUEUE_NAME);  
            consumer = session.createConsumer(destination);  
            // 不斷地接收信息,直到沒有為止  
            while (true) {  
                TextMessage message = (TextMessage) consumer.receive();  
                if (null != message) {  
                    System.out.print(ITDragonUtil.cal(message.getText()));  
                } else {  
                    break;  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            try {  
                if (null != connection) {  
                    connection.close();  
                }  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}

SpringBoot 整合ActiveMQ使用

SpringBoot可以幫助我們快速搭建項目,減少Spring整合第三方配置的精力。SpringBoot整合ActiveMQ也是非常簡單,只需要簡單的兩個步驟:
第一步,在pom.xml文件中添加依賴使其支持ActiveMQ
第二步,在application.properties文件中配置連接ActiveMQ參數

pom.xml是Maven項目的核心配置文件

<dependency>  <!-- 支持ActiveMQ依賴 -->
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-activemq</artifactId>  
</dependency> 
<dependency>  <!-- 支持使用mq連接池 -->
    <groupId>org.apache.activemq</groupId>  
    <artifactId>activemq-pool</artifactId>  
</dependency>  

application.properties是SpringBoot項目的核心參數配置文件

spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=true

spring.activemq.in-memory 默認值為true,表示無需安裝ActiveMQ的服務器,直接使用內存。
spring.activemq.pool.enabled 表示通過連接池的方式連接。

springboot-activemq-producer 項目模擬生產者所在的系統,支持同時發送點對點模式和發布訂閱模式。
兩個核心文件:一個是消息發送類,一個是隊列Bean管理配置類。
三種主要模式:一個是對點對模式,隊列名為"queue.name";一個是發布訂閱模式,主題名為"topic.name";最後一個是雙向應答模式,隊列名為"response.name" 。

import java.util.Random;
import javax.jms.Queue;
import javax.jms.Topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
/**
 * 消息隊列生產者
 * @author itdragon
 */
@Service
@EnableScheduling
public class ITDragonProducer {
    
    @Autowired
    private JmsMessagingTemplate jmsTemplate;  
    @Autowired
    private Queue queue;
    @Autowired
    private Topic topic;
    @Autowired
    private Queue responseQueue;
    
    /**
     * 點對點(p2p)模式測試
     * 包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
     * 發送者將消息發送到一個特定的隊列,隊列保留著消息,直到接收者從隊列中獲取消息。
     */
    @Scheduled(fixedDelay = 5000)
    public void testP2PMQ(){
        for(int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.queue, expression);
            System.out.println("Queue Sender ---------> " + expression);
        }
    }
    
    /**
     * 訂閱/發布(Pub/Sub)模擬測試
     * 包含三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。
     * 多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
     */
    @Scheduled(fixedDelay = 5000)
    public void testPubSubMQ() {
        for (int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.topic, expression);
            System.out.println("Topic Sender ---------> " + expression);
        }
    }
    
    /**
     * 雙向應答模式測試
     * P2P和Pub/Sub是MQ默認提供的兩種模式,而雙向應答模式則是在原有的基礎上做了改進。發送者既是接收者,接收者也是發送者。
     */
    @Scheduled(fixedDelay = 5000)
    public void testReceiveResponseMQ(){
        for (int i = 0; i < 5; i++) {
            String []operators = {"+","-","*","/"};
            Random random = new Random(System.currentTimeMillis());  
            String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
            jmsTemplate.convertAndSend(this.responseQueue, expression);
        }
    }
    
    // 接收P2P模式,消費者返回的數據
    @JmsListener(destination = "out.queue")
    public void receiveResponse(String message) {  
        System.out.println("Producer Response Receiver  ---------> " + message);  
    }
}
import javax.jms.Queue;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * bean配置管理類 
 * @author itdragon
 */
@Configuration
public class ActiveMQBeansConfig {
    
    @Bean   // 定義一個名字為queue.name的點對點列隊
    public Queue queue() {
        return new ActiveMQQueue("queue.name");
    }
    @Bean   // 定義一個名字為topic.name的主題隊列
    public Topic topic() {
        return new ActiveMQTopic("topic.name");
    }
    @Bean   // 定義一個名字為response.name的雙向應答隊列
    public Queue responseQueue() {
        return new ActiveMQQueue("response.name");
    }
}

springboot-activemq-consumer 模擬消費者所在的服務器,主要負責監聽隊列消息。
兩個核心文件:一個是消息接收類,一個是兼容點對點模式和發布訂閱模式的鏈接工廠配置類。

import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;
import com.itdragon.utils.ITDragonUtil;
/**
 * 消息隊列消費者
 * @author itdragon
 */
@Service
public class ITDragonConsumer {
    
    // 1. 監聽名字為"queue.name"的點對點隊列
    @JmsListener(destination = "queue.name", containerFactory="queueListenerFactory")
    public void receiveQueue(String text) {  
        System.out.println("Queue Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 2. 監聽名字為"topic.name"的發布訂閱隊列
    @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
    public void receiveTopicOne(String text) {  
        System.out.println(Thread.currentThread().getName() + " No.1 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 2. 監聽名字為"topic.name"的發布訂閱隊列
    @JmsListener(destination = "topic.name", containerFactory="topicListenerFactory")
    public void receiveTopicTwo(String text) {  
        System.out.println(Thread.currentThread().getName() +" No.2 Topic Receiver : " + text + " \t 處理結果 : " + ITDragonUtil.cal(text));  
    }  
    
    // 3. 監聽名字為"response.name"的接收應答(雙向)隊列
    @JmsListener(destination = "response.name")
    @SendTo("out.queue")
    public String receiveResponse(String text) {
        System.out.println("Response Receiver : " + text + " \t 正在返回數據......");  
        return ITDragonUtil.cal(text).toString();
    }

}
import java.util.concurrent.Executors;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

@Configuration  
@EnableJms  
public class JmsConfig {  
    
    @Bean  // 開啟pub/Sub模式
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {  
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();  
        factory.setPubSubDomain(true);  
        factory.setConnectionFactory(connectionFactory);  
        return factory;  
    }  
    @Bean  // JMS默認開啟P2P模式
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {  
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();  
        factory.setPubSubDomain(false);  
        factory.setConnectionFactory(connectionFactory);  
        return factory;  
    }  
}  

總結

1) 消息中間件可以解決異步處理,系統解耦,流量削鋒,分布式系統事務管理等問題。

2) 消息中間件默認支持點對點模式和發布訂閱模式,實際工作中還可以使用雙向應當模式。

3) ActiveMQ是Apache出品,簡單好用,功能強大,可以處理大部分的業務的開源消息總線。

到這裏 消息中間件企業級應用使用 的文章就寫完了。如果文章對你有幫助,可以點個"推薦",也可以"關註"我,獲得更多豐富的知識。後續博客計劃是:RocketMQ和Kafka的使用,Zookeeper和相關集群的搭建。若文中有什麽不對或者不嚴謹的地方,請指正。

消息中間件企業級應用