1. 程式人生 > 其它 >訊息佇列 - 延時訊息應用解析及實踐

訊息佇列 - 延時訊息應用解析及實踐

技術標籤:訊息佇列佇列

前言

訊息佇列服務相信大家一定都不陌生了,在很多應用系統中,都有一些場景會使用到訊息佇列服務,簡單來說,我們可以把訊息佇列比作是一個存放訊息的容器,上游傳送端將訊息傳送到訊息佇列,下游消費端從訊息佇列裡消費訊息。訊息佇列是分散式系統中重要的元件,核心作用可以幫助我們實現非同步、解耦以及削峰,從而提高系統性能和穩定性。

在大部分場景下業務系統如果只需要實現非同步解耦、削峰填谷等能力,常規的普通訊息就可以滿足此類需求。除此之外,在某些特殊的業務場景中,普通訊息型別存在無法滿足需求的情況。這就需要訊息佇列服務本身支援一些特殊的訊息型別,或者開發者通過開發一些定製化的程式碼實現目的。這裡我們列舉在使用訊息佇列過程中幾種特殊場景的例子:

順序消費場景

生產者按照一定的先後順序釋出訊息,消費者按照既定的先後順序消費訊息,即先發布的訊息一定會先被客戶端消費。

分散式事務場景

分散式架構下,隨著系統的演進,資料庫也進行了垂直拆分,如果選擇使用訊息佇列進行上下游解耦的話,生產者和消費者需要保證資料一致性。

延時消費場景

生產者將訊息傳送到訊息佇列後,並不期望立馬投遞這條訊息,而是推遲到某個時間點之後將訊息投遞給消費者進行消費。

對於順序訊息和事務訊息,這裡就不進行詳細介紹了,大家有興趣可以自行研究,本文後續內容會和大家一起詳細討論下延時訊息更多的細節及應用場景。

延時訊息介紹

延時(定時)訊息的特點就是傳送者成功傳送一條訊息後,這條訊息並不會馬上被消費者消費,而是在某個特定的時間或者延遲一段時間後,訊息才被消費者可見並進行後續的消費,延時訊息整個生命週期可以用如下示意圖來表示:

  1. 訊息釋出者將一條延時訊息傳送到訊息佇列服務端;
  2. 在預計投遞時間未到之前,訊息對消費者不可見,消費者此時無法立刻消費;
  3. 投遞時間到達後,訊息才對消費者可見,消費者此時可以消費;
  4. 消費者獲取此條訊息並進行消費;
  5. 消費者成功消費後,進行確認,此條訊息將不再被消費。

延時訊息應用場景

交易場景

在生產者和消費者有時間視窗的要求下,我們可以考慮使用延時訊息。如在電商交易場景下,交易中超時未支付的訂單需要被關閉的場景,在訂單建立時會發送一條延時訊息。這條訊息將會在30分鐘以後投遞給消費者,消費者收到此訊息後,需要判斷對應的訂單是否已完成支付;如支付未完成,則關閉訂單。

遊戲場景

再比如在遊戲社群裡,遊戲運營方經常會發起一些活動,玩家在活動期間內按照規則完成一系列任務,活動時間截止後,遊戲後臺根據玩家完成任務的情況進行判定,傳送系統通知或者進行rank排名並派發獎勵等。

此種場景也可以採用延時訊息來實現,上游系統釋出活動公告後,同時傳送一條延時訊息,延時時間設定為活動週期的時間。當活動截止後,下游系統可以隨即消費訊息並進行相應的邏輯處理。

其他場景

同時延時訊息也可以廣泛應用於資訊提醒等比較通用的場景。

如何實現延時訊息

介紹完延時訊息的一些概念及應用場景後,我們接下來分析一下目前比較主流的幾款開源訊息中介軟體對延時訊息的支援情況以及實現方式。

Kafka

原生Kafka預設是不支援延時訊息的,需要開發者自己實現一層代理服務,比如傳送端將訊息傳送到延時Topic,代理服務消費延時Topic的訊息然後轉存起來,代理服務通過一定的演算法,計算延時訊息所附帶的延時時間是否到達,然後將延時訊息取出來併發送到實際的Topic裡面,消費端從實際的Topic裡面進行消費。

RabbitMQ

