java實現rabbitMQ延時佇列詳解以及spring-rabbit整合教程
java實現rabbitMQ延時佇列詳解
這是我在公司開發中使用的倆套方案,感興趣的話可以看一下:點選下載
在實際的業務中我們會遇見生產者產生的訊息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支援延遲佇列功能,但是我們可以根據其特性Per-Queue Message TTL和 Dead Letter Exchanges實現延時佇列。也可以通過改特性設定訊息的優先順序。
1.Per-Queue Message TTL
RabbitMQ可以針對訊息和佇列設定TTL(過期時間)。佇列中的訊息過期時間(Time To Live, TTL)有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead message,消費者將無法再收到該訊息。
2.Dead Letter Exchanges
當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。訊息變成Dead Letter一向有以下幾種情況:
訊息被拒絕(basic.reject or basic.nack)並且requeue=false
訊息TTL過期
佇列達到最大長度
實際上就是設定某個佇列的屬性,當這個佇列中有Dead Letter時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange中去,進而被路由到另一個佇列,publish可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支援的immediate引數中的向publish確認的功能。
雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設定了 TTL 值時不會產生任何問題,因為過期的
message 總是會出現在 queue 的頭部。當對每一條 message 設定了 TTL 時,過期的 message 可能會排隊於未過期 message 的後面,直到這些訊息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計資訊中被計算進去(例如,queue 中存在的 message 的數量)。
一、在佇列上設定TTL
1.建立delay.exchange
這裡Internal設定為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的繫結。
2.建立延時佇列(delay queue)
如上配置延時5min佇列(x-message-ttl=300000)
x-max-length:最大積壓的訊息個數,可以根據自己的實際情況設定,超過限制訊息不會丟失,會立即轉向delay.exchange進行投遞
x-dead-letter-exchange:設定為剛剛配置好的delay.exchange,訊息過期後會通過delay.exchange進行投遞
這裡不需要配置"dead letter routing key"否則會覆蓋掉訊息傳送時攜帶的routingkey,導致後面無法路由為剛才配置的delay.exchange
3.配置延時路由規則
需要延時的訊息到exchange後先路由到指定的延時佇列
1)建立delaysync.exchange通過Routing key將訊息路由到延時佇列
2.配置delay.exchange 將訊息投遞到正常的消費佇列
配置完成。
下面使用程式碼測試一下:
生產者:
package cn.slimsmart.study.rabbitmq.delayqueue.queue;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static String queue_name = "test.queue";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
String message = "hello world!" + System.currentTimeMillis();
channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
消費者:
package cn.slimsmart.study.rabbitmq.delayqueue.queue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private static String queue_name = "test.queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
channel.basicConsume(queue_name, true, consumer);
while (true) {
// nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
}
}
}
二、在訊息上設定TTL
實現程式碼:
生產者:
package cn.slimsmart.study.rabbitmq.delayqueue.message;
import java.io.IOException;
import java.util.HashMap;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
channel.queueDeclare("delay_queue", true, false, false, arguments);
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
// 繫結路由
channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
String message = "hello world!" + System.currentTimeMillis();
// 設定延時屬性
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 永續性 non-persistent (1) or persistent (2)
AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
// routingKey =delay_queue 進行轉發
channel.basicPublish("", "delay_queue", properties, message.getBytes());
System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
// 關閉頻道和連線
channel.close();
connection.close();
}
}
消費者:
package cn.slimsmart.study.rabbitmq.delayqueue.message;
import java.util.HashMap;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Consumer {
private static String queue_name = "message_ttl_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("10.1.199.169");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
channel.queueDeclare("delay_queue", true, false, false, arguments);
// 宣告佇列
channel.queueDeclare(queue_name, true, false, false, null);
// 繫結路由
channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列
channel.basicConsume(queue_name, true, consumer);
while (true) {
// nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
}
}
}
spring-rabbit整合教程
maven依賴:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.6.RELEASE</version>
</dependency>
spring配置檔案(在檔案頭部引入rabbit的名稱空間和約束檔案):<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
<!-- 定義Rabbit,指定連線工廠 -->
<rabbit:connection-factory id="connectionFactory" host="你的rabbitMQ服務的ip" virtual-host="/vhost名稱" username="使用者名稱" password="密碼" port="5672" />
<!-- MQ的管理,包括佇列、交換器等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定義Rabbit模板,指定連線工廠以及定義exchange -->
<rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory" />
<!-- queue 佇列宣告 -->
<!-- durable 是否持久化 ,exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除 ,auto-delete 當所有消費端連線斷開後,是否自動刪除佇列 -->
<rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>
<!-- 交換機定義 -->
<!-- direct-exchange 模式:訊息與一個特定的路由器完全匹配,才會轉發; topic-exchange 模式:按規則轉發訊息,最靈活 -->
<rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
<rabbit:binding queue="my_queue" pattern="my_patt"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 引入消費者 -->
<bean id="rabbitmqService" class="com.group.service.RabbitmqService" />
<!-- 配置監聽 消費者 acknowledeg = manual,auto,none -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
<!-- queues 監聽佇列,多個用逗號分隔; ref 監聽器 -->
<rabbit:listener queue-names="my_queue" ref="rabbitmqService" method="test"/>
</rabbit:listener-container>
</beans>
那麼在專案中裝配amqpTemplate中就可以傳送訊息了