1. 程式人生 > >Java 使用RabbitMQ外掛實現延時佇列

Java 使用RabbitMQ外掛實現延時佇列

Springboot專案,windows環境

環境配置

在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。

外掛下載地址: http://www.rabbitmq.com/community-plugins.html

在 rabbitmq_delayed_message_exchange 一欄中選擇,根據自己RabbitMQ的版本下載適合自己版本的外掛,將外掛放在RabbitMQ安裝目錄下plugins目錄中,將名字改為: rabbitmq_delayed_message_exchange-0.0.1.ez

或 rabbitmq_delayed_message_exchange-3.7.7.ez  (我的版本是3.7.7) 。

關閉RabbitMQ服務,開啟命令視窗,到sbin目錄中, 執行命令:(根據自己的安裝目錄調整)

"{RabbitMQ 安裝目錄}\sbin\rabbitmq-plugins.bat" enable rabbitmq_delayed_message_exchange

然後再開啟服務。 OK,環境配置完成。

程式碼實現

1. 建立路由、佇列


import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 *  建立佇列和交換機
 */
public class RabbitConfig { 

    static final String exchangeName = "test_exchange";
    static final String queueName = "test_queue";
    static final String routingKey = "test_queue";
    
    /**
     * 建立路由、佇列
     */
    @Test
    public void binding() throws IOException, TimeoutException{

        //建立連線,建立通道
        ConnectionFactory fc = new ConnectionFactory();
        fc.setHost("localhost");
        fc.setPort(5672);
        fc.setUsername("guest");
        fc.setPassword("guest");
        
        Connection conn = fc.newConnection();
        Channel channel = conn.createChannel();
        

        channel.exchangeDelete(exchangeName);  //刪除路由

        Map<String, Object> map = new HashMap<>();
        map.put("x-delayed-type", "direct");
        channel.exchangeDeclare(exchangeName, "x-delayed-message",false, false,map);  //建立路由
        
        channel.queueDelete(queueName);  //刪除佇列
        channel.queueDeclare(queueName, true, false, false, null);  //建立佇列
        
        channel.queueBind(queueName, exchangeName, routingKey);  //繫結路由、佇列

        channel.close();
        conn.close();
        
    }
    
    
}

2. 訊息傳送者


import java.time.LocalDateTime;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 延遲訊息 釋出者
 */
@Component
public class DelayProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMsg(){
        
        String msg = "測試延時de訊息|"+LocalDateTime.now();
        
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName, RabbitConfig.routingKey, msg, (message) ->{
            message.getMessageProperties().setHeader("x-delay", 9000); //延遲9秒
            return message;
        });
        
    }
    

}

3.訊息接收者


import java.time.LocalDateTime;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消費者 
 */
@Component
public class DelayConsumer {

    @RabbitListener(queues = "test_queue")
    public void receive(String msg) {
      System.out.println("接收到的訊息:"+msg +"||"+LocalDateTime.now());
    }
}

4. 測試結果:

接收到的訊息:測試延時de訊息|2018-07-26T19:22:49.895||2018-07-26T19:22:58.920