深入理解阻塞佇列(二)——ArrayBlockingQueue原始碼分析
在深入理解阻塞佇列(一)——基本結構中,介紹了BlockingQueue這一介面的子類以及子介面。本文主要就其中的一個實現類:ArrayBlockingQueue進行原始碼分析,分析阻塞佇列的阻塞是如何實現的。
概述
ArrayBlockingQueue底層是使用一個數組實現佇列的,並且在構造ArrayBlockingQueue時需要指定容量,也就意味著底層陣列一旦建立了,容量就不能改變了,因此ArrayBlockingQueue是一個容量限制的阻塞佇列。因此,在佇列全滿時執行入隊將會阻塞,在佇列為空時出隊同樣將會阻塞。
原始碼分析
重要欄位
ArrayBlockingQueue的重要欄位有如下幾個:
/** The queued items */
final Object[] items;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
上面程式碼中的items陣列就代表的是佇列,ReentrantLock和兩個Condition都是用於用於併發的,並且這幾個欄位都是final的,意味著ArrayBloackingQueue初始化時就必須完成賦值。
ArrayBlockingQueue中有幾個int型的欄位表示當前操作items陣列的索引,如下:
//記錄下一個take、remove、peek的索引
int takeIndex;
//記錄下一個put、offer、add的索引
int putIndex;
//佇列中元素的個數
int count;
構造方法
ArrayBlockingQueue一共有三個構造方法,如下:
//只指定容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//指定容量和ReentrantLock是否公平
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//將集合中的元素初始化佇列的元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
從上面的程式碼可以看到,構造方法主要使用容量對items陣列完成初始化,fair引數用來構造一個公平的或不公平的ReentrantLock。對於不瞭解ReentrantLock的朋友,可以參考下面這篇文章: AbstractQueuedSynchronizer詳解(一)——分析ReentrantLock原始碼。
另外一個構造方法就是使用集合中的元素初始化佇列中的元素。
put(E e)方法
put(E e)方法在佇列不滿的情況下,將會將元素新增到佇列尾部,如果佇列已滿,將會阻塞,直到佇列中有剩餘空間可以插入。該方法的實現如下:
public void put(E e) throws InterruptedException {
//檢查元素是否為null,如果是,丟擲NullPointerException
checkNotNull(e);
final ReentrantLock lock = this.lock;
//加鎖
lock.lockInterruptibly();
try {
//如果佇列已滿,阻塞,等待佇列成為不滿狀態
while (count == items.length)
notFull.await();
//將元素入隊
enqueue(e);
} finally {
lock.unlock();
}
}
從上面程式碼可以看到幾點:
1. ArrayBlockingQueue不允許元素為null
2. ArrayBlockingQueue在佇列已滿時將會呼叫notFull的await()方法釋放鎖並處於阻塞狀態
3. 一旦ArrayBlockingQueue不為滿的狀態,就將元素入隊
下面首先看一下enqueue方法。
enqueue(E e)方法
enqueue()方法用於將元素插入到佇列中,由於有元素進入了佇列,所以就通知了為空的Condition,釋放了因佇列為空而阻塞的執行緒。程式碼如下:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
從上面的程式碼可以看到,底層的陣列使用的迴圈插入的方式;當一旦插入一個元素後,將呼叫notEmpty.signal()。
當呼叫put()方法時,由於會首先對Lock加鎖,然後再執行插入,所以當很多執行緒一起插入時,是執行緒安全的;而一旦進入lock塊中,噹噹前佇列已滿時,該執行緒就會被阻塞,直到佇列不再為滿的時候,可以重新獲取到鎖執行插入;在插入之後,由於新加了一個元素,需要通知因為空而阻塞的執行緒,所以需要呼叫notEmpty的signal方法。
E take()方法
take()方法用於取走隊頭的元素,當佇列為空時將會阻塞,直到佇列中有元素可取走時將會被釋放。其實現如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//首先加鎖
lock.lockInterruptibly();
try {
//如果佇列為空,阻塞
while (count == 0)
notEmpty.await();
//佇列不為空,呼叫dequeue()出隊
return dequeue();
} finally {
//釋放鎖
lock.unlock();
}
}
從上面可以看到take()流程和put()流程類似,一旦獲得了鎖之後,如果佇列為空,那麼將阻塞;否則呼叫dequeue()出隊一個元素。
下面看一下dequeue()方法。
dequeu()方法
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//取走資料
E x = (E) items[takeIndex];
//置為null,以助gc
items[takeIndex] = null;
//迴圈取
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知因佇列滿而阻塞的執行緒
notFull.signal();
return x;
}
程式碼中註釋已經說明了dequeu的流程。
下面兩個過程聯合起來看,如果佇列已滿,那麼呼叫put時,因為呼叫了notFull.await(),那麼那個執行緒將會放棄鎖進入到阻塞狀態,這時一個執行緒取走了一個數據,呼叫了notFull.signal(),這時上一個執行緒有可能就被釋放了然後重新獲得了鎖,呼叫了enqueue()方法將元素插入到佇列中;如果佇列為空,執行take(),那麼由於呼叫了notEmpty.await(),該執行緒將會被阻塞,這時另一個執行緒執行了put()方法插入了一個元素,然後呼叫了notEmpty.signal(),這時取走執行緒被釋放了重新獲取了鎖取走了資料。這基本就是ArrayBlockingQueue的阻塞實現原理。
總結
根據分析原始碼可知,ArrayBlockingQueue的併發阻塞是通過ReentrantLock和Condition來實現的,關於Condition可以參考下面這篇文章:java Condition原始碼分析
ArrayBlockingQueue內部只有一把鎖,意味著同一時刻只有一個執行緒能進行入隊或者出隊的操作。 下一篇文章將介紹LinkedBlockingQueue。