1. 程式人生 > >java實現rabbitMQ延時佇列詳解以及spring-rabbit整合教程

java實現rabbitMQ延時佇列詳解以及spring-rabbit整合教程

java實現rabbitMQ延時佇列詳解

這是我在公司開發中使用的倆套方案,感興趣的話可以看一下:點選下載

在實際的業務中我們會遇見生產者產生的訊息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支援延遲佇列功能,但是我們可以根據其特性Per-Queue Message TTL和 Dead Letter Exchanges實現延時佇列。也可以通過改特性設定訊息的優先順序。

1.Per-Queue Message TTL
RabbitMQ可以針對訊息和佇列設定TTL(過期時間)。佇列中的訊息過期時間(Time To Live, TTL)有兩種方法可以設定。第一種方法是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息進行單獨設定,每條訊息TTL可以不同。如果上述兩種方法同時使用,則訊息的過期時間以兩者之間TTL較小的那個數值為準。訊息在佇列的生存時間一旦超過設定的TTL值,就成為dead message,消費者將無法再收到該訊息。
2.Dead Letter Exchanges


當訊息在一個佇列中變成死信後,它能被重新publish到另一個Exchange。訊息變成Dead Letter一向有以下幾種情況:
訊息被拒絕(basic.reject or basic.nack)並且requeue=false
訊息TTL過期
佇列達到最大長度
實際上就是設定某個佇列的屬性,當這個佇列中有Dead Letter時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange中去,進而被路由到另一個佇列,publish可以監聽這個佇列中訊息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支援的immediate引數中的向publish確認的功能。

雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設定了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設定了 TTL 時,過期的 message 可能會排隊於未過期 message 的後面,直到這些訊息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計資訊中被計算進去(例如,queue 中存在的 message 的數量)。

對於第一種設定佇列TTL屬性的方法,一旦訊息過期,就會從佇列中抹去,而第二種方法裡,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期時在即將投遞到消費者之前判定的,為什麼兩者得處理方法不一致?因為第一種方法裡,佇列中已過期的訊息肯定在佇列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期訊息即可,而第二種方法裡,每條訊息的過期時間不同,如果要刪除所有過期訊息,勢必要掃描整個佇列,所以不如等到此訊息即將被消費時再判定是否過期,如果過期,再進行刪除。

一、在佇列上設定TTL

1.建立delay.exchange

這裡Internal設定為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送訊息,僅用來進行exchange和exchange之間的繫結。

2.建立延時佇列(delay queue)

如上配置延時5min佇列(x-message-ttl=300000)

x-max-length:最大積壓的訊息個數,可以根據自己的實際情況設定,超過限制訊息不會丟失,會立即轉向delay.exchange進行投遞

x-dead-letter-exchange:設定為剛剛配置好的delay.exchange,訊息過期後會通過delay.exchange進行投遞

這裡不需要配置"dead letter routing key"否則會覆蓋掉訊息傳送時攜帶的routingkey,導致後面無法路由為剛才配置的delay.exchange

3.配置延時路由規則

需要延時的訊息到exchange後先路由到指定的延時佇列

1)建立delaysync.exchange通過Routing key將訊息路由到延時佇列

2.配置delay.exchange 將訊息投遞到正常的消費佇列

配置完成。

下面使用程式碼測試一下:

生產者:

package cn.slimsmart.study.rabbitmq.delayqueue.queue;  
  
import java.io.IOException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    private static String queue_name = "test.queue";  
  
    public static void main(String[] args) throws IOException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.1.199.169");  
        factory.setUsername("admin");  
        factory.setPassword("123456");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 宣告佇列  
        channel.queueDeclare(queue_name, true, false, false, null);  
        String message = "hello world!" + System.currentTimeMillis();  
        channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());  
        System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
        // 關閉頻道和連線  
        channel.close();  
        connection.close();  
    }  
}  

 消費者:  

package cn.slimsmart.study.rabbitmq.delayqueue.queue;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
public class Consumer {  
  
    private static String queue_name = "test.queue";  
  
    public static void main(String[] args) throws Exception {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.1.199.169");  
        factory.setUsername("admin");  
        factory.setPassword("123456");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        // 宣告佇列  
        channel.queueDeclare(queue_name, true, false, false, null);  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費佇列  
        channel.basicConsume(queue_name, true, consumer);  
        while (true) {  
            // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
        }  
    }  
  
}  

二、在訊息上設定TTL

實現程式碼:

生產者: 

