深入理解阻塞佇列(四)——LinkedBlockingDeque原始碼分析
LinkedBlockingDeque是一個基於連結串列的雙端阻塞佇列。和LinkedBlockingQueue類似,區別在於該類實現了Deque介面,而LinkedBlockingQueue實現了Queue介面。該類的繼承關係如下圖:
本文將與LinkedBlockingQueue進行比較,關於LinkedBlockingQueue可以參考:深入理解阻塞佇列(三)——LinkedBlockingQueue原始碼分析
概述
容量問題
LinkedBlockingDeque是一個可選容量的阻塞佇列,如果沒有設定容量,那麼容量將是Int的最大值。
底層資料結構
LinkedBlockingDeque的底層資料結構是一個雙端佇列,該佇列使用連結串列實現,其結構圖如下:
原始碼分析
重要欄位
LinkedBlockingDeque的重要欄位有如下幾個:
//佇列的頭節點
transient Node<E> first;
//佇列的尾節點
transient Node<E> last;
//佇列中元素的個數
private transient int count;
//佇列中元素的最大個數
private final int capacity;
//鎖
final ReentrantLock lock = new ReentrantLock();
//佇列為空時,阻塞take執行緒的條件佇列
private final Condition notEmpty = lock.newCondition();
//佇列滿時,阻塞put執行緒的條件佇列
private final Condition notFull = lock.newCondition();
從上面的欄位,可以得到LinkedBlockingDeque內部只有一把鎖以及該鎖上關聯的兩個條件,所以可以推斷同一時刻只有一個執行緒可以在隊頭或者隊尾執行入隊或出隊操作。可以發現這點和LinkedBlockingQueue不同,LinkedBlockingQueue可以同時有兩個執行緒在兩端執行操作。
構造方法
LinkedBlockingDeque的構造方法有三個,如下:
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
public LinkedBlockingDeque(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock lock = this.lock;
lock.lock(); // Never contended, but necessary for visibility
try {
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (!linkLast(new Node<E>(e)))
throw new IllegalStateException("Deque full");
}
} finally {
lock.unlock();
}
}
可以看到這三個構造方法的結構和LinkedBlockingQueue是相同的。 但是LinkedBlockingQueue是存在一個哨兵節點維持頭節點的,而LinkedBlockingDeque中是沒有的。
入隊、出隊方法
由於LinkedBlockingDeque是一個雙端佇列,所以就可以在隊頭執行入隊和出隊操作,也可以在隊尾執行入隊和出隊操作,不過實現的方法基本類似,下面就以putFirst()為例,說明一下阻塞方法的執行過程:
public void putFirst(E e) throws InterruptedException {
//不允許元素為null
if (e == null) throw new NullPointerException();
//新建節點
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
//佔有鎖
lock.lock();
try {
//如果新增失敗,等待
while (!linkFirst(node))
notFull.await();
} finally {
//釋放鎖
lock.unlock();
}
}
從上面的程式碼可以看到offerFirst()的流程:
1. 不允許元素為null
2. 首先獲取鎖,一旦獲取到鎖後,呼叫linkFirst()將節點插入在隊頭,最後釋放鎖。
linkFirst()的實現如下:
private boolean linkFirst(Node<E> node) {
// 如果容量滿了
if (count >= capacity)
return false;
//插入節點,將first指向新建節點
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
//因為插入了一個元素,通知因元素為0時阻塞的take執行緒
notEmpty.signal();
return true;
}
從上面可以看到,返回false只有在佇列中元素滿了的情況下;其他情況都會返回true,並且由於成功插入了一個節點,會呼叫notEmpty條件的signal()方法釋放因佇列中元素個數為0時的take執行緒。 關於一把鎖,兩個條件的實現方式和ArrayBlockingQueue的原理一樣,可以參考深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析,這兒就不再過多介紹了。
設想
在分析完LinkedBlockingQueue的原始碼之後,再看LinkedBlockingDeque原始碼之前,我在想LinkedBlockingDeque可能是如何實現的?
我的想法是兩把鎖+四個條件,兩把鎖中,一把管理隊頭,一把管理隊尾,每把鎖兩個條件,分別是notEmpty和notFull,這樣的話,就可以同時有兩個執行緒可以同時在佇列兩端執行入隊和出隊操作,為了實現同步,藉助於一個AtomicInteger的count變數儲存LinkeBlockingDeque中元素的個數。
但是Java的原始碼中,並不是像我這樣的思路,而是使用一把鎖+兩個條件,這種實現方式是和ArrayBlockingQueue一樣的。然後我就在想,上面的想法是否可以實現?
細想了一下,我的這種想法是不可行的,不然Java也有可能採取這種方式了。
舉個例子:現線上程A假設呼叫putFirst(),不過佇列容量滿了,所以執行緒A就阻塞了,這是一個執行緒B呼叫了putLast(),同樣由於佇列容量滿了,執行緒B也阻塞了,這時一個執行緒C呼叫了takeLast()取走了一個元素,那麼該執行緒就可以通知尾部的鎖上的notFull,這樣執行緒B就可以釋放呼叫putLast了,而如果想要釋放執行緒A,只有兩個方法:
1. 就是執行緒C在呼叫takeLast()方法中取走一個數據時,也通知頭部鎖上的notFull,這也就得意味著takeLast也得佔有頭部鎖,即佔有頭和尾兩把鎖
2. 如果takeLast()只佔有一把尾部鎖的話,那麼想要釋放執行緒A的話,就只能希望有一個執行緒D呼叫takeFirst()取走一個頭元素時通知頭鎖的notFull條件釋放執行緒A
如果使用第一種方式,那麼其實和使用一把鎖是相同的,因為不管是隊頭還是隊尾的入隊和出隊,都得先獲取兩把鎖,最後釋放兩把鎖,這樣的實現方式是可以的;但是如果採用第二種方式,那麼就會有問題,比如說:
現在佇列容量還有最後一個元素可以插入,這時執行緒A執行putFirst()方法,執行緒B執行putLast()方法,當兩個執行緒進入while迴圈處判斷AtomicInteger的值的時候,都通過了,那麼執行緒A會呼叫linkFirst將節點插入到前端,然後將count+1,執行緒B會呼叫linkLast將節點插入到尾部,然後將count+1,這時由於兩個執行緒各插入了一個元素,那麼該佇列中的元素就超過了容量了,所以說第二種方式是不可行的。歸根到底是因為可能存在兩個執行緒同時對AtomicInteger做一個方向的操作,比如說都+1,都-1,而LinkedBlockingQueue可行是因為雖然會有兩個執行緒對AtomicInteger操作,但是方向是相反的,一個+1,一個-1。
而第一種方式其實和使用一把鎖是相同的,所以Java原始碼採用了一把鎖+兩個條件的方式。
總結
LinkedBlockingDeque和LinkedBlockingQueue的相同點在於:
1. 基於連結串列
2. 容量可選,不設定的話,就是Int的最大值
和LinkedBlockingQueue的不同點在於:
1. 雙端連結串列和單鏈表
2. 不存在哨兵節點
3. 一把鎖+兩個條件
LinkedBlockingDeque和ArrayBlockingQueue的相同點在於:使用一把鎖+兩個條件維持佇列的同步。
到底為此,關於阻塞佇列系列就到這兒完結了。