RabbitMQ實現延時訊息有兩種方案,第一種是採用rabbitmq-delayed-message-exchange 外掛實現,第二種則是利用DLX(Dead Letter Exchanges)+ TTL(訊息存活時間)來間接實現。大致的實現思路如下:

  1. 建立一個普通佇列delay_queue,為此佇列設定死信交換機 (通過x-dead-letter-exchange引數) 和 RoutingKey (通過x-dead-letter-routing-key引數),生產者將向delay_queue傳送延時訊息。
  2. 建立步驟1中設定的死信交換機,同時建立一個目標佇列 target_queue,並使用步驟1中設定的RoutingKey將兩者繫結起來。消費者將從target_queue裡面消費延時訊息。
  3. 設定訊息的存活時間TTL,可以在步驟1中設定到佇列級別delay_queue的訊息存活時間,或者在傳送訊息時動態設定訊息級別的存活時間。

RocketMQ

開源RocketMQ支援延遲訊息,但是不支援秒級精度。預設支援18個level的延遲訊息,這是通過broker端的messageDelayLevel配置項確定的
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

訊息佇列服務在啟動時,會建立一個內部topic:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,建立對應數量的佇列。生產者傳送訊息時可以設定延時等級,示例程式碼:

Message msg=new Message();
msg.setTopic("TopicA");
msg.setBody("this is a delay message".getBytes());
//設定延遲level為5,對應延遲1分鐘
msg.setDelayTimeLevel(5);
producer.send(msg);

傳送的訊息會暫存在Broker對應的內部topic中,再通過定時任務從內部topic中拉取資料,如果延遲時間到了,就會把訊息轉發到目標topic下,消費者從目標topic消費訊息。

阿里雲訊息佇列RocketMQ版

通過上一章節的討論,我們可以看出目前幾款主流的開源訊息佇列服務,在支援延時訊息的場景下或多或少有些不完美的地方。主要體現在以下幾點:

  1. Kafka不支援延時訊息,需要完全開發代理服務來實現,工作量大。
  2. RabbitMQ需要額外的外掛,或者利用DLX+TTL的方式進行中轉,實現不是非常直觀。
  3. RocketMQ支援延時訊息,但是無法支援秒級延時。

那麼有沒有一款訊息佇列服務,能夠完美的支援延時(定時)訊息。本節我們將介紹阿里雲訊息佇列RocketMQ版。

阿里雲訊息佇列RocketMQ版基於Apache RocketMQ構建的低延遲、高併發、高可用、高可靠的分散式訊息中介軟體。訊息佇列RocketMQ版既可為分散式應用系統提供非同步解耦和削峰填谷的能力,同時也具備網際網路應用所需的海量訊息堆積、高吞吐、可靠重試等特性。同時支援豐富的訊息型別包括普通訊息、順序訊息、事務訊息以及我們本文討論的延時訊息。接下來我們看下阿里雲RocketMQ為延時訊息提供的能力及優勢:

  1. 支援秒級的延時(定時)訊息,同時延時時間可以最大設定為40天,基本滿足所有場景。
  2. 延時(定時)訊息的投遞精度可控制在1~2秒之內。
  3. 延時(定時)訊息在某段時間內是對消費者不可見的,從另一個維度看也屬於積壓的訊息,阿里雲訊息佇列RocketMQ版的不同例項規格可以支援億級的訊息積壓。
  4. 提供了多語言支援,包括Java、.NET、CC++、GO、Python、PHP、Node.js等

使用阿里雲訊息佇列RocketMQ版收發延時(定時)訊息,只需要在控制檯建立Topic的時候選擇定時/延時訊息型別,既可以使用TCP或者http協議進行訊息收發。

控制檯建立定時/延時Topic

Java語言示例程式碼(TCP協議)

  • 傳送定時訊息
// 定時訊息,單位毫秒(ms),在指定時間戳(當前時間之後)進行投遞,例如2020-03-07 16:21:00投遞。如果被設定成當前時間戳之前的某個時刻,訊息將立刻投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 傳送訊息,只要不拋異常就是成功。
SendResult sendResult = producer.send(msg);
  • 傳送延時訊息
// 延時訊息,單位毫秒(ms),在指定延遲時間(當前時間之後)進行投遞,例如訊息在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設定訊息需要被投遞的時間。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);

同時訂閱延時訊息的邏輯無需任何改造,完全可以按照訂閱普通訊息的方式,沒有任何的程式碼侵入性。

結束語

到此我們討論了延時訊息的特性、應用場景,對比了各類訊息佇列對延時訊息的支援情況,同時也向大家介紹了阿里雲訊息佇列RocketMQ版。我們在對訊息中介軟體進行選型時,也會考慮到多方面的因素。除了訊息中介軟體本身所能提供的能力外,也包括服務效能、穩定性、可擴充套件能力,以及需要結合開發團隊自身的技術棧等情況。

作者:阿里雲解決方案架構師 鹿玄

原文連結

本文為阿里雲原創內容,未經允許不得轉載。