1. 程式人生 > 實用技巧 >DelayQueue延時佇列

DelayQueue延時佇列

一、DelayQueue是什麼

  DelayQueue是一個無界的BlockingQueue,用於放置實現了Delayed介面的物件,其中的物件只能在其到期時才能從佇列中取走。這種佇列是有序的,即隊頭物件的延遲到期時間最長。注意:不能將null元素放置到這種佇列中。

二、DelayQueue能做什麼

 1. 淘寶訂單業務:下單之後如果三十分鐘之內沒有付款就自動取消訂單。
 2. 餓了嗎訂餐通知:下單成功後60s之後給使用者傳送簡訊通知。

 3.關閉空閒連線。伺服器中,有很多客戶端的連線,空閒一段時間之後需要關閉之。

 4.快取。快取中的物件,超過了空閒時間,需要從快取中移出。

 5.任務超時處理。在網路協議滑動視窗請求應答式互動時,處理超時未響應的請求等。

三、例項展示

 定義元素類,作為佇列的元素

 DelayQueue只能新增(offer/put/add)實現了Delayed介面的物件,意思是說我們不能想往DelayQueue裡新增什麼就新增什麼,不能新增int、也不能新增String進去,必須新增我們自己的實現了Delayed介面的類的物件,來程式碼:

/**
 *  compareTo 方法必須提供與 getDelay 方法一致的排序
 */
class MyDelayedTask implements Delayed{

    private String name ;
    private long start = System.currentTimeMillis();
    private long time ;

    public MyDelayedTask(String name,long time) {
        this.name = name;
        this.time = time;
    }

