1. 程式人生 > >分散式延遲訊息佇列討論

分散式延遲訊息佇列討論

很多時候我們會有延時處理一個任務的需求,比如說:

  • 2個小時後給使用者傳送簡訊。
  • 15分鐘後關閉網路連線。
  • 2分鐘後再次嘗試回撥。

下面我們來分別探討一下幾種實現方案:

Java中的DelayQueue

Java中的DelayQueue位於java.util.concurrent包下,本質是由PriorityQueue和BlockingQueue實現的阻塞優先順序佇列。

放入佇列的元素需要實現Delayed介面:

public interface Delayed extends Comparable<Delayed> {

    /**     * Returns the remaining delay associated with this object, in the
* given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ longgetDelay(TimeUnit unit); }

通過實現這個介面,來完成對佇列中元素,按照時間延遲先後排序的目的。

從佇列中取元素:

   /**     * Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue. * * @return the head of this queue * @throws InterruptedException {@inheritDoc} */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try
{ for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

可以看到,在這段程式碼裡,在第一個元素的延遲時間還沒到的情況下:

  • 如果當前沒有其他執行緒等待,則阻塞當前執行緒直到延遲時間。
  • 如果有其他執行緒在等待,則阻塞當前執行緒。

向佇列中放入元素:

    /**     * Inserts the specified element into this delay queue.     *     * @param e the element to add     * @return <tt>true</tt>     * @throws NullPointerException if the specified element is null     */
    publicbooleanoffer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

在放入元素的時候,會喚醒等待中的讀執行緒。

如果我們不考慮分散式執行和任務持久化的話,Java中的DelayQueue是一個很理想的方案,精巧好用。

但是如果我們需要分散式執行和任務持久化,就需要引入一些外部元件。

使用Redis實現

前文我們看到,可以通過優先順序佇列來實現延遲佇列的功能。

Redis提供了很多資料結構,其中的zset是一種有序的資料結構;我們可以通過Redis中的zset來實現一個延遲佇列。

基本的方法就是使用時間戳作為元素的score存入zset。

redis> ZADD delayqueue <future_timestamp> "messsage"

獲取所有已經“就緒”的message,並且刪除message。

redis> MULTI
redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> EXEC

但是這個方案也有一些問題:

Redis事務雖然保證了一致性和隔離性,但是並沒有提供回滾功能。訊息處理失敗是不能被恢復的,如果處理某條訊息的執行緒崩潰或機器宕機,這條未被處理不能被自動的再次處理。

也有考慮過將分為TODO和Doing兩條佇列:

先從TODO佇列中取出任務,放入Doing中,再開始處理;如果停留在Doing佇列總過久,則重新放入TODO佇列。

但是由於Redis的事務特性,並不能做到完全可靠;並且檢查Doing超時的邏輯也略複雜。

那麼有沒有一個成熟的訊息佇列可以支援延遲投遞訊息的功能呢?

答案當然是有的,本文的標題就是使用RabbitMQ實現DelayQueue。

使用RabbitMQ實現

這是RabbitMQ眾多隱藏的強大特性中的一個,可以輕鬆的降低程式碼的複雜度,實現DelayQueue的功能。

我們需要兩個佇列,一個用來做主佇列,真正的投遞訊息;另一個用來延遲處理訊息。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
connection = factory.newConnection();
channel = connection.createChannel();

channel.queueDeclare("MAIN_QUEUE", true, false, false, null);
channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");

HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);

放入延遲訊息:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build();
channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));

而關鍵點,就在於 x-dead-letter-exchange 和 x-dead-letter-routing-key 兩個引數上。這兩個引數說明了:訊息過期後的處理方式 --> 投遞到我們指定的MAIN_QUEUE;然後我們只需要在MAIN_QUEUE中等待訊息投遞即可。

RabbitMQ本身提供了訊息持久化和沒有收到ACK的重投遞功能,這樣我們就可以實現一個高可靠的分散式延遲訊息隊列了。

PS

上面講述的RabbitMQ定時任務方案有問題,RabbitMQ TTL文件 中寫道:

Caveats

While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).

per-queue TTL不會有問題,因為快要過期的訊息總是在佇列的前邊;但是如果使用per-message TTL的話,過期的訊息有可能會在未過期的訊息後邊,直到前邊的訊息過期或者被消費。因為RabbitMQ保證過期的訊息一定不會被消費者消費,但是不能保證訊息過期就會從佇列中移除。

ActiveMQ

ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.

可以支援定時、延遲投遞、重複投遞和Cron排程。

在配置檔案中,啟用<broker ... schedulerSupport="true"> 選項後即可使用。

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);

PS

由於ActiveMQ採用的是類似於Java中DelayQueue的方式,通過先將訊息排序再定時觸發的方式來實現延遲訊息。在往佇列中投遞大量(10w+)定時訊息之後,ActiveMQ的效能將會變得接近不可用,大量的訊息擠壓得不到投遞。

其他一些可能的實現方式

  • RocketMQ 支援定時訊息,但是不支援任意時間精度,支援特定的level,例如定時5s,10s,1m等。

  • 通過MySQL等資料庫記錄訊息應該被投遞的時間,然後迴圈進行查詢,並把當前時間應該投遞的訊息放入普通的訊息佇列。

參考資料

https://www.cnblogs.com/yx1989/p/7000503.html