死磕Java併發:J.U.C之阻塞佇列:DelayQueue
作者:chenssy
公眾號:Java技術驛站
DelayQueue是一個支援延時獲取元素的無界阻塞佇列。裡面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果佇列裡面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從佇列中取元素。
DelayQueue主要用於兩個方面:
快取:清掉快取中超時的快取資料
任務超時處理
DelayQueue
DelayQueue實現的關鍵主要有如下幾個:
可重入鎖ReentrantLock
用於阻塞和通知的Condition物件
根據Delay時間排序的優先順序佇列:PriorityQueue
用於優化阻塞通知的執行緒元素leader
ReentrantLock、Condition這兩個物件就不需要闡述了,他是實現整個BlockingQueue的核心。PriorityQueue是一個支援優先順序執行緒排序的佇列(參考【死磕Java併發】-----J.U.C之阻塞佇列:PriorityBlockingQueue),leader後面闡述。這裡我們先來了解Delay,他是實現延時操作的關鍵。
Delayed
Delayed介面是用來標記那些應該在給定延遲時間之後執行的物件,它定義了一個long getDelay(TimeUnit unit)方法,該方法返回與此物件相關的的剩餘時間。同時實現該介面的物件必須定義一個compareTo 方法,該方法提供與此介面的 getDelay 方法一致的排序。
publicinterfaceDelayedextendsComparable<Delayed> {
long getDelay(TimeUnit unit);
}
如何使用該介面呢?上面說的非常清楚了,實現該介面的getDelay()方法,同時定義compareTo()方法即可。
內部結構
先看DelayQueue的定義:
publicclassDelayQueue<E extendsDelayed> extendsAbstractQueue<E>
implementsBlockingQueue<E> {
/** 可重入鎖 */
privatefinaltransient
/** 支援優先順序的BlockingQueue */
privatefinalPriorityQueue<E> q = newPriorityQueue<E>();
/** 用於優化阻塞 */
privateThread leader = null;
/** Condition */
privatefinalCondition available = lock.newCondition();
/**
* 省略很多程式碼
*/
}
看了DelayQueue的內部結構就對上面幾個關鍵點一目瞭然了,但是這裡有一點需要注意,DelayQueue的元素都必須繼承Delayed介面。同時也可以從這裡初步理清楚DelayQueue內部實現的機制了:以支援優先順序無界佇列的PriorityQueue作為一個容器,容器裡面的元素都應該實現Delayed介面,在每次往優先順序佇列中新增元素時以元素的過期時間作為排序條件,最先過期的元素放在優先順序最高。
offer()
publicboolean offer(E e) {
finalReentrantLock lock = this.lock;
lock.lock();
try {
// 向 PriorityQueue中插入元素
q.offer(e);
// 如果當前元素的對首元素(優先順序最高),leader設定為空,喚醒所有等待執行緒
if (q.peek() == e) {
leader = null;
available.signal();
}
// 無界佇列,永遠返回true
returntrue;
} finally {
lock.unlock();
}
}
offer(E e)就是往PriorityQueue中新增元素,具體可以參考(【死磕Java併發】-----J.U.C之阻塞佇列:PriorityBlockingQueue)。整個過程還是比較簡單,但是在判斷當前元素是否為對首元素,如果是的話則設定leader=null,這是非常關鍵的一個步驟,後面闡述。
take()
public E take() throwsInterruptedException {
finalReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 對首元素
E first = q.peek();
// 對首為空,阻塞,等待off()操作喚醒
if (first == null)
available.await();
else {
// 獲取對首元素的超時時間
long delay = first.getDelay(NANOSECONDS);
// <=0 表示已過期,出對,return
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// leader != null 證明有其他執行緒在操作,阻塞
if (leader != null)
available.await();
else {
// 否則將leader 設定為當前執行緒,獨佔
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 超時阻塞
available.awaitNanos(delay);
} finally {
// 釋放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 喚醒阻塞執行緒
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
首先是獲取對首元素,如果對首元素的延時時間 delay <= 0 ,則可以出對了,直接return即可。否則設定first = null,這裡設定為null的主要目的是為了避免記憶體洩漏。如果 leader != null 則表示當前有執行緒佔用,則阻塞,否則設定leader為當前執行緒,然後呼叫awaitNanos()方法超時等待。
first = null
這裡為什麼如果不設定first = null,則會引起記憶體洩漏呢?執行緒A到達,列首元素沒有到期,設定leader = 執行緒A,這是執行緒B來了因為leader != null,則會阻塞,執行緒C一樣。假如執行緒阻塞完畢了,獲取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被執行緒B、執行緒C持有著,所以不會回收,這裡只有兩個執行緒,如果有執行緒D、執行緒E...呢?這樣會無限期的不能回收,就會造成記憶體洩漏。
這個入隊、出對過程和其他的阻塞佇列沒有很大區別,無非是在出對的時候增加了一個到期時間的判斷。同時通過leader來減少不必要阻塞。
- END -
近期熱文:
關注我
點選“閱讀原文”,看本號其他精彩內容