RabbitMQ 實現延遲佇列
原文地址:RabbitMQ 實現延遲佇列
部落格地址:http://www.extlight.com
一、背景
最近開發一個活動功能,需要在活動結束後給榜單 Top10 使用者發放獎勵。由於活動的榜單是通過 RabbitMQ 進行非同步統計分值排名的,因此在活動結束時佇列中可能還存在訊息未消費完全,排名不準確,此時發放活動獎勵必然會出錯。
那麼,如果解決這個問題呢? 與產品經理協商,允許延遲 10 分鐘發放獎勵。目前有 2 個方案:
使用定時器:判斷當前時間與活動結束時間的時間差,如果 >= 10分鐘就發放獎勵 -- 缺點:除了需要排程執行緒,還需要定期訪問資料庫獲取活動結束時間來判斷,這樣既浪費資源也不優雅 使用 RabbitMQ 延遲佇列 -- 優點:既能滿足需求也規避定時器實現方案的缺點
因此,最終選定 RabbitMQ 的延遲佇列實現方案。但是,RabbitMQ 沒有直接提供延遲佇列我們該如何實現呢?請繼續閱讀下文。
二、TTL
TTL 全稱 Time To Live(存活時間/過期時間),當訊息到達存活時間後,還沒有被消費,會被自動清除。
RabbitMQ 提供 2 種方式給訊息設定過期時間:
設定佇列過期時間引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期
呼叫訊息過期時間方法,設定過期時間:expiration。單位:ms(毫秒),當該訊息在佇列頭部時(消費時),會單獨判斷這一訊息是否過期
如果兩者都進行了設定,以時間短的為準
TTL實戰
@Bean public Queue ttlQueue() { Map<String,Object> map = new HashMap<>(1); // 設定佇列過期引數 map.put("x-message-ttl", 10000); return new Queue("ttl.queue", true, false, false, map); } @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl.direct.exchange", true, false); } @Bean public Binding ttlDirectBinding() { return BindingBuilder .bind(ttlQueue()) .to(ttlDirectExchange()) .with("ttl"); }
測試類:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTestApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test() throws Exception {
for (int i = 0; i < 10; i++) {
this.rabbitTemplate.convertAndSend("ttl.direct.exchange", "ttl", "hello ttl", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 呼叫訊息過期方法
message.getMessageProperties().setExpiration("5000");
return message;
}
});
}
}
}
測試結果如下圖:
ttl.queue 佇列建立後,可以看出 ttl 屬性為 10000(10秒),裡邊的的 10 條訊息在 5 秒後被清除了。
三、死信佇列
DLX 全稱 Dead Letter Exchange(死信交換機),當訊息成為死信 (Dead Message) 後,可以被重新發送到另一個交換機,這個交換機就是死信交換機,由於交換機是 RabbitMQ 特有的,通常我們把死信交換機也成為死信佇列。
原理圖如下:
要實現上邊的流程,我們需要解決 2 個問題:
佇列中的訊息怎麼成為死信:
--原佇列訊息長度到達限制
--原佇列存在訊息過期設定,訊息到達超時時間未被消費
--消費者拒接消費訊息,basicNack/basicReject,並且不把訊息重新放入原目標佇列,requeue=false
訊息成為死信後如何投遞到死信佇列中:
--給原佇列設定引數: x-dead-letter-exchange 和 x-dead-letter-routing-key
死信佇列實戰
@Bean
public Queue ttlQueue() {
Map<String,Object> map = new HashMap<>(1);
map.put("x-message-ttl", 10000);
// 設定訊息成為死信後,傳發到的路由器
map.put("x-dead-letter-exchange","dead.direct.exchange");
map.put("x-dead-letter-routing-key", "dead");
return new Queue("ttl.queue", true, false, false, map);
}
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttl.direct.exchange", true, false);
}
@Bean
public Binding ttlDirectBinding() {
return BindingBuilder
.bind(ttlQueue())
.to(ttlDirectExchange())
.with("ttl");
}
// =======================以下為死信佇列相關配置=========================
@Bean
public Queue deadQueue() {
return new Queue("dead.queue", true);
}
@Bean
public DirectExchange deadDirectExchange() {
return new DirectExchange("dead.direct.exchange", true, false);
}
@Bean
public Binding deadDirectBinding() {
return BindingBuilder
.bind(deadQueue())
.to(deadDirectExchange())
.with("dead");
}
測試類程式碼不變,為了方便測試,我們這裡就不寫消費者程式碼。我們需要先把 ttl.queue 佇列刪除再執行程式碼。結果如下圖:
由圖可知,ttl.queue 佇列裡的訊息在 5 秒後轉移 dead.queue 佇列中了,其實這樣就已經實現了延遲佇列。
我們把需要實現的功能套用到上邊的案例中:活動結束後我們傳送一條有過期時間的訊息(10分鐘)到 ttl.queue 佇列中,該佇列不需要消費者。10分鐘後由於訊息沒被消費被轉發到死信佇列 dead.queue 佇列中,dead.queue 設定消費者,消費者用於執行發放活動獎勵。