DelayQueue阻塞佇列第二章:原始碼解析
DelayQueue阻塞佇列系列文章
介紹
DelayQueue是java併發包中提供的延遲阻塞佇列,業務場景一般是下單後多長時間過期,定時執行程式等
1-DelayQueue的組成結構
/** * DelayQueue佇列繼承了AbstractQueue,並且實現BlockingQueue的方法 */ public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { //使用ReentrantLock進行執行緒的同步 private final transient ReentrantLock lock = new ReentrantLock(); //使用優先順序佇列PriorityQueue作為存放資料的佇列 private final PriorityQueue<E> q = new PriorityQueue<E>(); //使用leader/follower模式來避免多執行緒效能的消耗 private Thread leader = null; //使用Condition等待佇列來儲存請求的執行緒(l/f模式) private final Condition available = lock.newCondition();
DelayQueue中的元素需要實現Delayed介面,重寫getDelay()和compareTo()方法,其中getDelay()方法是為了獲取佇列元素延遲剩餘時間,compareTo()方法是為了對佇列中的元素進行一個排序,使符合條件的元素排在佇列的最前面
DelayQueue內部的實現基本就是依靠重寫BlockingQueue方法,使用ReentrantLock進行同步操作,使用PriorityQueue存放佇列元素,Condition存放訪問執行緒
DelayQueue內部採用了leader/follower設計模式,旨在減小多執行緒的消耗,本文不詳細介紹
2原始碼實現細節
offer方法:將元素加入到延遲佇列中去
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { //元素加入優先順序佇列 q.offer(e); //如果新加入的元素e就是佇列的頭元素,將leader置為null切喚醒等待執行緒 if (q.peek() == e) { //Q1此處為何要獲取佇列頭節點元素並與新加入元素進行比較 leader = null; //Q2為何要將leader執行緒置為null且喚起等待佇列 available.signal(); } return true; } finally { lock.unlock(); } }
offer方法比較簡單,只針對以上兩處做詳細說明 Q1和Q2兩個操作都是為了解決一個問題,就是leader對應佇列首節點元素的問題,因為元素是不斷在加入的,比如,leader對應需要取出的首節點是A,此時A雖然是首節點元素,但是還沒有到達延遲時間,所以leader還在等待A,他們的關係是對應的(對應關係的邏輯參考take()原始碼),那麼此時加入了元素B,這時候元素B排在了隊首,那麼此時需要處理元素B的就不再是當前的leader了,所以我們需要將leader置空,重新選取新的leader來處理這個B,至於之前的leader執行緒,在take原始碼中,在呼叫available.awaitNanos(delay)後,當時間到了會重新獲取鎖然後執行操作
所以我們要首先判斷加入的新元素是否是首節點,以便確定對應執行緒的處理關係
絕大多數的文章對原始碼中為什麼進行if (q.peek() == e)和leader = null的操作的原因隻字不提,我覺得還是有必要寫下的,我對於此處原因的理解可能也存在偏差,希望各位不吝賜教
take方法:取出元素並處理元素事件
/**
* 首先獲取優先佇列的首個元素,如果為空則呼叫執行緒沉睡。
* 如果優先順序佇列不為空,檢視當前首元素是否到達過期時間,到達過期時間了就獲取並移除佇列
* 如果沒有到達過期時間,將first變數置為null(防止記憶體洩漏),如果leader執行緒不為空則進入等待佇列
* 如果leader為空,則當前執行緒為leader,並限時進入等待佇列中進行等待
* 如果leader為空,佇列中還有元素存在,則喚醒所有等待的follower執行緒
* 繼續迴圈,直到獲取延時佇列中的元素
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek(); //獲取優先佇列中的首個節點
if (first == null) //如果優先佇列中沒有節點,則該執行緒進入等待執行緒
available.await();
else { //如果首節點不為空
long delay = first.getDelay(NANOSECONDS); //獲取當前元素還需要延時多長時間
if (delay <= 0) //如果延時時間小於或是等於0,則移出佇列
return q.poll();
first = null; // don't retain ref while waiting防止記憶體洩漏
if (leader != null) //說明leader執行緒正在工作,當前執行緒就進入等待佇列中
available.await();//當前執行緒轉變為follower執行緒
else { //如果首節點不為空,延時時間還沒到,沒有相應的處理執行緒
Thread thisThread = Thread.currentThread(); //獲取當前執行緒
leader = thisThread; //當前執行緒設定為首執行緒
try {
available.awaitNanos(delay); //限時進入等待佇列中處理延時時間最小的元素,並釋放鎖
} finally {
if (leader == thisThread)
leader = null; //執行事件之後,將leader執行緒置為null讓給其他執行緒
}
}
}
}
} finally {
if (leader == null && q.peek() != null) //如果leader執行緒為null,優先順序佇列中還有元素,則喚醒通知佇列中的執行緒
available.signal();
lock.unlock();
}
}
take方法是DelayQueue的核心方法,獲取延遲佇列中的元素,檢索並移除這個佇列的頭部,等待直到這個佇列的過期元素可用 關於原始碼的疑惑,不將first=null為什麼會導致記憶體洩露? 核心點在於leader呼叫await方法時會釋放鎖,比如,當執行緒A獲取了first,然後將當前執行緒設為leader執行緒,接著進入await方法,釋放鎖,這時執行緒B也獲取了first,因為leader != null,所以進入阻塞佇列,這時執行緒A從等待佇列中返回,獲取物件釋放first,但由於執行緒B中依然有first的引用,所以gc無法對first進行回收,導致記憶體的洩露
在DelayQueue還有很多值得研究的原始碼和問題,我在日後也會慢慢的加上來,第一次寫先寫這麼多吧,不足之處希望可以共同討論進步!
- 技術理解不到或有錯誤請直(bu)接(yao)指(ma)出(wo)
- 寫作不易!