package cn.slimsmart.study.rabbitmq.delayqueue.message;  
  
import java.io.IOException;  
import java.util.HashMap;  
  
import com.rabbitmq.client.AMQP;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
  
public class Producer {  
  
    private static String queue_name = "message_ttl_queue";  
  
    public static void main(String[] args) throws IOException {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.1.199.169");  
        factory.setUsername("admin");  
        factory.setPassword("123456");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        HashMap<String, Object> arguments = new HashMap<String, Object>();  
        arguments.put("x-dead-letter-exchange", "amq.direct");  
        arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
        channel.queueDeclare("delay_queue", true, false, false, arguments);  
  
        // 宣告佇列  
        channel.queueDeclare(queue_name, true, false, false, null);  
        // 繫結路由  
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  
  
        String message = "hello world!" + System.currentTimeMillis();  
        // 設定延時屬性  
        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
        // 永續性 non-persistent (1) or persistent (2)  
        AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();  
        // routingKey =delay_queue 進行轉發  
        channel.basicPublish("", "delay_queue", properties, message.getBytes());  
        System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());  
        // 關閉頻道和連線  
        channel.close();  
        connection.close();  
    }  
}  

消費者:

package cn.slimsmart.study.rabbitmq.delayqueue.message;  
  
import java.util.HashMap;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.QueueingConsumer;  
  
public class Consumer {  
  
    private static String queue_name = "message_ttl_queue";  
  
    public static void main(String[] args) throws Exception {  
        ConnectionFactory factory = new ConnectionFactory();  
        factory.setHost("10.1.199.169");  
        factory.setUsername("admin");  
        factory.setPassword("123456");  
        Connection connection = factory.newConnection();  
        Channel channel = connection.createChannel();  
        HashMap<String, Object> arguments = new HashMap<String, Object>();  
        arguments.put("x-dead-letter-exchange", "amq.direct");  
        arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");  
        channel.queueDeclare("delay_queue", true, false, false, arguments);  
  
        // 宣告佇列  
        channel.queueDeclare(queue_name, true, false, false, null);  
        // 繫結路由  
        channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");  
  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        // 指定消費佇列  
        channel.basicConsume(queue_name, true, consumer);  
        while (true) {  
            // nextDelivery是一個阻塞方法(內部實現其實是阻塞佇列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());  
        }  
    }  
  
}  


spring-rabbit整合教程

maven依賴:

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.6.RELEASE</version>
        </dependency>
spring配置檔案(在檔案頭部引入rabbit的名稱空間和約束檔案):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:mvc="http://www.springframework.org/schema/mvc"
	xmlns:task="http://www.springframework.org/schema/task"
	xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
		http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
		http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
	
	<!-- 定義Rabbit,指定連線工廠 -->
	<rabbit:connection-factory id="connectionFactory"  host="你的rabbitMQ服務的ip"  virtual-host="/vhost名稱"  username="使用者名稱"  password="密碼"  port="5672"  />      
	
	<!-- MQ的管理,包括佇列、交換器等 -->
	<rabbit:admin connection-factory="connectionFactory"/>
	
	<!-- 定義Rabbit模板,指定連線工廠以及定義exchange -->
	<rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory"  />	
	
	<!-- queue 佇列宣告 --> 
    <!--    durable 是否持久化 ,exclusive 僅建立者可以使用的私有佇列,斷開後自動刪除 ,auto-delete 當所有消費端連線斷開後,是否自動刪除佇列   -->
    <rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>

    <!-- 交換機定義 -->
    <!-- direct-exchange 模式:訊息與一個特定的路由器完全匹配,才會轉發; topic-exchange 模式:按規則轉發訊息,最靈活 -->
	<rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
		    <rabbit:bindings>
		        <!-- 設定訊息Queue匹配的pattern (direct模式為key) -->
		        <rabbit:binding  queue="my_queue"  pattern="my_patt"/>
		    </rabbit:bindings>
	</rabbit:topic-exchange>
	
	
	<!-- 引入消費者 -->
	<bean id="rabbitmqService" class="com.group.service.RabbitmqService"  />
	    
    <!-- 配置監聽 消費者   acknowledeg = manual,auto,none   -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
        <!-- queues 監聽佇列,多個用逗號分隔; ref 監聽器 -->
        <rabbit:listener queue-names="my_queue"  ref="rabbitmqService"  method="test"/>
    </rabbit:listener-container>
    
</beans>

那麼在專案中裝配amqpTemplate中就可以傳送訊息了