深入理解阻塞佇列(三)——LinkedBlockingQueue原始碼分析
LinkedBlockingQueue是一個基於連結串列實現的可選容量的阻塞佇列。隊頭的元素是插入時間最長的,隊尾的元素是最新插入的。新的元素將會被插入到佇列的尾部。
LinkedBlockingQueue的容量限制是可選的,如果在初始化時沒有指定容量,那麼預設使用int的最大值作為佇列容量。
概述
類繼承關係
LinkedBlockingQueue的繼承關係如下圖:
可以參考深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析中ArrayBlockingQueue的類繼承關係圖,這兩個類的繼承關係是一樣的。
底層資料結構
LinkedBlockingQueue內部是使用連結串列實現一個佇列的,但是卻有別於一般的佇列,在於該佇列至少有一個節點,頭節點不含有元素。結構圖如下:
可以發現head.item=null,last.next=null。
原理
LinkedBlockingQueue中維持兩把鎖,一把鎖用於入隊,一把鎖用於出隊,這也就意味著,同一時刻,只能有一個執行緒執行入隊,其餘執行入隊的執行緒將會被阻塞;同時,可以有另一個執行緒執行出隊,其餘執行出隊的執行緒將會被阻塞。換句話說,雖然入隊和出隊兩個操作同時均只能有一個執行緒操作,但是可以一個入隊執行緒和一個出隊執行緒共同執行,也就意味著可能同時有兩個執行緒在操作佇列,那麼為了維持執行緒安全,LinkedBlockingQueue使用一個AtomicInterger型別的變量表示當前佇列中含有的元素個數,所以可以確保兩個執行緒之間操作底層佇列是執行緒安全的,這個在後面原始碼分析的時候會說明。
原始碼分析
重要欄位
LinkedBlockingQueue可以指定容量,內部維持一個佇列,所以有一個頭節點head和一個尾節點last,內部維持兩把鎖,一個用於入隊,一個用於出隊,還有鎖關聯的Condition物件。主要物件的定義如下:
//容量,如果沒有指定,該值為Integer.MAX_VALUE;
private final int capacity;
//當前佇列中的元素
private final AtomicInteger count = new AtomicInteger();
//佇列頭節點,始終滿足head.item==null
transient Node<E> head;
//佇列的尾節點,始終滿足last.next==null
private transient Node<E> last;
//用於出隊的鎖
private final ReentrantLock takeLock = new ReentrantLock();
//當佇列為空時,儲存執行出隊的執行緒
private final Condition notEmpty = takeLock.newCondition();
//用於入隊的鎖
private final ReentrantLock putLock = new ReentrantLock();
//當佇列滿時,儲存執行入隊的執行緒
private final Condition notFull = putLock.newCondition();
構造方法
LinkedBlockingQueue的構造方法有三個,分別如下:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);//last和head在佇列為空時都存在,所以佇列中至少有一個節點
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
從上面的構造方法中可以得出3點結論:
1. 當呼叫無參的構造方法時,容量是int的最大值
2. 佇列中至少包含一個節點,哪怕佇列對外表現為空
3. LinkedBlockingQueue不支援null元素
put(E e)方法
put(E e)方法用於將一個元素插入到佇列的尾部,其實現如下:
public void put(E e) throws InterruptedException {
//不允許元素為null
if (e == null) throw new NullPointerException();
int c = -1;
//以當前元素新建一個節點
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//獲得入隊的鎖
putLock.lockInterruptibly();
try {
//如果佇列已滿,那麼將該執行緒加入到Condition的等待佇列中
while (count.get() == capacity) {
notFull.await();
}
//將節點入隊
enqueue(node);
//得到插入之前佇列的元素個數
c = count.getAndIncrement();
//如果還可以插入元素,那麼釋放等待的入隊執行緒
if (c + 1 < capacity)
notFull.signal();
} finally {
//解鎖
putLock.unlock();
}
//通知出隊執行緒佇列非空
if (c == 0)
signalNotEmpty();
}
從上面的程式碼分析中可以得出6點結論:
1. LinkedBlockingQueue不允許元素為null,這一點在構造方法中也說過了。
2. 同一時刻,只能有一個執行緒執行入隊操作,因為putLock在將元素插入到佇列尾部時加鎖了
3. 如果佇列滿了,那麼將會呼叫notFull的await()方法將該執行緒加入到Condition等待佇列中。await()方法就會釋放執行緒佔有的鎖,這將導致之前由於被鎖阻塞的入隊執行緒將會獲取到鎖,執行到while迴圈處,不過可能因為由於佇列仍舊是滿的,也被加入到條件佇列中。
4. 一旦一個出隊執行緒取走了一個元素,並通知了入隊等待佇列中可以釋放執行緒了,那麼第一個加入到Condition佇列中的將會被釋放,那麼該執行緒將會重新獲得put鎖,繼而執行enqueue()方法,將節點插入到佇列的尾部
5. 然後得到插入一個節點之前的元素個數,如果佇列中還有空間可以插入,那麼就通知notFull條件的等待佇列中的執行緒。
6. 通知出隊執行緒佇列為空了,因為插入一個元素之前的個數為0,而插入一個之後佇列中的元素就從無變成了有,就可以通知因佇列為空而阻塞的出隊執行緒了。
signalNotEmpty()方法只會在put/take之類的入隊方法中才會被呼叫,並且是當佇列元素從無到有的時候。下面是signalNotEmpty()方法的實現:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
//獲取takeLock
takeLock.lock();
try {
//釋放notEmpty條件佇列中的第一個等待執行緒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
E take()方法
take()方法用於得到隊頭的元素,在佇列為空時會阻塞,知道佇列中有元素可取。其實現如下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//獲取takeLock鎖
takeLock.lockInterruptibly();
try {
//如果佇列為空,那麼加入到notEmpty條件的等待佇列中
while (count.get() == 0) {
notEmpty.await();
}
//得到隊頭元素
x = dequeue();
//得到取走一個元素之前佇列的元素個數
c = count.getAndDecrement();
//如果佇列中還有資料可取,釋放notEmpty條件等待佇列中的第一個執行緒
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//如果佇列中的元素從滿到非滿,通知put執行緒
if (c == capacity)
signalNotFull();
return x;
}
上面的程式碼註釋已將說明了take()方法的整體路程,大體上與put()相對。
當佇列為空時,就加入到notEmpty(的條件等待佇列中,當佇列不為空時就取走一個元素,當取完發現還有元素可取時,再通知一下自己的夥伴(等待在條件佇列中的執行緒);最後,如果佇列從滿到非滿,通知一下put執行緒。
下面看一下dequeue()的刪除節點操作,其特別之處在於頭節點是一個哨兵節點。
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
入隊、出隊總結
LinkedBlockingQueue中除了上面的put()方法之外,還有另外幾個入隊的方法,比如offer(E)、offer(E,long,TimeUnit);也有另外幾個出隊的方法,比如poll()、poll(long,TimeUnit),但實現和put()以及take()都大同小異,這兒就不再一一分析了。下面就兩個方法總結一下:
LinkedBlockingQueue是允許兩個執行緒同時在兩端進行入隊或出隊的操作的,但一端同時只能有一個執行緒進行操作,這是通過兩把鎖來區分的;為了維持底部資料的統一,引入了AtomicInteger的一個count變數,表示佇列中元素的個數。count只能在兩個地方變化,一個是入隊的方法(可以+1),另一個是出隊的方法(可以-1),而AtomicInteger是原子安全的,所以也就確保了底層佇列的資料同步。
另外,入隊、出隊執行緒之間還存在合作的關係,這個以入隊為例:當一群執行緒執行入隊操作時,一個執行緒A幸運地佔有了putLock鎖,然後也成功的插入了一個元素,但是插完這個元素就達到了佇列的容量了,當這個執行緒A釋放了鎖之後,前面一群執行緒中一個執行緒B又獲得了putLock鎖,但是由於佇列已經滿了,那麼執行緒B釋放了putLock鎖後被加入到了notEmpty條件的等待佇列中;由於釋放了鎖,執行緒C也搶到了鎖,但是很不幸,它也被加入到了等待佇列中,並且被加在了執行緒B的尾部;這時一個出隊執行緒出現了,它成功地取走了一個元素,使得佇列從滿變為了非滿狀態,並且呼叫signalNotFull()方法通知了notFull的等待佇列,這時執行緒B又重新獲得了鎖,插入了一個元素,插完一個元素,它發現還有容量可以插元素,它也沒有忘記了和它一起被困在條件佇列中的執行緒C,就呼叫了notFull.await()通知了執行緒C,這樣執行緒C也執行了插入元素的操作。出隊的過程與這個基本相同,就不再介紹了。由此可以看到,入隊的執行緒不止和出隊的執行緒協作,還和自己的難兄難弟,在條件佇列中等待的入隊執行緒協作;出隊的執行緒同樣不止和入隊的執行緒協作,還和另外的出隊執行緒協作。
remove()方法
remove()方法用於刪除佇列中一個元素,如果佇列中不含有該元素,那麼返回false;有的話則刪除並返回true。入隊和出隊都是隻獲取一個鎖,而remove()方法需要同時獲得兩把鎖,其實現如下:
public boolean remove(Object o) {
//因為佇列不包含null元素,返回false
if (o == null) return false;
//獲取兩把鎖
fullyLock();
try {
//從頭的下一個節點開始遍歷
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
//如果匹配,那麼將節點從佇列中移除,trail表示前驅節點
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//釋放兩把鎖
fullyUnlock();
}
}
可以看到remove()方法中首先獲取兩把鎖,然後再執行遍歷刪除操作,最後釋放兩把鎖。下面先看一下是如何獲取和釋放兩把鎖的,其實現如下:
/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
那麼問題來了,為什麼remove()方法同時需要兩把鎖?
remove()操作會從佇列的頭遍歷到尾,用到了佇列的兩端,所以需要對兩端加鎖,而對兩端加鎖就需要獲取兩把鎖;入隊和出隊均只在佇列的一端操作,所以只需獲取一把鎖。
size()方法
size()方法用於返回佇列中元素的個數,其實現如下:
public int size() {
return count.get();
}
由於count是一個AtomicInteger的變數,所以該方法是一個原子性的操作,是執行緒安全的。
總結
在上面分析LinkedBlockingQueue的原始碼之後,可以與ArrayBlockingQueue做一個比較。
相同點有如下2點:
1. 不允許元素為null
2. 執行緒安全的佇列
不同點有如下幾點:
1. ArrayBlockingQueue底層基於定長的陣列,所以容量限制了;LinkedBlockingQueue底層基於連結串列實現佇列,所以容量可選,如果不設定,那麼容量是int的最大值
2. ArrayBlockingQueue內部維持一把鎖和兩個條件,同一時刻只能有一個執行緒佇列的一端操作;LinkedBlockingQueue內部維持兩把鎖和兩個條件,同一時刻可以有兩個執行緒在佇列的兩端操作,但同一時刻只能有一個執行緒在一端操作。
3. LinkedBlockingQueue的remove()類似方法時,由於需要對整個佇列連結串列實現遍歷,所以需要獲取兩把鎖,對兩端加鎖。
下一篇部落格將分析LinkedBlockingDeque原始碼,敬請關注。