1. 程式人生 > 其它 >RabbitMQ的初識和快速入門

RabbitMQ的初識和快速入門

1.初識MQ

1.1.同步和非同步通訊

微服務間通訊有同步和非同步兩種方式:

同步通訊:就像打電話,需要實時響應。

非同步通訊:就像發郵件,不需要馬上回復。


兩種方式各有優劣,打電話可以立即得到響應,但是你卻不能跟多個人同時通話。傳送郵件可以同時與多個人收發郵件,但是往往響應會有延遲。

1.1.1.同步通訊

Feign呼叫就屬於同步方式,雖然呼叫可以實時得到結果,但存在下面的問題:

總結:
同步呼叫的優點:

  • 時效性較強,可以立即得到結果
    同步呼叫的問題:
  • 耦合度高
  • 效能和吞吐能力下降
  • 有額外的資源消耗
  • 有級聯失敗問題

1.1.2.非同步通訊

非同步呼叫則可以避免上述問題:

我們以購買商品為例,使用者支付後需要呼叫訂單服務完成訂單狀態修改,呼叫物流服務,從倉庫分配響應的庫存並準備發貨。
在事件模式中,支付服務是事件釋出者(publisher),在支付完成後只需要釋出一個支付成功的事件(event),事件中帶上訂單id。
訂單服務和物流服務是事件訂閱者(Consumer),訂閱支付成功的事件,監聽到事件後完成自己業務即可。
為了解除事件釋出者與訂閱者之間的耦合,兩者並不是直接通訊,而是有一箇中間人(Broker)。釋出者釋出事件到Broker,不關心誰來訂閱事件。訂閱者從Broker訂閱事件,不關心誰發來的訊息。

Broker 是一個像資料匯流排一樣的東西,所有的服務要接收資料和傳送資料都發到這個總線上,這個匯流排就像協議一樣,讓服務間的通訊變得標準和可控。

好處:

  • 吞吐量提升:無需等待訂閱者處理完成,響應更快速

  • 故障隔離:服務沒有直接呼叫,不存在級聯失敗問題

  • 呼叫間沒有阻塞,不會造成無效的資源佔用

  • 耦合度極低,每個服務都可以靈活插拔,可替換

  • 流量削峰:不管釋出事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件

缺點:

  • 架構複雜了,業務沒有明顯的流程線,不好管理
  • 需要依賴於Broker的可靠、安全、效能

好在現在開源軟體或雲平臺上 Broker 的軟體是非常成熟的,比較常見的一種就是我們今天要學習的MQ技術。

1.2.技術對比:

MQ,中文是訊息佇列(MessageQueue),字面來看就是存放訊息的佇列。也就是事件驅動架構中的Broker。

比較常見的MQ實現:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

幾種常見MQ的對比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社群 Rabbit Apache 阿里 Apache
開發語言 Erlang Java Java Scala&Java
協議支援 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協議 自定義協議
可用性 一般
單機吞吐量 一般 非常高
訊息延遲 微秒級 毫秒級 毫秒級 毫秒以內
訊息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求訊息低延遲:RabbitMQ、Kafka

2.快速入門RabbitMQ

RabbitMQ中的一些角色:

  • publisher:生產者
  • consumer:消費者
  • exchange個:交換機,負責訊息路由
  • queue:佇列,儲存訊息
  • virtualHost:虛擬主機,隔離不同租戶的exchange、queue、訊息的隔離

2.2.RabbitMQ訊息模型

RabbitMQ官方提供了5個不同的Demo示例,對應了不同的訊息模型:

2.4.入門案例

簡單佇列模式的模型圖:

官方的HelloWorld是基於最基礎的訊息佇列模型來實現的,只包括三個角色:

  • publisher:訊息釋出者,將訊息傳送到佇列queue
  • queue:訊息佇列,負責接受並快取訊息
  • consumer:訂閱佇列,處理佇列中的訊息

下面通過兩個模組進行收發訊息的模擬

工程目錄

1 父工程中匯入 依賴

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依賴,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--單元測試-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

2.4.1.publisher實現

思路:

  • 建立連線
  • 建立Channel
  • 宣告佇列
  • 傳送訊息
  • 關閉連線和channel
    建立測試類
package cn.mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連線
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
        factory.setHost("xxxxxx");  //ip
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("hgy");
        factory.setPassword("123");
        // 1.2.建立連線
        Connection connection = factory.newConnection();

        // 2.建立通道Channel
        Channel channel = connection.createChannel();

        // 3.建立佇列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.傳送訊息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("傳送訊息成功:【" + message + "】");

        // 5.關閉通道和連線
        channel.close();
        connection.close();

    }
}

結果

2.4.2.consumer實現

程式碼思路:

  • 建立連線
  • 建立Channel
  • 宣告佇列
  • 訂閱訊息

建立測試類

package cn.itcast.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連線
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
        factory.setHost("xxxxx");  //ip
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("hgy");
        factory.setPassword("123");
        // 1.2.建立連線
        Connection connection = factory.newConnection();

        // 2.建立通道Channel
        Channel channel = connection.createChannel();

        // 3.建立佇列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱訊息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理訊息
                String message = new String(body);
                System.out.println("接收到訊息:【" + message + "】");
            }
        });
        System.out.println("等待接收訊息。。。。");
    }
}

2.5.總結

基本訊息佇列的訊息傳送流程:

  1. 建立connection

  2. 建立channel

  3. 利用channel宣告佇列

  4. 利用channel向佇列傳送訊息

基本訊息佇列的訊息接收流程:

  1. 建立connection

  2. 建立channel

  3. 利用channel宣告佇列

  4. 定義consumer的消費行為handleDelivery()

  5. 利用channel將消費者與佇列繫結