rubbitMq+spring實現延遲佇列(適用訂單超時未支付的問題)
阿新 • • 發佈:2018-12-02
什麼是延遲佇列
通俗一點說,延遲佇列和我們生活中常用的定時器有點像,定時器會在指定的時間後響起,延遲佇列則會在指定的時間後處理訊息。延遲佇列主要的應用場景有訂單超時取消、超時自動評價等等。
實現原理
RabbitMQ給我們提供了TTL(Time-To-Live)和DLX (Dead-Letter-Exchange)這兩個特性,使用RabbitMQ實現延遲佇列利用的正是這兩個特性。TTL用於控制訊息的生存時間,如果超時,訊息將變成Dead Letter。DLX用於配置死信佇列,可以通過配置x-dead-letter-exchange和x-dead-letter-routing-key這兩個引數來指定訊息變成死信後需要路由到的交換器和佇列。
圖例
首先看看死信佇列的圖
然後就是延遲佇列,延遲佇列本身就是死信佇列的擴充套件
程式碼例項
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--建立連線工廠--> <rabbit:connection-factory id="mqConnectionFactory" host="***.***.**.**" port="****" username="***" password="***" publisher-confirms="true" virtual-host="*****"/> <!--通過指定下面的admin資訊,當前producer中的exchange和queue會在rabbitmq伺服器上自動生成 --> <rabbit:admin id="orderConnectAdmin" connection-factory="mqConnectionFactory"/> <!-- <rabbit:admin id="delayConnectAdmin" connection-factory="delayConnectionFactory"/>--> <!-- 說明:將需要關閉的訂單的訊息傳送到此佇列 durable: 為 true 則設定佇列為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊。 autoDelete: 設定是否自動刪除。為 true 則設定佇列為自動刪除。自動刪除的前提是:至少有一個消費者連線到 這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除 --> <rabbit:queue name="shanreal_order_shutdown_queue" durable="false" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" /> <entry key="x-dead-letter-exchange" value="shanreal_exchange_delay" /> <entry key="x-dead-letter-routing-key" value="shanreal_delay_key" /> </rabbit:queue-arguments> </rabbit:queue> <!--正常交換機 durable:true 持久化。可以將交換器存檔,在伺服器重啟 的時候不會丟失相關資訊。false 反之 auto-declare:true 自動刪除。自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結 , 之後所有與這個交換器繫結的佇列或者交換器都與此解綁 --> <rabbit:fanout-exchange name="shanreal_exchange_normal" durable="false" auto-delete="true" id="shanreal_exchange_normal"> <rabbit:bindings> <rabbit:binding queue="shanreal_order_shutdown_queue" /> </rabbit:bindings> </rabbit:fanout-exchange> <!--死亡佇列,用於儲存超時訊息--> <rabbit:queue name="shanreal_delay_queue" durable="false" auto-declare="true"/> <!--死亡交換機,訊息過時後會由此交換機轉發到對應的佇列--> <rabbit:direct-exchange name="shanreal_exchange_delay" durable="false" auto-delete="true" id="shanreal_exchange_delay"> <rabbit:bindings> <rabbit:binding queue="shanreal_delay_queue" key="shanreal_delay_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!--定義rabbit orderAmqpTemplate用於資料的生產 --> <rabbit:template id="orderAmqpTemplate" connection-factory="mqConnectionFactory" exchange="shanreal_exchange_normal"/> <!--定義rabbit delayAmqpTemplate用於死信佇列資料的消費 --> <rabbit:template id="delayAmqpTemplate" connection-factory="mqConnectionFactory" exchange="shanreal_exchange_delay" queue="shanreal_delay_queue"/> <!-- 配置執行緒池 --> <bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > <!-- 執行緒池維護執行緒的最少數量 --> <property name ="corePoolSize" value ="5" /> <!-- 執行緒池維護執行緒所允許的空閒時間 --> <property name ="keepAliveSeconds" value ="30000" /> <!-- 執行緒池維護執行緒的最大數量 --> <property name ="maxPoolSize" value ="1000" /> <!-- 執行緒池所使用的緩衝佇列 --> <property name ="queueCapacity" value ="200" /> </bean> <bean id="delayTask" class="com.shanreal.controller.gb.rubbitMq.DelayTask" /> <!-- Queue Listener 當有訊息到達時會通知監聽在對應的佇列上的監聽物件--> <rabbit:listener-container connection-factory="mqConnectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="shanreal_delay_queue" ref="delayTask"/> </rabbit:listener-container> </beans>
生產者:
這裡可以單獨對每個訊息設定TTL(訊息生存時間)
import com.rabbitmq.client.ConnectionFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service public class ProducerService { /* @Resource(name="mqConnectionFactory") private ConnectionFactory mqConnectionFactory;*/ @Resource(name="orderAmqpTemplate") private AmqpTemplate orderAmqpTemplate; public void send(String msg) { orderAmqpTemplate.convertAndSend((Object) msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(10000); return message; } }); System.out.println("Sent: " + msg); } }
消費者:
這裡可以手動消費
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
public class ConsumerService {
@Resource(name="delayAmqpTemplate")
private AmqpTemplate delayAmqpTemplate;
public void recive() {
System.out.println("Received: " + delayAmqpTemplate.receiveAndConvert());
}
}
執行生產者操作
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
@Controller
@RequestMapping(value="/mqTest")
public class TestController {
@Resource(name = "producerService")
private ProducerService producerService;
@Resource(name = "consumerService")
private ConsumerService consumerService;
@ResponseBody
@RequestMapping(value="/producerTest")
public void producerTest() throws Exception {
String s = "我是生產者測試";
producerService.send(s);
}
@ResponseBody
@RequestMapping(value="/consumerTest")
public void consumerTest() throws Exception {
consumerService.recive();
}
}
通過監聽死信佇列的訊息來進行消費
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class DelayTask implements MessageListener {
@Override
public void onMessage(Message message) {
try {
String receivedMsg = new String(message.getBody(), "UTF-8");
System.out.println("Received : " + receivedMsg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
參考:rubbitMq實戰指南
需要此書籍的pdf文件的可以留言,我私發郵箱