1. 程式人生 > 實用技巧 >延時佇列 DelayQueue

延時佇列 DelayQueue

當用戶超時未支付時,給使用者發提醒訊息。另一種場景是,超時未付款,訂單自動取消。通常,訂單建立的時候可以向延遲佇列種插入一條訊息,到時間自動執行。其實,也可以用臨時表,把這些未支付的訂單放到一個臨時表中,然後定時任務去掃描。這裡我們用延時佇列來做。RocketMQ有延時佇列,RibbitMQ也可以實現,Java自帶的也有延時佇列,接下來就回顧一下各種佇列。

Queue

佇列是一種集合。除了基本的集合操作以外,佇列還提供了額外的插入、提取和檢查操作。佇列的每個方法都以兩種形式存在:一種是當操作失敗時拋異常,另一種是返回一個特定的值(null或者false,取決於具體操作)。後一種形式的插入操作是專門設計用於有界佇列實現的,在大多情況下,插入操作不會失敗。

佇列通常(但不一定)以FIFO(先進先出)的方式對元素進行排序。例外情況包括優先順序佇列(根據提供的比較器對元素進行排序或元素的自然排序)和LIFO佇列(或堆疊),對LIFO進行排序(後進先出)。無論使用哪種順序,佇列的開頭都是該元素,可以通過呼叫remove()或poll()將其刪除。在FIFO佇列中,所有新元素都插入佇列的尾部。其他種類的佇列可能使用不同的放置規則。 每個Queue實現必須指定其排序屬性。無論使用哪種順序,都可以通過呼叫remove()或poll()來刪除佇列開頭的元素。在FIFO佇列中,所有新元素都插入到佇列的尾部。其他型別的佇列可能使用不同的放置規則。每個佇列實現都必須指定其排序屬性。

offer方法在可以的情況下會向佇列種插入一個元素,否則返回false。這不同於Collection.add方法,後者只能通過拋異常來新增元素。offer方法設計用於在正常情況下(而不是在例外情況下)發生故障時,例如在固定容量(或者“有界”)佇列種使用。

remove()和poll()方法刪除並返回隊頭元素。當佇列為空時,remove()丟擲異常,而poll()返回null。

element()和peek()方法返回隊頭元素。

PriorityQueue

PriorityQueue是一個無界優先順序佇列是基於優先順序堆的。優先順序佇列種的元素根據自然順序進行排序,或者通過在佇列構建時提供的Comparator進行排序,當然這取決於使用哪種建構函式。優先順序佇列不允許空(null)元素。一個依賴自然順序的優先順序佇列也不允許插入不可比較的物件。

優先順序佇列的隊頭元素是最小的元素,如果有多個元素並列最小,那麼隊頭是它們其中之一。

優先順序佇列是無界的,但是有一個內部容量來控制用於在佇列上儲存元素的陣列的大小。它總是至少與佇列大小一樣大。將元素新增到優先順序佇列時,其容量會自動增長。

BlockingQueue

這種佇列還支援以下操作:在檢索元素時等待佇列變為非空,並在儲存元素時等待佇列中的空間變為可用。

BlockingQueue方法有四種形式,它們以不同的方式處理操作,這些操作無法立即滿足,但將來可能會滿足:一種丟擲異常,第二種返回特殊值(null或false,取決於具體操作),第三種阻塞當前執行緒,直到操作成功為止;第四種阻塞當前執行緒,超時則放棄。 下表總結了這些方法:

阻塞佇列不接受空元素,如果你試圖add , put 或者 offer 一個null,將會拋NullPointerException。

阻塞佇列是執行緒安全的。所有排隊方法都使用內部鎖或者其他形式的併發控制來保證以原子方式實現它們的效果。

阻塞佇列被設計主要用於生產者-消費者佇列。

下面是一個典型的生產者-消費者方案:

package com.example;

import java.text.MessageFormat;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * @author ChengJianSheng
 * @date 2020/12/15
 */
public class Setup {
    public static void main(String[] args) {
        BlockingQueue<Bread> queue = new ArrayBlockingQueue<>(5);

        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Consumer c1 = new Consumer(queue);
        Consumer c2 = new Consumer(queue);

        new Thread(p1, "p1").start();
        new Thread(p2, "p2").start();
        new Thread(c1, "c1").start();
        new Thread(c2, "c2").start();
    }
}

