RabbitMQ從入門到精通(三)
目錄
- 1. 自定義消費者使用
- 自定義消費端演示
- 2.消費端的限流策略
- 2.1 限流的場景與機制
- 2.2 限流相關API
- 2.3 限流演示
- 3. 消費端ACK與重回佇列機制
- 3.1 ACK與NACK
- 3.2 重回佇列演示
- 4. TTL
- TTL演示
- 5.死信佇列
- 死信佇列演示
1. 自定義消費者使用
- 我們之前呢都是在程式碼中編寫while迴圈,進行
consumer.nextDelivery
方法進行獲取下一條訊息,然後進行消費處理! - 其實我們還可以使用自定義的Consumer,它更加的方便,解耦性更加的強,也是在實際工作中最常用的使用方式!
- 自定義消費端實現只需要繼承
DefaultConsumer
類,重寫handleDelivery
方法即可
自定義消費端演示
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 獲取Connection Connection connection = connectionFactory.newConnection(); //3 通過Connection建立一個新的Channel Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; //4 傳送訊息 for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //consumerTag: 內部生成的消費標籤 properties: 訊息屬性 body: 訊息內容 System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); //envelope包含屬性:deliveryTag(標籤), redeliver, exchange, routingKey //redeliver是一個標記,如果設為true,表示訊息之前可能已經投遞過了,現在是重新投遞訊息到監聽佇列的消費者 System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
public class Consumer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";
//4 宣告交換機和佇列,然後進行繫結設定路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 設定channel,使用自定義消費者
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
執行說明
先啟動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設定OK,然後啟動生產端。消費端列印內容如下
2.消費端的限流策略
2.1 限流的場景與機制
- 假設一個場景,我們Rabbitmq伺服器有上萬條未處理的訊息,我們隨便開啟一個消費者客戶端,會出現這種情況:巨量的訊息瞬間全部推送過來,但是我們單個客戶端無法同時處理這麼多資料!此時很有可能導致伺服器崩潰,嚴重的可能導致線上的故障。
- 除了這種場景,還有一些其他的場景,比如說單個生產者一分鐘生產出了幾百條資料,但是單個消費者一分鐘可能只能處理60條資料,這個時候生產端和消費端肯定是不平衡的。通常生產端是沒辦法做限制的。所以消費端肯定需要做一些限流措施,否則如果超出最大負載,可能導致消費端效能下降,伺服器卡頓甚至崩潰等一系列嚴重後果。
消費端限流機制
RabbitMQ提供了一種qos
(服務質量保證)功能,即在非自動確認訊息的前提下,如果一定數目的訊息 (通過基於consume或者channel設定Qos的值) 未被確認前,不進行消費新的訊息。
需要注意:
1.不能設定自動簽收功能(autoAck = false)
2.如果訊息沒被確認,就不會到達消費端,目的就是給消費端減壓
2.2 限流相關API
限流設定 - BasicQos()
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:
單條訊息的大小限制,消費端通常設定為0,表示不做限制
prefetchCount:
一次最多能處理多少條訊息,通常設定為1
global:
是否將上面設定應用於channel,false代表consumer級別
注意事項
prefetchSize
和global
這兩項,rabbitmq沒有實現,暫且不研究
prefetchCount
在 autoAck=false
的情況下生效,即在自動應答的情況下這個值是不生效的
手工ACK - basicAck()
void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,呼叫這個方法就會主動回送給Broker一個應答,表示這條訊息我處理完了,你可以給我下一條了。引數multiple
表示是否批量簽收,由於我們是一次處理一條訊息,所以設定為false
2.3 限流演示
生產端
生產端就是正常的邏輯
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
// 傳送訊息
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchange, routingKey, true, null,
msg.getBytes());
}
}
}
自定義消費者
為了看到限流效果,這裡不進行ACK
public class MyConsumer extends DefaultConsumer {
//接收channel
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
//System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//手工ACK,引數multiple表示不批量簽收
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消費端
關閉autoACK,進行限流設定
public class Consumer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
//4 宣告交換機和佇列,然後進行繫結設定路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//進行引數設定:單條訊息的大小限制,一次最多能處理多少條訊息,是否將上面設定應用於channel
channel.basicQos(0, 1, false);
//限流: autoAck設定為 false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
執行說明
我們先註釋掉手工ACK方法,然後啟動消費端和生產端,此時消費端只打印了一條訊息
這是因為我們設定了手工簽收,並且設定了一次只處理一條訊息,當我們沒有回送ack應答時,Broker端就認為消費端還沒有處理完這條訊息,基於這種限流機制就不會給消費端傳送新的訊息了,所以消費端只打印了一條訊息。
通過管控臺也可以看到佇列總共收到了5條訊息,有一條訊息沒有ack。
將手工簽收程式碼取消註釋,再次執行消費端,此時就會列印5條訊息的內容。
3. 消費端ACK與重回佇列機制
3.1 ACK與NACK
當我們設定 autoACK=false
時,就可以使用手工ACK方式了,那麼其實手工方式包括了手工ACK與NACK。
當我們手工 ACK
時,會發送給Broker一個應答,代表訊息成功處理了,Broker就可以回送響應給生產端了。NACK
則表示訊息處理失敗了,如果設定重回佇列,Broker端就會將沒有成功處理的訊息重新發送。
使用方式
- 消費端進行消費的時候,如果由於業務異常我們可以手工
NACK
並進行日誌的記錄,然後進行補償!
方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
- 如果由於伺服器宕機等嚴重問題,那我們就需要手工進行
ACK
保障消費端消費成功!
方法:void basicAck(long deliveryTag, boolean multiple)
3.2 重回佇列演示
- 消費端重回佇列是為了對沒有處理成功的訊息,把訊息重新會遞給Broker!
- 重回佇列,會把消費失敗的訊息重新新增到佇列的尾端,供消費者繼續消費。
- 一般我們在實際應用中,都會關閉重回佇列,也就是設定為false
生產端
對訊息設定自定義屬性以便進行區分
public class Producer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactorys
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
//設定訊息屬性
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
//傳送訊息
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
自定義消費
對第一條訊息進行NACK,並設定重回佇列
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//NACK,引數三requeue:是否重回佇列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
消費端
關閉自動簽收功能
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
//宣告交換機和佇列,然後進行繫結設定路由Key
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//手工簽收 必須要設定 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
執行說明
先啟動消費端,然後啟動生產端,消費端列印如下,顯然第一條訊息由於我們呼叫了NACK,並且設定了重回佇列,所以會導致該條訊息一直重複傳送,消費端就會一直迴圈消費。
一般工作中不會設定重回佇列這個屬性,都是自己去做補償或者投遞到延遲佇列裡的,然後指定時間去處理即可。
4. TTL
TTL說明
- TTL是
Time To Live
的縮寫,也就是生存時間 - RabbitMQ支援訊息的過期時間,在訊息傳送時可以進行指定
- RabbitMQ支援為每個佇列設定訊息的超時時間,從訊息入佇列開始計算,只要超過了佇列的超時時間配置,那麼訊息會自動的清除
TTL演示
這次演示我們不寫程式碼,只通過管控臺進行操作,實際測試也會更為方便一些。
1. 建立Exchange
選擇Exchange選單,找到下面的Add a new exchange
2.建立Queue
選擇Queue選單,找到下面的Add a new queue
3.建立佇列和交換機的繫結關係
點選Exchange表格中的test002_exchange
,在下面新增繫結規則
4.傳送訊息
點選Exchange表格中的test002_exchange
,在下面找到Publish message
,設定訊息進行傳送
5.驗證
點選Queue選單,查看錶格中test002已經有了一條訊息,10秒後表格顯示0條,說明過期時間到了訊息被自動清除了。
6.設定單條訊息過期時間
點選Exchange表格中的test002_exchange
,在下面找到Publish message
,設定訊息的過期時間並進行傳送,此時觀察test002佇列,發現訊息5s後就過期被清除了,即使佇列設定的過期時間是10s。
TTL程式碼設定過期時間
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("10000") //10s過期
.build();
//傳送訊息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
佇列過期時間設定
//設定佇列的過期時間10s
Map<String,Object> param = new HashMap<>();
param.put("x-message-ttl", 10000);
//宣告佇列
channel.queueDeclare(queueName, true, false, false, null);
注意事項
- 兩者的區別是設定佇列的過期時間是對該佇列的所有訊息生效的。
- 為訊息設定TTL有一個問題:RabbitMQ只對處於隊頭的訊息判斷是否過期(即不會掃描佇列),所以,很可能佇列中已存在死訊息,但是佇列並不知情。這會影響佇列統計資料的正確性,妨礙佇列及時釋放資源。
5.死信佇列
死信佇列介紹
- 死信佇列:DLX,
dead-letter-exchange
- 利用DLX,當訊息在一個佇列中變成死信
(dead message)
之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX
訊息變成死信有以下幾種情況
- 訊息被拒絕(basic.reject / basic.nack),並且requeue = false
- 訊息TTL過期
- 佇列達到最大長度
死信處理過程
- DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性。
- 當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上去,進而被路由到另一個佇列。
- 可以監聽這個佇列中的訊息做相應的處理。
死信佇列設定
- 首先需要設定死信佇列的exchange和queue,然後進行繫結:
- 然後需要有一個監聽,去監聽這個佇列進行處理
- 然後我們進行正常宣告交換機、佇列、繫結,只不過我們需要在佇列加上一個引數即可:
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,這樣訊息在過期、requeue、 佇列在達到最大長度時,訊息就可以直接路由到死信佇列!
死信佇列演示
生產端
public class Producer {
public static void main(String[] args) throws Exception {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
//2 獲取Connection
Connection connection = connectionFactory.newConnection();
//3 通過Connection建立一個新的Channel
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
//傳送訊息
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
自定義消費者
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
消費端
- 宣告正常處理訊息的交換機、佇列及繫結規則
- 在正常交換機上指定死信傳送的Exchange
- 宣告死信交換機、佇列及繫結規則
- 監聽死信佇列,進行後續處理,這裡省略
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.244.11");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setHandshakeTimeout(20000);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 宣告一個普通的交換機 和 佇列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
String deadQueueName = "dlx.queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 指定死信傳送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
// 這個agruments屬性,要設定到宣告佇列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
// 要進行死信佇列的宣告
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare(deadQueueName, true, false, false, null);
channel.queueBind(deadQueueName, "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
//channel.basicConsume(deadQueueName, true, new MyConsumer(channel));
}
}
執行說明
啟動消費端,此時檢視管控臺,新增了兩個Exchange,兩個Queue。在test_dlx_queue
上我們設定了DLX,也就代表死信訊息會發送到指定的Exchange上,最終其實會路由到dlx.queue
上。
此時關閉消費端,然後啟動生產端,檢視管控臺佇列的訊息情況,test_dlx_queue
的值為1,而dlx_queue
的值為0。
10s後的佇列結果如圖,由於生產端傳送訊息時指定了訊息的過期時間為10s,而此時沒有消費端進行消費,訊息便被路由到死信佇列中。
實際環境我們還需要對死信佇列進行一個監聽和處理,當然具體的處理邏輯和業務相關,這裡只是簡單演示死信佇列是否生