    /**
     * 需要實現的介面,獲得延遲時間   用過期時間-當前時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 用於延遲佇列內部比較排序   當前時間的延遲時間 - 比較物件的延遲時間
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        MyDelayedTask o1 = (MyDelayedTask) o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "MyDelayedTask{" +
                "name='" + name + '\'' +
                ", time=" + time +
                '}';
    }
}

  其中,compareTo 方法 getDelay 方法 就是Delayed介面的方法,我們必須實現,而且按照JAVASE文件,compareTo 方法必須提供與 getDelay 方法一致的排序,也就是說compareTo方法裡可以按照getDelay方法的返回值大小排序,即在compareTo方法裡比較getDelay方法返回值大小

寫main方法測試

  定義一個DelayQueue,新增幾個元素,while迴圈獲取元素

private static DelayQueue delayQueue  = new DelayQueue();
    public static void main(String[] args) throws InterruptedException {

        new Thread(new Runnable() {
            @Override
            public void run() {

                delayQueue.offer(new MyDelayedTask("task1",10000));
                delayQueue.offer(new MyDelayedTask("task2",3900));
                delayQueue.offer(new MyDelayedTask("task3",1900));
                delayQueue.offer(new MyDelayedTask("task4",5900));
                delayQueue.offer(new MyDelayedTask("task5",6900));
                delayQueue.offer(new MyDelayedTask("task6",7900));
                delayQueue.offer(new MyDelayedTask("task7",4900));

            }
        }).start();

        while (true) {
            Delayed take = delayQueue.take();
            System.out.println(take);
        }
    }

執行結果

1 2 3 4 5 6 7 MyDelayedTask{name='task3', time=1900} MyDelayedTask{name='task2', time=3900} MyDelayedTask{name='task7', time=4900} MyDelayedTask{name='task4', time=5900} MyDelayedTask{name='task5', time=6900} MyDelayedTask{name='task6', time=7900} MyDelayedTask{name='task1', time=10000}

 DelayQueue屬於排序佇列,它的特殊之處在於佇列的元素必須實現Delayed介面,該介面需要實現compareTo和getDelay方法。

static class Task implements Delayed{
        @Override
                //比較延時,佇列裡元素的排序依據
        public int compareTo(Delayed o) {
            return 0;
        }
        
        @Override
                //獲取剩餘時間
        public long getDelay(TimeUnit unit) {
            return 0;
        }
    }

  元素進入佇列後,先進行排序,然後,只有getDelay也就是剩餘時間為0的時候,該元素才有資格被消費者從佇列中取出來,所以建構函式一般都有一個時間傳入。

具體另一個例項:

import java.sql.Time;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Delayquue {

    public static void main(String[] args) throws Exception {
        BlockingQueue<Task> delayqueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        delayqueue.put(new Task(now+3000));
        delayqueue.put(new Task(now+4000));
        delayqueue.put(new Task(now+6000));
        delayqueue.put(new Task(now+1000));
        System.out.println(delayqueue);
        
        for(int i=0; i<4; i++) {
            System.out.println(delayqueue.take());
        }
        
    }
    
    static class Task implements Delayed{
        long time = System.currentTimeMillis();
        public Task(long time) {
            this.time = time;
        }
        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1;
            else 
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }
        @Override
        public String toString() {
            return "" + time;
        }
    }
}

輸出結果:

  可以看出來,每隔一段時間就會輸出一個元素,這個間隔時間就是由建構函式定義的秒數來決定的。


原理分析:

 內部結構

  • 可重入鎖
  • 用於根據delay時間排序的優先順序佇列
  • 用於優化阻塞通知的執行緒元素leader
  • 用於實現阻塞和通知的Condition物件

delayed和PriorityQueue

 在理解delayQueue原理之前我們需要先了解兩個東西,delayed和PriorityQueue.

  • delayed是一個具有過期時間的元素
  • PriorityQueue是一個根據佇列裡元素某些屬性排列先後的順序佇列

  delayQueue其實就是在每次往優先順序佇列中新增元素,然後以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從佇列裡取出來都是最先要過期的元素

offer方法

  1. 執行加鎖操作
  2. 吧元素新增到優先順序佇列中
  3. 檢視元素是否為隊首
  4. 如果是隊首的話,設定leader為空,喚醒所有等待的佇列
  5. 釋放鎖
程式碼如下:
public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

take方法

  1. 執行加鎖操作
  2. 取出優先順序佇列元素q的隊首
  3. 如果元素q的隊首/佇列為空,阻塞請求
  4. 如果元素q的隊首(first)不為空,獲得這個元素的delay時間值
  5. 如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,呼叫poll方法彈出該元素,跳出方法
  6. 如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免記憶體洩露
  7. 判斷leader元素是否為空,不為空的話阻塞當前執行緒
  8. 如果leader元素為空的話,把當前執行緒賦值給leader元素,然後阻塞delay的時間,即等待隊首到達可以出隊的時間,在finally塊中釋放leader元素的引用
  9. 迴圈執行從1~8的步驟
  10. 如果leader為空並且優先順序佇列不為空的情況下(判斷還有沒有其他後續節點),呼叫signal通知其他的執行緒
  11. 執行解鎖操作
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }  

get點

 整個程式碼的過程中並沒有使用上太難理解的地方,但是有幾個比較難以理解他為什麼這麼做的地方

leader元素的使用

 大家可能看到在我們的DelayQueue中有一個Thread型別的元素leader,那麼他是做什麼的呢,有什麼用呢?

 讓我們先看一下元素註解上的doc描述:

Thread designated to wait for the element at the head of the queue.
This variant of theLeader-Follower patternserves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.

 上面主要的意思就是說用leader來減少不必要的等待時間,那麼這裡我們的DelayQueue是怎麼利用leader來做到這一點的呢:

 這裡我們想象著我們有個多個消費者執行緒用take方法去取,內部先加鎖,然後每個執行緒都去peek第一個節點.
 如果leader不為空說明已經有執行緒在取了,設定當前執行緒等待

if (leader != null)
   available.await();

 如果為空說明沒有其他執行緒去取這個節點,設定leader並等待delay延時到期,直到poll後結束迴圈

     else {
         Thread thisThread = Thread.currentThread();
         leader = thisThread;
         try {
              available.awaitNanos(delay);
         } finally {
              if (leader == thisThread)
                  leader = null;
         }
     }

take方法中為什麼釋放first元素

first = null; // don't retain ref while waiting

 我們可以看到doug lea後面寫的註釋,那麼這段程式碼有什麼用呢?

 想想假設現在延遲佇列裡面有三個物件。

  • 執行緒A進來獲取first,然後進入 else 的else ,設定了leader為當前執行緒A
  • 執行緒B進來獲取first,進入else的阻塞操作,然後無限期等待
  • 這時在JDK 1.7下面他是持有first引用的
  • 如果執行緒A阻塞完畢,獲取物件成功,出隊,這個物件理應被GC回收,但是他還被執行緒B持有著,GC鏈可達,所以不能回收這個first.
  • 假設還有執行緒C 、D、E.. 持有物件1引用,那麼無限期的不能回收該物件1引用了,那麼就會造成記憶體洩露.

連結:   https://www.jianshu.com/p/e0bcc9eae0ae   https://www.jianshu.com/p/bf9f6b08ba5b   https://blog.csdn.net/toocruel/article/details/82769595