class Bread {

}

/**
 * 生產者
 */
class Producer implements Runnable {

    private final BlockingQueue<Bread> queue;

    public Producer(BlockingQueue<Bread> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                queue.put(produce());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public Bread produce() {
        try {
            Thread.sleep(Math.round(2000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Bread();
    }
}

/**
 * 消費者
 */
class Consumer implements Runnable {

    private final BlockingQueue<Bread> queue;

    public Consumer(BlockingQueue<Bread> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                consume(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void consume(Bread bread) {
        try {
            Thread.sleep(Math.round(2000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ArrayBlockingQueue

ArrayBlockingQueue是用陣列實現的有界阻塞佇列。這種佇列中的元素按FIFO(先進先出)排序。隊頭是在佇列中停留最長時間的元素。隊尾是在佇列中停留時間最短的元素。新元素插入到佇列的尾部,並且佇列檢索操作在佇列的頭部獲取元素。

這是一個經典的“有界緩衝區”,其中固定大小的陣列包含由生產者插入並由消費者提取的元素。 建立後,容量將無法更改。 試圖將一個元素放入一個已滿的佇列將導致操作阻塞; 試圖從空佇列中取出一個元素也會阻塞。

這個類支援一個可選的公平性策略,用於對等待的生產者和消費者執行緒進行排序。預設情況下,不保證這個順序。然而,將公平性設定為true的佇列將按FIFO順序授予執行緒訪問權。公平性通常會降低吞吐量,但會降低可變性並避免飢餓。

LinkedBlockingQueue

LinkedBlockingQueue是一個基於連結串列實現的可選邊界的阻塞佇列。

PriorityBlockingQueue

PriorityBlockingQueue是一個無界阻塞佇列,它使用與PriorityQueue相同的排序規則,並提供阻塞檢索操作。

DelayQueue

DelayQueue是一種由延遲元素組成的無界阻塞佇列,在該佇列中,僅當元素的延遲到期時才可以使用該元素。隊頭是已經過期的延遲元素,它已過期時間最長。如果沒有過期的延遲,則佇列沒有頭部,此時呼叫poll將返回null。當呼叫元素的getDelay(TimeUnit.NANOSECONDS)方法返回值小於或等於0時,就會發生過期。即使元素沒有過期,也不能用take或者poll將其刪除。

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer提供了一個框架來實現依賴於先進先出(FIFO)等待佇列的阻塞鎖和相關的同步器(訊號燈,事件等)。該類旨在為大多數依賴單個原子int值表示狀態的同步器提供有用的基礎。子類必須定義更改此狀態的受保護方法,並定義該狀態對於獲取或釋放此物件而言意味著什麼。 鑑於這些,此類中的其他方法將執行所有排隊和阻塞機制。 子類可以維護其他狀態欄位,但是僅跟蹤關於同步的使用方法getState(),setState(int)和compareAndSetState(int,int)操作的原子更新的int值。

小結

1、Queue是一個集合,佇列的每個方法都有兩種形式,一種是拋異常,另一種是返回一個特定的值。

2、PriorityQueue是一個無界優先順序佇列,預設情況下,佇列種的元素按自然順序排序,或者根據提供的Comparator進行排序。也就是說,優先順序佇列種的元素都是經過排序的,排序規則可以自己指定,同時佇列種的元素都必須是可排序的。

3、BlockingQueue是一個阻塞佇列,向已滿的佇列種插入元素時會阻塞,向空佇列中取元素時也會阻塞;阻塞佇列被設計主要用於生產者-消費者佇列。

4、ArrayBlockingQueue是用陣列實現的有界阻塞佇列,佇列種的元素按FIFO(先進先出)排序。

5、LinkedBlockingQueue是用連結串列實現的可選邊界的阻塞佇列。

6、PriorityBlockingQueue相當於是阻塞佇列和優先順序佇列的合體,排序規則與優先順序佇列相同。

7、DelayQueue延時佇列中的元素都有一個有效期,只有當過了有效期才可以使用該元素。