併發佇列-有界阻塞佇列ArrayBlockingQueue原理探究
一、 前言
上節介紹了無界連結串列方式的阻塞佇列LinkedBlockingQueue,本節來研究下有界使用陣列方式實現的阻塞佇列ArrayBlockingQueue
二、 ArrayBlockingQueue類圖結構
如圖ArrayBlockingQueue內部有個陣列items用來存放佇列元素,putindex下標標示入隊元素下標,takeIndex是出隊下標,count統計佇列元素個數,從定義可知道並沒有使用volatile修飾,這是因為訪問這些變數使用都是在鎖塊內,並不存在可見性問題。另外有個獨佔鎖lock用來對出入隊操作加鎖,這導致同時只有一個執行緒可以訪問入隊出隊,另外notEmpty,notFull條件變數用來進行出入隊的同步。
另外建構函式必須傳入佇列大小引數,所以為有界佇列,預設是Lock為非公平鎖。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
三、offer操作
在隊尾插入元素,如果佇列滿則返回false,否者入隊返回true。
public boolean offer(E e) {
//e為null,則丟擲NullPointerException異常
checkNotNull(e);
//獲取獨佔鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果佇列滿則返回false
if (count == items.length)
return false;
else {
//否者插入元素
insert(e);
return true;
}
} finally {
//釋放鎖
lock.unlock();
}
}
private void insert(E x) {
//元素入隊
items[putIndex] = x;
//計算下一個元素應該存放的下標
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
//迴圈佇列,計算下標
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
這裡由於在操作共享變數前加了鎖,所以不存在記憶體不可見問題,加過鎖後獲取的共享變數都是從主記憶體獲取的,而不是在CPU快取或者暫存器裡面的值,釋放鎖後修改的共享變數值會重新整理會主記憶體中。
另外這個佇列是使用迴圈陣列實現,所以計算下一個元素存放下標時候有些特殊。另外insert後呼叫 notEmpty.signal();是為了啟用呼叫notEmpty.await()阻塞後放入notEmpty條件佇列中的執行緒。
四、put操作
在佇列尾部新增元素,如果佇列滿則等待佇列有空位置插入後返回
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//獲取可被中斷鎖
lock.lockInterruptibly();
try {
//如果佇列滿,則把當前執行緒放入notFull管理的條件佇列
while (count == items.length)
notFull.await();
//插入元素
insert(e);
} finally {
lock.unlock();
}
}
需要注意的是如果佇列滿了那麼當前執行緒會阻塞,知道出隊操作呼叫了notFull.signal方法啟用該執行緒。
程式碼邏輯很簡單,但是這裡需要思考一個問題為啥呼叫lockInterruptibly方法而不是Lock方法。我的理解是因為呼叫了條件變數的await()方法,而await()方法會在中斷標誌設定後丟擲InterruptedException異常後退出,所以還不如在加鎖時候先看中斷標誌是不是被設定了,如果設定了直接丟擲InterruptedException異常,就不用再去獲取鎖了。然後看了其他併發類裡面凡是呼叫了await的方法獲取鎖時候都是使用的lockInterruptibly方法而不是Lock也驗證了這個想法。
五、poll操作
從隊頭獲取並移除元素,佇列為空,則返回null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//當前佇列為空則返回null,否者
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
//獲取元素值
E x = this.<E>cast(items[takeIndex]);
//陣列中值值為null;
items[takeIndex] = null;
//隊頭指標計算,佇列元素個數減一
takeIndex = inc(takeIndex);
--count;
//傳送訊號啟用notFull條件佇列裡面的執行緒
notFull.signal();
return x;
}
六、take操作
從隊頭獲取元素,如果佇列為空則阻塞直到佇列有元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//佇列為空,則等待,直到佇列有元素
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
需要注意的是如果佇列為空,當前執行緒會被掛起放到notEmpty的條件佇列裡面,直到入隊操作執行呼叫notEmpty.signal後當前執行緒才會被啟用,await才會返回。
七、peek操作
返回佇列頭元素但不移除該元素,佇列為空,返回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//佇列為空返回null,否者返回頭元素
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return this.<E>cast(items[i]);
}
八、 size操作
獲取佇列元素個數,非常精確因為計算size時候加了獨佔鎖,其他執行緒不能入隊或者出隊或者刪除元素
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
九總結
ArrayBlockingQueue通過使用全域性獨佔鎖實現同時只能有一個執行緒進行入隊或者出隊操作,這個鎖的粒度比較大,有點類似在方法上新增synchronized的意味。其中offer,poll操作通過簡單的加鎖進行入隊出隊操作,而put,take則使用了條件變數實現如果佇列滿則等待,如果佇列空則等待,然後分別在出隊和入隊操作中傳送訊號啟用等待執行緒實現同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的結果是精確的,因為計算前加了全域性鎖。
歡迎看官們拍磚,讓我們共同進步!