RabbitMQ學習(四)——訊息分發機制
在前面的一篇博文中,我們對RabbitMQ中的交換機有了大致的瞭解,同時結合Spring Boot的例項,讓我們對RabbitMQ的用法有了更清晰的認識。如果忘記了可以去複習一下,RabbitMQ學習(三)——探索交換機(Exchange),結合SpringBoot實戰。
今天我們將要對RabbitMQ的訊息機制進行更詳細的探究,在上一篇文章中其實也涉及到了訊息機制的問題。只是沒有深入探究,今天我們將要開始詳細瞭解該機制的特點。
我們知道每個訊息處理的時間是不同的,換句話說訊息的複雜度是不同的,有些訊息很複雜需要很久的時間,有些訊息很簡單,只需要耗時一會兒就可以完成。而在實際情況下如何考慮分配資源,讓效率達到最大化,從而實現按能力分配任務,達到物盡其用。這就需要了解訊息的分發機制。
在這裡我們使用Thread.Sleep()方法來模擬耗時,時間越久,任務就越複雜。
這裡我們結合前面的例子,在spring boot的程式碼架構下使用RabbitMQ來探究一下,分發機制。首先我們結合前面第三節文章的例子,在sender和receiver下新建DistributionSender.java和DistributionReceiver.java來模擬傳送者和接收者。
DistributionSender.java:
@Component
public class DistributionSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(int i) {
// 傳送的訊息
String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", i);
this.rabbitTemplate.convertAndSend("distribu", message);
}
}
這裡採用的是預設交換機,佇列為“distribu”。需要在config包下面的RabbitConfig.java裡面新增如下佇列:
/**
* 申明distribu佇列
*
* @return
*/
@Bean
public Queue DistribuQueue() {
return new Queue("distribu");
}
DistributionReceiver.java:
@Component
public class DistributionReceiver {
/**
* 消費者A
*
* @param msg
*/
@SuppressWarnings("deprecation")
@RabbitListener(queues = "distribu")
public void processA(Message message) {
String msg = new String(message.getBody());
System.out.println(" DistributionReceiverA : " + msg);
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println(" ProccessingA... at " + time.format(new Date()));
try {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
doWork(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" A Done! at " + time.format(new Date()));
}
}
private void doWork(long time) throws InterruptedException {
Thread.sleep(time);
}
}
然後在controller包的RabbitTest.java檔案裡新增Restful介面,模擬傳送請求。
@Autowired
private DistributionSender distributionSender;
/**
* 分發機制訊息傳送測試
*/
@GetMapping("/distribu")
public void distribu() {
distributionSender.send(3);
}
DistributionReceiverA : This is a task, and the complexity is 3。...
ProccessingA... at 2018-05-23 21:29:18:0628
A Done! at 2018-05-23 21:29:21:0639
從列印的資訊可以看出這裡就模擬了完成任務需要3秒鐘的時間任務實現。
下面我們更改傳送的訊息數量,在controller控制器裡面進行更改,如下:
/**
* 分發機制訊息傳送測試
*/
@GetMapping("/distribu")
public void distribu() {
for (int i = 0; i < 5; i++) {
//傳送任務複雜度都為1的訊息
distributionSender.send(1);
}
}
模擬傳送5條訊息,並且每條傳送的訊息的複雜度都是相同的,複雜度都為1。
然後再在receiver包中DistributionReceiver.java新增一個消費者B,如下:
/**
* 消費者B
*
* @param msg
*/
@SuppressWarnings("deprecation")
@RabbitListener(queues = "distribu")
public void processB(Message message) {
String msg = new String(message.getBody());
System.out.println(" DistributionReceiverB : " + msg);
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println(" ProccessingB... at " + time.format(new Date()));
try {
for (char ch : msg.toCharArray()) {
if (ch == '.') {
doWork(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" B Done! at " + time.format(new Date()));
}
}
再次執行程式,訪問介面,結果如下:
DistributionReceiverA : This is a task, and the complexity is 1。.
ProccessingA... at 2018-05-22 23:23:43:0014
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:23:43:0014
A Done! at 2018-05-22 23:23:44:0017
B Done! at 2018-05-22 23:23:44:0017
DistributionReceiverA : This is a task, and the complexity is 1。.
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingA... at 2018-05-22 23:23:44:0093
ProccessingB... at 2018-05-22 23:23:44:0093
A Done! at 2018-05-22 23:23:45:0095
B Done! at 2018-05-22 23:23:45:0095
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:23:45:0143
B Done! at 2018-05-22 23:23:46:0148
在訊息相同,A、B處理能力一樣情況下,我們可以發現A、B幾乎是同時處理訊息,訊息傳送順序為A->B->A->B->B。可以看出這裡並沒有實現A與B平均輪詢的情況,在最後的情況B執行了兩次。
接著現在我們把A處理能力更改為每個點要Thread.sleep(4000), B為Thread.sleep(1000),就是B的處理能力是A的四倍。執行一下,我們看一下列印的結果:
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:24:48:0623
DistributionReceiverA : This is a task, and the complexity is 1。.
ProccessingA... at 2018-05-22 23:24:48:0623
B Done! at 2018-05-22 23:24:49:0624
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:24:49:0663
B Done! at 2018-05-22 23:24:50:0664
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:24:50:0704
B Done! at 2018-05-22 23:24:51:0709
DistributionReceiverB : This is a task, and the complexity is 1。.
ProccessingB... at 2018-05-22 23:24:51:0748
A Done! at 2018-05-22 23:24:52:0629
B Done! at 2018-05-22 23:24:52:0749
現在我們可以清晰地看到在這裡B處理了4條訊息,而A只處理了1條訊息。這裡就是按公平分發的機制來發送訊息的,即按消費者處理能力來分發訊息。
注意現在重點來了,RabbitMQ的分發機制
1、Round-robin dispatch(輪詢分發)
這個是RabbitMQ預設的訊息分發機制,使用任務佇列的優點之一就是可以輕易的並行工作。如果我們有很多要分發的訊息,可以通過增加工作者(消費者)來解決這種狀況,使得系統的伸縮性更加容易擴充套件。
在預設情況下,RabbitMQ不會顧慮訊息者處理訊息的能力,即使其中有的消費者閒置有的消費者高負荷。RabbitMQ會逐個傳送訊息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,並非一個一個分配)。平均每個消費者獲得相同數量的訊息,這種方式分發訊息機制稱為Round-Robin(輪詢)。
2、Fair dispatch (公平分發)
而公平分發,則是根據消費者的處理能力來進行分發處理的。這裡主要是通過設定prefetchCount 引數來實現的。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理規定的數量級個數的Message。換句話說,在接收到該Consumer的ack前,它不會將新的Message分發給它。 比如prefetchCount=1,則在同一時間下,每個Consumer在同一個時間點最多處理1個Message,同時在收到Consumer的ack前,它不會將新的Message分發給它。
注意:使用公平分發,必須關閉自動應答,改為手動應答。
說完了概念,我們再來思考一下,前面我們的例項。在使用Spring Boot結合RabbitMQ時,我們並沒有手動去應答,那麼這為啥是採用的公平分發機制?
這個是因為Spring Boot封裝的RabbitMQ方法,預設ACK機制是使用手工應答機制,當@RabbitListener修飾的方法被呼叫且沒有丟擲異常時, Spring Boot會為我們自動應答。
我們可以在@RabbitListener原始碼的註解裡看到,
如果沒有指定containerFactory,將採用預設的containerFactory。然後我們在RabbitListenerContainerFactory中檢視到這個介面與MessageListenerContainer有關,
接著檢視MessageListenerContainer,這是一個介面,我們檢視實現了該介面的類,在原始碼包了提供了一個SimpleMessageListenerContainer的類,在裡面我們找到了DEFAULT_PREFETCH_COUNT,這下就清晰明瞭了。
預設情況下,Spring Boot中的RabbitMQ採用手動確認機制,只要如果不是程式設計師程式設計實現應答,框架就會為我們自動去確認。並且prefetchCount=1,這下就可以解釋為啥上面的例項出現的結果了。
下面我們再通過原生的RabbitMQ客戶端java程式碼來講解一下訊息分發機制的情況:
輪詢分發
還是在上面的例項裡,我們在sender包下,新建檔案DistributionSender2.java,程式碼如下:
DistributionSender2.java :
package net.anumbrella.rabbitmq.sender;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DistributionSender2 {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 建立連線連線到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個頻道
Channel channel = connection.createChannel();
// 指定一個佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 8; i++) {
// 傳送的訊息
String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", i);
// 往佇列中發出一條訊息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" DistributionSender2 Sent '" + message + "'");
}
// 關閉頻道和連線
channel.close();
connection.close();
}
}
模擬傳送8條訊息,然後再receiver包下,新建DistributionReceiver2.java和DistributionReceiver3.java,程式碼如下:
DistributionReceiver2.java:
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DistributionReceiver2 {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Receiver2 waiting for messages. To exit press CTRL+C");
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" DistributionReceiver2 : " + message);
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println(" Proccessing2... at " + time.format(new Date()));
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
doWork(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" DistributionReceiver2 Done! at " +time.format(new Date()));
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
private static void doWork(long time) throws InterruptedException {
Thread.sleep(time);
}
}
DistributionReceiver3.java:
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DistributionReceiver3 {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Receiver3 waiting for messages. To exit press CTRL+C");
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" DistributionReceiver3 : " + message);
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println(" Proccessing3... at " + time.format(new Date()));
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
doWork(4000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" DistributionReceiver3 Done! at " +time.format(new Date()));
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
private static void doWork(long time) throws InterruptedException {
Thread.sleep(time);
}
}
我們可以看到DistributionReceiver2是DistributionReceiver3處理訊息的4倍。
現在我們先啟動DistributionReceiver2.java與DistributionReceiver3.java監聽訊息,然後再啟動DistributionSender2.java傳送訊息,可以檢視結果如下:
傳送訊息:
DistributionSender2 Sent 'This is a task, and the complexity is 0。'
DistributionSender2 Sent 'This is a task, and the complexity is 1。.'
DistributionSender2 Sent 'This is a task, and the complexity is 2。..'
DistributionSender2 Sent 'This is a task, and the complexity is 3。...'
DistributionSender2 Sent 'This is a task, and the complexity is 4。....'
DistributionSender2 Sent 'This is a task, and the complexity is 5。.....'
DistributionSender2 Sent 'This is a task, and the complexity is 6。......'
DistributionSender2 Sent 'This is a task, and the complexity is 7。.......'
DistributionReceiver2.java收到的訊息:
Receiver2 waiting for messages. To exit press CTRL+C
DistributionReceiver2 : This is a task, and the complexity is 0。
Proccessing2... at 2018-05-23 23:12:50:0874
DistributionReceiver2 Done! at 2018-05-23 23:12:50:0876
DistributionReceiver2 : This is a task, and the complexity is 2。..
Proccessing2... at 2018-05-23 23:12:50:0876
DistributionReceiver2 Done! at 2018-05-23 23:12:52:0880
DistributionReceiver2 : This is a task, and the complexity is 4。....
Proccessing2... at 2018-05-23 23:12:52:0880
DistributionReceiver2 Done! at 2018-05-23 23:12:56:0890
DistributionReceiver2 : This is a task, and the complexity is 6。......
Proccessing2... at 2018-05-23 23:12:56:0890
DistributionReceiver2 Done! at 2018-05-23 23:13:02:0910
DistributionReceiver3.java收到的訊息:
Receiver3 waiting for messages. To exit press CTRL+C
DistributionReceiver3 : This is a task, and the complexity is 1。.
Proccessing3... at 2018-05-23 23:12:50:0879
DistributionReceiver3 Done! at 2018-05-23 23:12:54:0884
DistributionReceiver3 : This is a task, and the complexity is 3。...
Proccessing3... at 2018-05-23 23:12:54:0885
DistributionReceiver3 Done! at 2018-05-23 23:13:06:0887
DistributionReceiver3 : This is a task, and the complexity is 5。.....
Proccessing3... at 2018-05-23 23:13:06:0888
DistributionReceiver3 Done! at 2018-05-23 23:13:26:0903
DistributionReceiver3 : This is a task, and the complexity is 7。.......
Proccessing3... at 2018-05-23 23:13:26:0904
DistributionReceiver3 Done! at 2018-05-23 23:13:54:0921
我們知道DistributionReceiver2處理速度是DistributionReceiver3處理速度的4倍,但是結果兩者收到的訊息數是相同的。
DistributionReceiver2:0,2,4,6
DistributionReceiver3:1,3,5,7
這裡就是採用的輪詢分發,與前面講解的剛剛符合。
公平分發
為了使用公平分發,我們需要通過basicQos( prefetchCount = 1)方法,來設定prefetchCount引數,同時更改確認方式為手動確認。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
同理我們在sender包下新建DistributionSender3.java,程式碼如下:
package net.anumbrella.rabbitmq.sender;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DistributionSender3 {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 建立連線連線到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 設定MabbitMQ所在主機ip或者主機名
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 建立一個連線
Connection connection = factory.newConnection();
// 建立一個頻道
Channel channel = connection.createChannel();
// 指定一個佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
int prefetchCount = 1;
// 限制發給同一個消費者不得超過1條訊息
channel.basicQos(prefetchCount);
for (int i = 0; i < 8; i++) {
// 傳送的訊息
String message = "This is a task, and the complexity is " + i + "。" + StringUtils.repeat(".", 1);
// 往佇列中發出一條訊息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" DistributionSender3 Sent '" + message + "'");
}
// 關閉頻道和連線
channel.close();
connection.close();
}
}
更改為手動確認需要新增如下程式碼,basicAck 方法的第二個引數 multiple 取值為 false 時,表示通知 RabbitMQ 當前訊息被確認;如果為 true,則額外將比第一個引數指定的 delivery tag 小的訊息一併確認。(在RabbitMQ的每個通道中,每條訊息的 Delivery Tag 從 1 開始遞增,這裡的批量處理是指同一通道中的訊息):
channel.basicAck(envelope.getDeliveryTag(), false);
通過更改auto為false,即更改自動確認為false。如下:
channel.basicConsume(QUEUE_NAME, false, consumer);
DistributionReceiver4.java:
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DistributionReceiver4 {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Receiver4 waiting for messages. To exit press CTRL+C");
//保證一次只分發一個
channel.basicQos(1);
// 建立佇列消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" DistributionReceiver4 : " + message);
SimpleDateFormat time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSSS");
System.out.println(" Proccessing4... at " + time.format(new Date()));
try {
for (char ch: message.toCharArray()) {
if (ch == '.') {
doWork(1000);
}
}
} catch (InterruptedException e) {
} finally {
System.out.println(" DistributionReceiver4 Done! at " +time.format(new Date()));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
private static void doWork(long time) throws InterruptedException {
Thread.sleep(time);
}
}
DistributionReceiver5.java:
package net.anumbrella.rabbitmq.receiver;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class DistributionReceiver5 {
private final static String QUEUE_NAME = "test";
public static void main(String[] argv) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
factory.setPort(5672);
// 開啟連線和建立頻道,與傳送端一樣
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 宣告佇列,主要為了防止訊息接收者先執行此程式,佇列還不存在時建立佇列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out