1. 程式人生 > >DelayQueue阻塞佇列第二章:原始碼解析

DelayQueue阻塞佇列第二章:原始碼解析

DelayQueue阻塞佇列系列文章

介紹

DelayQueue是java併發包中提供的延遲阻塞佇列,業務場景一般是下單後多長時間過期,定時執行程式等

1-DelayQueue的組成結構

/**
 * DelayQueue佇列繼承了AbstractQueue,並且實現BlockingQueue的方法
 */
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    //使用ReentrantLock進行執行緒的同步
    private final transient ReentrantLock lock = new ReentrantLock();
    //使用優先順序佇列PriorityQueue作為存放資料的佇列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
  
    //使用leader/follower模式來避免多執行緒效能的消耗
    private Thread leader = null;

    //使用Condition等待佇列來儲存請求的執行緒(l/f模式)
    private final Condition available = lock.newCondition();

DelayQueue中的元素需要實現Delayed介面,重寫getDelay()和compareTo()方法,其中getDelay()方法是為了獲取佇列元素延遲剩餘時間,compareTo()方法是為了對佇列中的元素進行一個排序,使符合條件的元素排在佇列的最前面

DelayQueue內部的實現基本就是依靠重寫BlockingQueue方法,使用ReentrantLock進行同步操作,使用PriorityQueue存放佇列元素,Condition存放訪問執行緒

DelayQueue內部採用了leader/follower設計模式,旨在減小多執行緒的消耗,本文不詳細介紹

2原始碼實現細節

offer方法:將元素加入到延遲佇列中去

 public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //元素加入優先順序佇列
            q.offer(e);
            //如果新加入的元素e就是佇列的頭元素,將leader置為null切喚醒等待執行緒
            if (q.peek() == e) {        //Q1此處為何要獲取佇列頭節點元素並與新加入元素進行比較
                leader = null;          //Q2為何要將leader執行緒置為null且喚起等待佇列
                available.signal();     
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

offer方法比較簡單,只針對以上兩處做詳細說明 Q1和Q2兩個操作都是為了解決一個問題,就是leader對應佇列首節點元素的問題,因為元素是不斷在加入的,比如,leader對應需要取出的首節點是A,此時A雖然是首節點元素,但是還沒有到達延遲時間,所以leader還在等待A,他們的關係是對應的(對應關係的邏輯參考take()原始碼),那麼此時加入了元素B,這時候元素B排在了隊首,那麼此時需要處理元素B的就不再是當前的leader了,所以我們需要將leader置空,重新選取新的leader來處理這個B,至於之前的leader執行緒,在take原始碼中,在呼叫available.awaitNanos(delay)後,當時間到了會重新獲取鎖然後執行操作

所以我們要首先判斷加入的新元素是否是首節點,以便確定對應執行緒的處理關係

絕大多數的文章對原始碼中為什麼進行if (q.peek() == e)和leader = null的操作的原因隻字不提,我覺得還是有必要寫下的,我對於此處原因的理解可能也存在偏差,希望各位不吝賜教

take方法:取出元素並處理元素事件

/**
     * 首先獲取優先佇列的首個元素,如果為空則呼叫執行緒沉睡。
     * 如果優先順序佇列不為空,檢視當前首元素是否到達過期時間,到達過期時間了就獲取並移除佇列
     * 如果沒有到達過期時間,將first變數置為null(防止記憶體洩漏),如果leader執行緒不為空則進入等待佇列
     * 如果leader為空,則當前執行緒為leader,並限時進入等待佇列中進行等待
     * 如果leader為空,佇列中還有元素存在,則喚醒所有等待的follower執行緒
     * 繼續迴圈,直到獲取延時佇列中的元素
     */
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek(); //獲取優先佇列中的首個節點
                if (first == null)  //如果優先佇列中沒有節點,則該執行緒進入等待執行緒
                    available.await();
                else {  //如果首節點不為空
                    long delay = first.getDelay(NANOSECONDS);   //獲取當前元素還需要延時多長時間
                    if (delay <= 0) //如果延時時間小於或是等於0,則移出佇列
                        return q.poll();
                    first = null; // don't retain ref while waiting防止記憶體洩漏
                    if (leader != null) //說明leader執行緒正在工作,當前執行緒就進入等待佇列中
                        available.await();//當前執行緒轉變為follower執行緒
                    else {  //如果首節點不為空,延時時間還沒到,沒有相應的處理執行緒
                        Thread thisThread = Thread.currentThread(); //獲取當前執行緒
                        leader = thisThread;    //當前執行緒設定為首執行緒
                        try {
                            available.awaitNanos(delay);    //限時進入等待佇列中處理延時時間最小的元素,並釋放鎖
                        } finally {
                            if (leader == thisThread)
                                leader = null;  //執行事件之後,將leader執行緒置為null讓給其他執行緒
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null) //如果leader執行緒為null,優先順序佇列中還有元素,則喚醒通知佇列中的執行緒
                available.signal();
            lock.unlock();
        }
    }

take方法是DelayQueue的核心方法,獲取延遲佇列中的元素,檢索並移除這個佇列的頭部,等待直到這個佇列的過期元素可用 關於原始碼的疑惑,不將first=null為什麼會導致記憶體洩露? 核心點在於leader呼叫await方法時會釋放鎖,比如,當執行緒A獲取了first,然後將當前執行緒設為leader執行緒,接著進入await方法,釋放鎖,這時執行緒B也獲取了first,因為leader != null,所以進入阻塞佇列,這時執行緒A從等待佇列中返回,獲取物件釋放first,但由於執行緒B中依然有first的引用,所以gc無法對first進行回收,導致記憶體的洩露

在DelayQueue還有很多值得研究的原始碼和問題,我在日後也會慢慢的加上來,第一次寫先寫這麼多吧,不足之處希望可以共同討論進步!

  • 技術理解不到或有錯誤請直(bu)接(yao)指(ma)出(wo)
  • 寫作不易!

恆大-塔利斯卡