delayQueue原理理解之原始碼解析
http://www.jianshu.com/p/e0bcc9eae0ae
內部結構
- 可重入鎖
- 用於根據delay時間排序的優先順序佇列
- 用於優化阻塞通知的執行緒元素leader
- 用於實現阻塞和通知的Condition物件
delayed和PriorityQueue
在理解delayQueue原理之前我們需要先了解兩個東西,delayed和PriorityQueue.
- delayed是一個具有過期時間的元素
- PriorityQueue是一個根據佇列裡元素某些屬性排列先後的順序佇列
delayQueue其實就是在每次往優先順序佇列中新增元素,然後以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從佇列裡取出來都是最先要過期的元素
offer方法
- 執行加鎖操作
- 吧元素新增到優先順序佇列中
- 檢視元素是否為隊首
- 如果是隊首的話,設定leader為空,喚醒所有等待的佇列
- 釋放鎖
程式碼如下:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
take方法
- 執行加鎖操作
- 取出優先順序佇列元素q的隊首
- 如果元素q的隊首/佇列為空,阻塞請求
- 如果元素q的隊首(first)不為空,獲得這個元素的delay時間值
- 如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,呼叫poll方法彈出該元素,跳出方法
- 如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免記憶體洩露
- 判斷leader元素是否為空,不為空的話阻塞當前執行緒
- 如果leader元素為空的話,把當前執行緒賦值給leader元素,然後阻塞delay的時間,即等待隊首到達可以出隊的時間,在finally塊中釋放leader元素的引用
- 迴圈執行從1~8的步驟
- 如果leader為空並且優先順序佇列不為空的情況下(判斷還有沒有其他後續節點),呼叫signal通知其他的執行緒
- 執行解鎖操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
get點
整個程式碼的過程中並沒有使用上太難理解的地方,但是有幾個比較難以理解他為什麼這麼做的地方
leader元素的使用
大家可能看到在我們的DelayQueue中有一個Thread型別的元素leader,那麼他是做什麼的呢,有什麼用呢?
讓我們先看一下元素註解上的doc描述:
Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.
上面主要的意思就是說用leader來減少不必要的等待時間,那麼這裡我們的DelayQueue是怎麼利用leader來做到這一點的呢:
這裡我們想象著我們有個多個消費者執行緒用take方法去取,內部先加鎖,然後每個執行緒都去peek第一個節點.
如果leader不為空說明已經有執行緒在取了,設定當前執行緒等待
if (leader != null)
available.await();
如果為空說明沒有其他執行緒去取這個節點,設定leader並等待delay延時到期,直到poll後結束迴圈
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
take方法中為什麼釋放first元素
first = null; // don't retain ref while waiting
我們可以看到doug lea後面寫的註釋,那麼這段程式碼有什麼用呢?
想想假設現在延遲佇列裡面有三個物件。
- 執行緒A進來獲取first,然後進入 else 的else ,設定了leader為當前執行緒A
- 執行緒B進來獲取first,進入else的阻塞操作,然後無限期等待
- 這時在JDK 1.7下面他是持有first引用的
- 如果執行緒A阻塞完畢,獲取物件成功,出隊,這個物件理應被GC回收,但是他還被執行緒B持有著,GC鏈可達,所以不能回收這個first.
- 假設還有執行緒C 、D、E.. 持有物件1引用,那麼無限期的不能回收該物件1引用了,那麼就會造成記憶體洩露.
文/jsondream(簡書作者)
原文連結:http://www.jianshu.com/p/e0bcc9eae0ae
著作權歸作者所有,轉載請聯絡作者獲得授權,並標註“簡書作者”。