延時訊息佇列
下面程式碼按需要填寫
@Bean
public Queue delayQueuePerMessageTTL() {
Map<String, Object> argument = new HashMap<>();
argument.put(“x-message-ttl”, 1000 * 5);//How long a message published to a queue can live before it is discarded (milliseconds).(Sets the “x-message-ttl” argument.)
argument.put(“x-expires”, 100060);//How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the “x-expires” argument.)
argument.put(“x-max-length”, 10000);//How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the “x-max-length” argument.)
argument.put(“x-max-length-bytes”, 1024
argument.put(“x-dead-letter-exchange”, “exchangeName”);//Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the “x-dead-letter-exchange” argument.)
argument.put(“x-dead-letter-routing-key”, “routing-key”);//Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message’s original routing key will be used.(Sets the “x-dead-letter-routing-key” argument.)
argument.put(“x-max-priority”, 100);//Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the “x-max-priority” argument.)
argument.put(“x-queue-mode”, “lazy”);//Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the “x-queue-mode” argument.)
argument.put(“x-queue-master-locator”, “”);//Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.(Sets the “x-queue-master-locator” argument.)
return QueueBuilder.durable(“deladyQueues”).withArguments(argument).autoDelete().exclusive().build();
}
在實際的業務中我們會遇見生產者產生的訊息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支援延遲佇列功能,但是我們可以根據其特性Per-Queue Message TTL和 Dead Letter Exchanges實現延時佇列。也可以通過改特性設定訊息的優先順序。
1.Per-Queue Message TTL
RabbitMQ可以針對訊息和佇列設定TTL(過期時間)。佇列中的訊息過期時間(Time To Live, TTL)有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead message,消費者將無法再收到該訊息。
2.Dead Letter Exchanges
當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。訊息變成Dead Letter一向有以下幾種情況:
訊息被拒絕(basic.reject or basic.nack)並且requeue=false
訊息TTL過期
佇列達到最大長度
實際上就是設定某個佇列的屬性,當這個佇列中有Dead Letter時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange中去,進而被路由到另一個佇列,publish可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支援的immediate引數中的向publish確認的功能。
雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設定了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設定了 TTL 時,過期的 message 可能會排隊於未過期 message 的後面,直到這些訊息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計資訊中被計算進去(例如,queue 中存在的 message 的數量)。對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而第二種方法裡,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期時在即將投遞到消費者之前判定的,為什麼兩者得處理方法不一致?因為第一種方法裡,佇列中已過期的訊息肯定在佇列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期訊息即可,而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息,勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期,再進行刪除。
一、在佇列上設定TTL
1.建立delay.exchange
這裡Internal設定為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的繫結。
2.建立延時佇列(delay queue)
如上配置延時5min佇列(x-message-ttl=300000)
x-max-length:最大積壓的訊息個數,可以根據自己的實際情況設定,超過限制訊息不會丟失,會立即轉向delay.exchange進行投遞
x-dead-letter-exchange:設定為剛剛配置好的delay.exchange,訊息過期後會通過delay.exchange進行投遞
這裡不需要配置"dead letter routing key"否則會覆蓋掉訊息傳送時攜帶的routingkey,導致後面無法路由為剛才配置的delay.exchange
3.配置延時路由規則
需要延時的訊息到exchange後先路由到指定的延時佇列
1)建立delaysync.exchange通過Routing key將訊息路由到延時佇列
2.配置delay.exchange 將訊息投遞到正常的消費佇列
配置完成。
下面使用程式碼測試一下:
生產者:
package cn.slimsmart.study.rabbitmq.delayqueue.queue;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static String queue_name = "test.queue";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
String message = "hello world!" + System.currentTimeMillis();
channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
消費者:
package cn.slimsmart.study.rabbitmq.delayqueue.queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private static String queue_name = "test.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
channel.basicConsume(queue_name, true, consumer);
while (true) {
// nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
}
}
}
二、在訊息上設定TTL
實現程式碼:
生產者:
package cn.slimsmart.study.rabbitmq.delayqueue.message;
import java.io.IOException;
import java.util.HashMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
channel.queueDeclare("delay_queue", true, false, false, arguments);
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
// 繫結路由
channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
String message = "hello world!" + System.currentTimeMillis();
// 設定延時屬性
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 永續性 non-persistent (1) or persistent (2)
AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
// routingKey =delay_queue 進行轉發
channel.basicPublish("", "delay_queue", properties, message.getBytes());
System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
消費者:
package cn.slimsmart.study.rabbitmq.delayqueue.message;
import java.util.HashMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
channel.queueDeclare("delay_queue", true, false, false, arguments);
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
// 繫結路由
channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
channel.basicConsume(queue_name, true, consumer);
while (true) {
// nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
}
}
}
spring-rabbit整合教程
maven依賴:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.6.RELEASE</version>
</dependency>
spring配置檔案(在檔案頭部引入rabbit的名稱空間和約束檔案):
<?xml version="1.0" encoding="UTF-8"?><!-- 定義Rabbit,指定連線工廠 -->
<rabbit:connection-factory id="connectionFactory" host="你的rabbitMQ服務的ip" virtual-host="/vhost名稱" username="使用者名稱" password="密碼" port="5672" />
<!-- MQ的管理,包括佇列、交換器等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定義Rabbit模板,指定連線工廠以及定義exchange -->
<rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory" />
<!-- queue 佇列宣告 -->
<!-- durable 是否持久化 ,exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除 ,auto-delete 當所有消費端連線斷開後,是否自動刪除佇列 -->
<rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>
<!-- 交換機定義 -->
<!-- direct-exchange 模式:訊息與一個特定的路由器完全匹配,才會轉發; topic-exchange 模式:按規則轉發訊息,最靈活 -->
<rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
<rabbit:binding queue="my_queue" pattern="my_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 引入消費者 -->
<bean id="rabbitmqService" class="com.group.service.RabbitmqService" />
<!-- 配置監聽 消費者 acknowledeg = manual,auto,none -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
<!-- queues 監聽佇列,多個用逗號分隔; ref 監聽器 -->
<rabbit:listener queue-names="my_queue" ref="rabbitmqService" method="test"/>
</rabbit:listener-container>
那麼在專案中裝配amqpTemplate中就可以傳送訊息了
作者:MC-閏土
來源:CSDN
原文:https://blog.csdn.net/qq_22075041/article/details/78885113
版權宣告:本文為博主原創文章,轉載請附上博文連結!