1. 程式人生 > 實用技巧 >RabbitMQ 實現延遲佇列

RabbitMQ 實現延遲佇列

原文地址:RabbitMQ 實現延遲佇列
部落格地址:http://www.extlight.com

一、背景

最近開發一個活動功能,需要在活動結束後給榜單 Top10 使用者發放獎勵。由於活動的榜單是通過 RabbitMQ 進行非同步統計分值排名的,因此在活動結束時佇列中可能還存在訊息未消費完全,排名不準確,此時發放活動獎勵必然會出錯。

那麼,如果解決這個問題呢? 與產品經理協商,允許延遲 10 分鐘發放獎勵。目前有 2 個方案:

使用定時器:判斷當前時間與活動結束時間的時間差,如果 >= 10分鐘就發放獎勵
    -- 缺點:除了需要排程執行緒,還需要定期訪問資料庫獲取活動結束時間來判斷,這樣既浪費資源也不優雅

使用 RabbitMQ 延遲佇列
    -- 優點:既能滿足需求也規避定時器實現方案的缺點

因此,最終選定 RabbitMQ 的延遲佇列實現方案。但是,RabbitMQ 沒有直接提供延遲佇列我們該如何實現呢?請繼續閱讀下文。

二、TTL

TTL 全稱 Time To Live(存活時間/過期時間),當訊息到達存活時間後,還沒有被消費,會被自動清除。

RabbitMQ 提供 2 種方式給訊息設定過期時間:

設定佇列過期時間引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期

呼叫訊息過期時間方法,設定過期時間:expiration。單位:ms(毫秒),當該訊息在佇列頭部時(消費時),會單獨判斷這一訊息是否過期

如果兩者都進行了設定,以時間短的為準

TTL實戰

@Bean
public Queue ttlQueue() {
	Map<String,Object> map = new HashMap<>(1);
	// 設定佇列過期引數
    map.put("x-message-ttl", 10000);
	return new Queue("ttl.queue", true, false, false, map);
}

@Bean
public DirectExchange ttlDirectExchange() {
	return new DirectExchange("ttl.direct.exchange", true, false);
}

@Bean
public Binding ttlDirectBinding() {
  return BindingBuilder
  		.bind(ttlQueue())
  		.to(ttlDirectExchange())
  		.with("ttl");
}

測試類:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqTestApplicationTests {

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@Test
	public void test() throws Exception {
		for (int i = 0; i < 10; i++) {
			this.rabbitTemplate.convertAndSend("ttl.direct.exchange", "ttl", "hello ttl", new MessagePostProcessor() {
				
				@Override
				public Message postProcessMessage(Message message) throws AmqpException {
				    // 呼叫訊息過期方法
					message.getMessageProperties().setExpiration("5000");
					return message;
				}
			});
		}
	}
}

測試結果如下圖:

ttl.queue 佇列建立後,可以看出 ttl 屬性為 10000(10秒),裡邊的的 10 條訊息在 5 秒後被清除了。

三、死信佇列

DLX 全稱 Dead Letter Exchange(死信交換機),當訊息成為死信 (Dead Message) 後,可以被重新發送到另一個交換機,這個交換機就是死信交換機,由於交換機是 RabbitMQ 特有的,通常我們把死信交換機也成為死信佇列。

原理圖如下:

要實現上邊的流程,我們需要解決 2 個問題:

佇列中的訊息怎麼成為死信:
    --原佇列訊息長度到達限制
    --原佇列存在訊息過期設定,訊息到達超時時間未被消費
    --消費者拒接消費訊息,basicNack/basicReject,並且不把訊息重新放入原目標佇列,requeue=false

訊息成為死信後如何投遞到死信佇列中:
    --給原佇列設定引數: x-dead-letter-exchange 和 x-dead-letter-routing-key

死信佇列實戰

@Bean
public Queue ttlQueue() {
	Map<String,Object> map = new HashMap<>(1);
    map.put("x-message-ttl", 10000);
    // 設定訊息成為死信後,傳發到的路由器
    map.put("x-dead-letter-exchange","dead.direct.exchange");
    map.put("x-dead-letter-routing-key", "dead");
	return new Queue("ttl.queue", true, false, false, map);
}

@Bean
public DirectExchange ttlDirectExchange() {
	return new DirectExchange("ttl.direct.exchange", true, false);
}

@Bean
public Binding ttlDirectBinding() {
  return BindingBuilder
  		.bind(ttlQueue())
  		.to(ttlDirectExchange())
  		.with("ttl");
}
	
// =======================以下為死信佇列相關配置=========================	
	
@Bean
public Queue deadQueue() {
	return new Queue("dead.queue", true);
}

@Bean
public DirectExchange deadDirectExchange() {
	return new DirectExchange("dead.direct.exchange", true, false);
}

@Bean
public Binding deadDirectBinding() {
  return BindingBuilder
  		.bind(deadQueue())
  		.to(deadDirectExchange())
  		.with("dead");
}

測試類程式碼不變,為了方便測試,我們這裡就不寫消費者程式碼。我們需要先把 ttl.queue 佇列刪除再執行程式碼。結果如下圖:

由圖可知,ttl.queue 佇列裡的訊息在 5 秒後轉移 dead.queue 佇列中了,其實這樣就已經實現了延遲佇列。

我們把需要實現的功能套用到上邊的案例中:活動結束後我們傳送一條有過期時間的訊息(10分鐘)到 ttl.queue 佇列中,該佇列不需要消費者。10分鐘後由於訊息沒被消費被轉發到死信佇列 dead.queue 佇列中,dead.queue 設定消費者,消費者用於執行發放活動獎勵。

四、參考資料

官方文件 ttl

官方文件 dlx