Java 使用RabbitMQ外掛實現延時佇列
阿新 • • 發佈:2018-12-27
Springboot專案,windows環境
環境配置
在rabbitmq 3.5.7及以上的版本提供了一個外掛(rabbitmq-delayed-message-exchange)來實現延遲佇列功能。同時外掛依賴Erlang/OPT 18.0及以上。
外掛下載地址: http://www.rabbitmq.com/community-plugins.html
在 rabbitmq_delayed_message_exchange 一欄中選擇,根據自己RabbitMQ的版本下載適合自己版本的外掛,將外掛放在RabbitMQ安裝目錄下plugins目錄中,將名字改為: rabbitmq_delayed_message_exchange-0.0.1.ez
或 rabbitmq_delayed_message_exchange-3.7.7.ez (我的版本是3.7.7) 。
關閉RabbitMQ服務,開啟命令視窗,到sbin目錄中, 執行命令:(根據自己的安裝目錄調整)
"{RabbitMQ 安裝目錄}\sbin\rabbitmq-plugins.bat" enable rabbitmq_delayed_message_exchange
然後再開啟服務。 OK,環境配置完成。
程式碼實現
1. 建立路由、佇列
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 建立佇列和交換機 */ public class RabbitConfig { static final String exchangeName = "test_exchange"; static final String queueName = "test_queue"; static final String routingKey = "test_queue"; /** * 建立路由、佇列 */ @Test public void binding() throws IOException, TimeoutException{ //建立連線,建立通道 ConnectionFactory fc = new ConnectionFactory(); fc.setHost("localhost"); fc.setPort(5672); fc.setUsername("guest"); fc.setPassword("guest"); Connection conn = fc.newConnection(); Channel channel = conn.createChannel(); channel.exchangeDelete(exchangeName); //刪除路由 Map<String, Object> map = new HashMap<>(); map.put("x-delayed-type", "direct"); channel.exchangeDeclare(exchangeName, "x-delayed-message",false, false,map); //建立路由 channel.queueDelete(queueName); //刪除佇列 channel.queueDeclare(queueName, true, false, false, null); //建立佇列 channel.queueBind(queueName, exchangeName, routingKey); //繫結路由、佇列 channel.close(); conn.close(); } }
2. 訊息傳送者
import java.time.LocalDateTime; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 延遲訊息 釋出者 */ @Component public class DelayProducer { @Autowired RabbitTemplate rabbitTemplate; public void sendMsg(){ String msg = "測試延時de訊息|"+LocalDateTime.now(); rabbitTemplate.convertAndSend(RabbitConfig.exchangeName, RabbitConfig.routingKey, msg, (message) ->{ message.getMessageProperties().setHeader("x-delay", 9000); //延遲9秒 return message; }); } }
3.訊息接收者
import java.time.LocalDateTime;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費者
*/
@Component
public class DelayConsumer {
@RabbitListener(queues = "test_queue")
public void receive(String msg) {
System.out.println("接收到的訊息:"+msg +"||"+LocalDateTime.now());
}
}
4. 測試結果:
接收到的訊息:測試延時de訊息|2018-07-26T19:22:49.895||2018-07-26T19:22:58.920