RabbitMQ的初識和快速入門
1.初識MQ
1.1.同步和非同步通訊
微服務間通訊有同步和非同步兩種方式:
同步通訊:就像打電話,需要實時響應。
非同步通訊:就像發郵件,不需要馬上回復。
兩種方式各有優劣,打電話可以立即得到響應,但是你卻不能跟多個人同時通話。傳送郵件可以同時與多個人收發郵件,但是往往響應會有延遲。
1.1.1.同步通訊
Feign呼叫就屬於同步方式,雖然呼叫可以實時得到結果,但存在下面的問題:
總結:
同步呼叫的優點:
- 時效性較強,可以立即得到結果
同步呼叫的問題: - 耦合度高
- 效能和吞吐能力下降
- 有額外的資源消耗
- 有級聯失敗問題
1.1.2.非同步通訊
非同步呼叫則可以避免上述問題:
我們以購買商品為例,使用者支付後需要呼叫訂單服務完成訂單狀態修改,呼叫物流服務,從倉庫分配響應的庫存並準備發貨。
在事件模式中,支付服務是事件釋出者(publisher),在支付完成後只需要釋出一個支付成功的事件(event),事件中帶上訂單id。
訂單服務和物流服務是事件訂閱者(Consumer),訂閱支付成功的事件,監聽到事件後完成自己業務即可。
為了解除事件釋出者與訂閱者之間的耦合,兩者並不是直接通訊,而是有一箇中間人(Broker)。釋出者釋出事件到Broker,不關心誰來訂閱事件。訂閱者從Broker訂閱事件,不關心誰發來的訊息。
Broker 是一個像資料匯流排一樣的東西,所有的服務要接收資料和傳送資料都發到這個總線上,這個匯流排就像協議一樣,讓服務間的通訊變得標準和可控。
好處:
-
吞吐量提升:無需等待訂閱者處理完成,響應更快速
-
故障隔離:服務沒有直接呼叫,不存在級聯失敗問題
-
呼叫間沒有阻塞,不會造成無效的資源佔用
-
耦合度極低,每個服務都可以靈活插拔,可替換
-
流量削峰:不管釋出事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件
缺點:
- 架構複雜了,業務沒有明顯的流程線,不好管理
- 需要依賴於Broker的可靠、安全、效能
好在現在開源軟體或雲平臺上 Broker 的軟體是非常成熟的,比較常見的一種就是我們今天要學習的MQ技術。
1.2.技術對比:
MQ,中文是訊息佇列(MessageQueue),字面來看就是存放訊息的佇列。也就是事件驅動架構中的Broker。
比較常見的MQ實現:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
幾種常見MQ的對比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社群 | Rabbit | Apache | 阿里 | Apache |
開發語言 | Erlang | Java | Java | Scala&Java |
協議支援 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協議 | 自定義協議 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
訊息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內 |
訊息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求訊息低延遲:RabbitMQ、Kafka
2.快速入門RabbitMQ
RabbitMQ中的一些角色:
- publisher:生產者
- consumer:消費者
- exchange個:交換機,負責訊息路由
- queue:佇列,儲存訊息
- virtualHost:虛擬主機,隔離不同租戶的exchange、queue、訊息的隔離
2.2.RabbitMQ訊息模型
RabbitMQ官方提供了5個不同的Demo示例,對應了不同的訊息模型:
2.4.入門案例
簡單佇列模式的模型圖:
官方的HelloWorld是基於最基礎的訊息佇列模型來實現的,只包括三個角色:
- publisher:訊息釋出者,將訊息傳送到佇列queue
- queue:訊息佇列,負責接受並快取訊息
- consumer:訂閱佇列,處理佇列中的訊息
下面通過兩個模組進行收發訊息的模擬
工程目錄
1 父工程中匯入 依賴
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--單元測試-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2.4.1.publisher實現
思路:
- 建立連線
- 建立Channel
- 宣告佇列
- 傳送訊息
- 關閉連線和channel
建立測試類
package cn.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
factory.setHost("xxxxxx"); //ip
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hgy");
factory.setPassword("123");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.傳送訊息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("傳送訊息成功:【" + message + "】");
// 5.關閉通道和連線
channel.close();
connection.close();
}
}
結果
2.4.2.consumer實現
程式碼思路:
- 建立連線
- 建立Channel
- 宣告佇列
- 訂閱訊息
建立測試類
package cn.itcast.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連線
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設定連線引數,分別是:主機名、埠號、vhost、使用者名稱、密碼
factory.setHost("xxxxx"); //ip
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("hgy");
factory.setPassword("123");
// 1.2.建立連線
Connection connection = factory.newConnection();
// 2.建立通道Channel
Channel channel = connection.createChannel();
// 3.建立佇列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱訊息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理訊息
String message = new String(body);
System.out.println("接收到訊息:【" + message + "】");
}
});
System.out.println("等待接收訊息。。。。");
}
}
2.5.總結
基本訊息佇列的訊息傳送流程:
-
建立connection
-
建立channel
-
利用channel宣告佇列
-
利用channel向佇列傳送訊息
基本訊息佇列的訊息接收流程:
-
建立connection
-
建立channel
-
利用channel宣告佇列
-
定義consumer的消費行為handleDelivery()
-
利用channel將消費者與佇列繫結