1. 程式人生 > >併發佇列-無界阻塞延遲佇列DelayQueue原理探究

併發佇列-無界阻塞延遲佇列DelayQueue原理探究

一、前言

DelayQueue佇列中每個元素都有個過期時間,並且佇列是個優先順序佇列,當從佇列獲取元素時候,只有過期元素才會出佇列。

二、 DelayQueue類圖結構

image.png

如圖DelayQueue中內部使用的是PriorityQueue存放資料,使用ReentrantLock實現執行緒同步,可知是阻塞佇列。另外佇列裡面的元素要實現Delayed介面,一個是獲取當前剩餘時間的介面,一個是元素比較的介面,因為這個是有優先順序的佇列。

三、offer操作

插入元素到佇列,主要插入元素要實現Delayed介面。


public boolean offer(E e) {
    final ReentrantLock lock = this
.lock; lock.lock(); try { q.offer(e); if (q.peek() == e) {(2) leader = null; available.signal(); } return true; } finally { lock.unlock(); } }

首先獲取獨佔鎖,然後新增元素到優先順序佇列,由於q是優先順序佇列,所以新增元素後,peek並不一定是當前新增的元素,如果(2)為true,說明當前元素e的優先順序最小也就即將過期的,這時候啟用avaliable變數條件佇列裡面的執行緒,通知他們佇列裡面有元素了。

四、take操作

獲取並移除佇列首元素,如果佇列沒有過期元素則等待。


    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //獲取但不移除隊首元素(1)
                E first = q.peek();
                if (first == null
) available.await();//(2) else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0)//(3) return q.poll(); else if (leader != null)//(4) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread;//(5) try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null)//(6) available.signal(); lock.unlock(); } }

第一次呼叫take時候由於佇列空,所以呼叫(2)把當前執行緒放入available的條件佇列等待,當執行offer並且新增的元素就是隊首元素時候就會通知最先等待的執行緒啟用,迴圈重新獲取隊首元素,這時候first假如不空,則呼叫getdelay方法看該元素海剩下多少時間就過期了,如果delay<=0則說明已經過期,則直接出隊返回。否者看leader是否為null,不為null則說明是其他執行緒也在執行take則把該執行緒放入條件佇列,否者是當前執行緒執行的take方法,則呼叫(5)await直到剩餘過期時間到(這期間該執行緒會釋放鎖,所以其他執行緒可以offer新增元素,也可以take阻塞自己),剩餘過期時間到後,該執行緒會重新競爭得到鎖,重新進入迴圈。

(6)說明當前take返回了元素,如果當前佇列還有元素則呼叫singal啟用條件佇列裡面可能有的等待執行緒。leader那麼為null,那麼是第一次呼叫take獲取過期元素的執行緒,第一次呼叫的執行緒呼叫設定等待時間的await方法等待資料過期,後面呼叫take的執行緒則呼叫await直到signal。

五、poll操作

獲取並移除隊頭過期元素,否者返回null

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
           //如果佇列為空,或者不為空但是隊頭元素沒有過期則返回null
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

六、一個例子

class DelayedEle implements Delayed {

    private final long delayTime; //延遲時間
    private final long expire;  //到期時間
    private String data;   //資料

    public DelayedEle(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay; 
    }

    /**
     * 剩餘時間=到期時間-當前時間
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 優先佇列裡面優先順序規則
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delayTime);
        sb.append(", expire=").append(expire);
        sb.append(", data='").append(data).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

public static void main(String[] args) {


        DelayQueue<DelayedEle> delayQueue = new DelayQueue<DelayedEle>();

    DelayedEle element1 = new DelayedEle(1000,"zlx");
    DelayedEle element2 = new DelayedEle(1000,"gh");

    delayQueue.offer(element1);
    delayQueue.offer(element2);

    element1 =  delayQueue.take();
    System.out.println(element1);

}

七、使用場景

  • TimerQueue的內部實現
  • ScheduledThreadPoolExecutor中DelayedWorkQueue是對其的優化使用

歡迎看官們拍磚,讓我們共同進步!


加多

加多

高階 Java 攻城獅 at 阿里巴巴加多,目前就職於阿里巴巴,熱衷併發程式設計、ClassLoader,Spring等開源框架,分散式RPC框架dubbo,springcloud等;愛好音樂,運動。微信公眾號:技術原始積累。知識星球賬號:技術原始積累