JDK併發工具類原始碼學習系列——LinkedBlockingQueue
LinkedBlockingQueue是一個基於已連結節點的、範圍任意的 blocking queue。此佇列按 FIFO(先進先出)排序元素。佇列的頭部 是在佇列中時間最長的元素。佇列的尾部 是在佇列中時間最短的元素。新元素插入到佇列的尾部,並且佇列獲取操作會獲得位於佇列頭部的元素。連結佇列的吞吐量通常要高於基於陣列的佇列,但是在大多數併發應用程式中,其可預知的效能要低。
使用場景
LinkedBlockingQueue常用於生產者/消費者模式中,作為生產者和消費者的通訊橋樑。LinkedBlockingQueue與之前介紹的ConcurrentLinkedQueue以及PriorityBlockingQueue功能類似,都是Queue的一種,不同之處是:
- LinkedBlockingQueue和PriorityBlockingQueue是阻塞的,而ConcurrentLinkedQueue是非阻塞的,
- 同時LinkedBlockingQueue和PriorityBlockingQueue通過加鎖實現執行緒安全,而ConcurrentLinkedQueue使用CAS實現無鎖模式
- PriorityBlockingQueue支援優先順序
由於不同的特徵,所以以上三者的使用場景也不同:
- LinkedBlockingQueue適合需要阻塞的佇列場景,如果能不阻塞或者可以通過程式碼自行實現阻塞,那麼建議使用ConcurrentLinkedQueue代替
- ConcurrentLinkedQueue適合對效能要求較高,同時無需阻塞的場景使用
- PriorityBlockingQueue適合需要根據任務的不同優先順序進行調整佇列的順序的場景
結構預覽
LinkedBlockingQueue內部實現相對較簡單,直接使用一個連結串列儲存資料,通過加鎖實現執行緒安全,通過兩個Condition分別實現入隊和出隊的等待。連結串列的節點使用內部類:Node表示,Node很簡單,就兩個變數,由外部類直接修改即可。
/**
* Linked list node class
*/
static class Node<E> {
/** The item, volatile to ensure barrier separating write and read */
volatile E item;
Node<E> next;
Node(E x) { item = x; }
}
item使用volatile修飾,解決記憶體可見性。
常用方法解析
LinkedBlockingQueue常用方法有:入隊(offer(E)/offer(E, long, TimeUnit)/put(E))、出隊(poll()/poll(long, TimeUnit)/take())、刪除(remove(Object))。下面分別看看這三類方法。
入隊
/**
* @By Vicky:入隊,無阻塞,佇列未滿則直接入隊,否則直接返回false
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;// 儲存當前佇列的長度
// 這裡因為count是Atomic的,所以有類似volatile的記憶體可見性效果
// 即對count的修改能夠立即被其他執行緒可見,所以此處不加鎖的情況下讀取count值是會讀取到最新值的
// 然後根據此值進行前置判斷,避免不必要的加鎖操作
if (count.get() == capacity)// 佇列已滿直接返回false
return false;
int c = -1;
final ReentrantLock putLock = this.putLock;// 獲取putLock,加鎖
putLock.lock();
try {
if (count.get() < capacity) {// 佇列未滿則插入
insert(e);
c = count.getAndIncrement();// 更新count值
if (c + 1 < capacity)// 未滿則喚醒等待在notFull上的執行緒
// 此處有點怪異,入隊喚醒notFull~
// 此處喚醒notFull是考慮有可能如果多個執行緒同時出隊,由於出隊喚醒notFull時也需要對putLock進行加鎖
// 所以有可能一個執行緒出隊,喚醒notFull,但是被另一個出隊執行緒搶到了鎖,所以入隊執行緒依舊在等待
// 當另一個執行緒也喚醒了notFull,釋放了putLock後,只能喚醒一個入隊執行緒,所以其他執行緒依舊在等待
// 所以此處需要再次喚醒notFull
notFull.signal();
}
} finally {
putLock.unlock();
}
// c==0表示佇列在插入之前是空的,所以需要喚醒等待在notEmpty上的執行緒
if (c == 0)
signalNotEmpty();
return c >= 0;
}
/**
* @By Vicky:喚醒notEmpty,需對takeLock進行加鎖,因為notEmpty與takeLock相關
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
首先解析offer(),另外兩個入隊操作只是在佇列已滿的情況下進行一些特殊處理而已。文中程式碼給出了詳細註釋,這裡著重說明兩個地方:
- 對Condition的操作需要在加鎖的環境下進行,而且是需要對與Condition相關的鎖進行加鎖,如此處notEmpty是由takeLock.newCondition()得來,所以對notEmpty的操作需要對takeLock進行加鎖
- 入隊操作也執行
notFull.signal();
的原因是避免入隊執行緒未搶到鎖而遺失了出隊的喚醒操作。詳細解析可以見文中的註釋
下面直接貼出offer(E, long, TimeUnit)和put(E)的程式碼,基本同offer(E)。
/**
* @By Vicky:入隊,等待指定時間
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
for (;;) {
// 此處同offer()
if (count.get() < capacity) {
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
break;
}
// nanos是剩餘的等待時間,<=0表示等待時間已到
if (nanos <= 0)
return false;
try {
// 呼叫notFull的awaitNanos,指定等待時間,如果等待期間被喚醒,則返回剩餘等待時間,<0表示等待時間已到
nanos = notFull.awaitNanos(nanos);
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
/**
* @By Vicky:入隊,無期限等待
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset
// local var holding count negative to indicate failure unless set.
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
try {
while (count.get() == capacity)// 無限等待,直到可用
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
出隊
出隊操作和入隊邏輯相同,看程式碼。
/**
* @By Vicky:出隊,無阻塞,佇列為空則直接返回null
*/
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)// 佇列為空,直接返回
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {// 不為空,獲取一個元素
x = extract();
c = count.getAndDecrement();
if (c > 1)// 同offer(),此處需喚醒notEmpty
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();// 同offer(),此處需喚醒notFull
return x;
}
/**
* @By Vicky:出隊,將head指向head.next
* @return
*/
private E extract() {
Node<E> first = head.next;
head = first;
E x = first.item;
first.item = null;
return x;
}
/**
* @By Vicky:喚醒notFull,需對putLock進行加鎖,因為notFull與putLock相關
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
出隊一個元素:extract()
,邏輯很簡單,將head指向head.next即可。其他地方與offer()的邏輯相同,如佇列未空需喚醒notEmpty,佇列由滿變空需喚醒notFull,原因完全同offer()。poll(long, TimeUnit)和take()程式碼就不貼出來了,完全與offer()相同。
刪除
/**
* @By Vicky:刪除指定元素
*/
public boolean remove(Object o) {
if (o == null) return false;
boolean removed = false;
fullyLock();// 同時對takeLock和pullLock加鎖,避免任何的入隊和出隊操作
try {
Node<E> trail = head;
Node<E> p = head.next;
while (p != null) {// 從佇列的head開始迴圈查詢與o相同的元素
if (o.equals(p.item)) {// 找到相同的元素則設定remove為true
removed = true;
break;
}
trail = p;// 繼續迴圈
p = p.next;
}
if (removed) {
// remove==true,則表示查詢到待刪除元素,即p,將trail的next指向p的next,即將p從佇列移除及完成刪除
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signalAll();
}
} finally {
fullyUnlock();
}
return removed;
}
刪除的邏輯也很簡單,程式碼中給出了註釋。
以上即本篇全部內容,比較簡單,更多關於佇列的研究可參考:
以上內容如有錯誤,請不吝賜教~