1. 程式人生 > 實用技巧 >凡客傳說4-DelayQueue使用方式,原始碼分析,以及應用場景

凡客傳說4-DelayQueue使用方式,原始碼分析,以及應用場景

DelayQueue 顧名思義,它是一個延時佇列

使用方式 :

假設我們生產者提交一個任務,消費者5秒鐘之後才可以執行,那麼我們可以把任務定義為如下格式,並實現Delayed介面,其中data是任務儲存的資訊。

/**
 * 具體的任務
 * @author wangshixiang
 */
public class Task implements Delayed {
    /**
     * 資料
     */
    private final String data;
    /**
     * 任務執行時間
     */
    private final long time;

    public Task(String data,TimeUnit timeUnit,long time){
        this.data=data;
        this.time=System.currentTimeMillis()+timeUnit.toMillis(time);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long res= time-System.currentTimeMillis();
        return unit.convert(res,TimeUnit.MILLISECONDS);
    }

    public String getData() {
        return data;
    }

    @Override
    public int compareTo(Delayed o) {
        if (o instanceof Task ){
            Task task= (Task) o;
            return (int) (this.time-task.time);
        }
        return 0;
    }
}

定義好任務後,我們需要定義一個任務佇列 QUEUE_TASK,來儲存訊息,實現效果為程式執行後 五秒鐘後輸出Hello...

  private static final DelayQueue<Task> QUEUE_TASK =new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        QUEUE_TASK .add(new Task("Hello ... ", TimeUnit.SECONDS,5));
        System.out.println(QUEUE_TASK .take().getData());
    }

使用詳解

  1. Delayed 介面定義:
public interface DeDlayed extends Comparable<Delayed> {

 
    long getDelay(TimeUnit unit);
}

我們發現Delayed介面繼承了Comparable介面,並且有一個getDelay方法,在程式執行的過程中,會呼叫頭部任務的這個方法,來返回該任務具體還有多長時間可以執行。當我們任務實現這個介面時 可以儲存任務的執行時間,通過執行時間-當前時間 計算出距離執行時間的差值,因此我們Task定義了一個任務的變數,在建立物件時設定任務的執行時間。
2. DelayQueue 延時佇列
首先我們看一下DelayQueue類繼承實現結構圖

可以理解為 DelayQueue 是一個帶延遲執行功能的阻塞佇列

深入理解

  • 為什麼Delayed介面繼承了Comparable介面 ?
  • DelayQueue是怎麼實現只有到預定時間才能取出任務 ?
  • 向佇列裡放入一個任務時 發生了什麼事情 ?

帶著這幾個問題,我們來看一下DelayQueeu的原始碼 首先看一下主要的引數:

    //鎖
    private final transient ReentrantLock lock = new ReentrantLock();
    //優先順序佇列 執行時間最早的排在第一個
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //是否有執行緒在等待任務到執行時間
    private Thread leader;
    //條件喚醒
    private final Condition available = lock.newCondition();

那麼我們先看add(E e)方法 ,任務入佇列時做了哪些操作

    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

入佇列時做了一下步驟:

  1. 獲取鎖
  2. 放入元素 (放入優先順序佇列)
  3. 如果自己排在第一個 則原來標記的leader執行緒已經失效 直接設定為null,並喚醒消費者
  4. 釋放鎖

接下來在看出佇列時take()方法做了哪些操作

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
            //我拿到元素了 喚醒其他的執行緒
                available.signal();
            lock.unlock();
        }
    }

出佇列做了如下步驟:

  1. 獲取鎖(可中斷的鎖 獲取這種鎖允許其他執行緒中斷此執行緒)
  2. 取出第一個元素 如果第一個元素為空 則直接 await(),等待被喚醒(如放佇列時的喚醒)
  3. 如果第一個元素不為空,檢視是否到執行時間,如果沒有到執行時間 檢視是否有leader已經注意到這個任務 如果他注意到這個任務 我直接await()。如果沒人注意,那麼我就把自己設定為leader然後設定帶時間的await()。
  4. 睡眠到執行時間後 醒來後檢視leader是否還是自己 如果是的話 取消自己的leader身份。然後在嘗試獲取任務。
  5. 如果我獲取到了符合要求的元素,那麼我應該喚醒大家 來一塊競爭獲取下一個元素。

帶時間的出佇列方法 E poll(long timeout, TimeUnit unit) 的實現邏輯與take()方法的唯一區別就是。只有當自己剩餘等待時間大於第一個元素剩餘執行時間時 才允許把自己設定為leader

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    if (nanos <= 0L)
                        return null;
                    else
                        //睡眠等待時間 有可能提前返回 那麼返回的是剩餘等待時間
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    if (nanos <= 0L)
                        return null;
                    first = null; // don't retain ref while waiting
                    if (nanos < delay || leader != null)
                     //如果剩餘等待時間比第一個元素剩餘執行時間還短 那麼應該睡剩餘等待時間
                        nanos = available.awaitNanos(nanos);
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            long timeLeft = available.awaitNanos(delay);
                            //計算剩餘等待時間 
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal(http://www.jintianxuesha.com/);
            lock.unlock();
        }
    }

應用場景:

在大多數業務場景中,我們會利用中介軟體提供的延時訊息的功能。比如利用redis zset實現 ,kafka rabbit mq 的延時佇列。我們需要根據我們的業務場景,來選擇合適的中介軟體。

  1. 訂單超時未支付取消.
  2. 呼叫其他系統時失敗間隔重試.
  3. 呼叫第三方介面時,過段時間非同步獲取結果。