ArrayBlockinQueue及Condition原始碼解析
在進入ArrayBlockingQueue原始碼之前,我們先看看AbstractQueuedSynchronizer類的內部結構: 其有一個重要的靜態內部類Node,將一個執行緒包裝成一個Node,每個Node會持有prev,next,nextWaiter 節點的引用。
AbstractQueuedSynchronizer裡的state屬性就表示被加鎖的次數,如果state>0,則表示當前鎖物件被其他執行緒持有。 AbstractQueuedSynchronizer和其內部類ConditionObject都維護了一條Node的連結串列,且都持有連結串列的頭尾節點,AbstractQueuedSynchronizer通過prev,next 來實現雙向的連結串列;而ConditionObject藉助nextWaiter實現單向的連結串列。
ArrayBlockingQueue在例項化是會呼叫如下建構函式:
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object [capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
預設情況下使用可重入的排他的非公平鎖,同時通過lock建立兩個Condition物件,notEmpty和notFull共用同一個鎖。
接下來我們看看ArrayBlockingQueue的兩個阻塞方法,put(Object)和take() :
public void put(E e) throws InterruptedException {
checkNotNull (e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
put和take方法都在lock的包圍之中,也就是說同一時間只能有一個執行緒能夠執行put或take方法,向容器內加入或者取出元素。
如果某個執行緒執行put後者take方法,搶到了鎖,那麼會將AQS的state屬性通過CAS增加1,然後進入try程式碼塊。
在高併發下,同一時間肯定有大量執行緒線下執行put或者take方法,但只有一個能夠獲取到鎖,其他的執行緒都會被阻塞。對於未阻塞直接搶到鎖的執行緒,其不會加入到Node連結串列中;對於未搶到鎖而被阻塞的執行緒,會被封裝為Node物件,新增到AQS的成員變數中,以連結串列的形式儲存在head和tail之間。
每個被阻塞的執行緒封裝成一個Node,通過CAS設定為連結串列的新tail,其prev節點為之前的tail,他們在連結串列上的順序就代表著他們搶鎖失敗的順序。
在最開始時,第一個執行緒沒有搶到鎖,先建立一個空的Node,賦值給head,然後將此執行緒封裝為一個Node,賦值給tail,維護head和tail之間的引用,之後沒有搶到鎖的執行緒按順序依次向連結串列末尾新增,賦值為tail。
當第一個直接搶到鎖的執行緒執行unlock方法時,其會喚醒head節點的next,也就是上圖中的put( )方法所線上程,此時這個執行緒被喚醒,重新執行搶鎖操作,有如下兩種可能結果:
- 搶鎖失敗:因為是非公平鎖,中途有一個額外的執行緒插進來,搶到了鎖,head的next節點所表示的執行緒搶鎖再次失敗,被再次阻塞,等待新插進來的執行緒執行unlock來再次喚醒,重複上述流程
- 搶鎖成功:head的next節點將自身設定為新的head節點,之後其執行unlock方法時,也喚醒自身的next節點,重複上述流程
可重入的非公平鎖就是按照上述流程來實現執行緒的互斥的,當加入了Condition之後,又有了一些額外的變化。
當順利的搶到鎖,進入try程式碼塊之後,可能因為某些原因導致不滿足後續執行條件,因而執行condition的await方法。在await方法中,首先會將當前執行緒再次封裝成一個Node節點,新增到Condition物件的屬性中,如果firstWaiter為null,那麼賦值給firstWaiter;如果firstWaiter有值,那麼將此Node定義為新的lastWaiter,之前的lastWaiter.next = Node。
多個執行緒呼叫同一個Condition物件的await方法,會按照呼叫順序依次新增到Condition物件的Node節點連結串列中,ArrayBlockingQueue裡的lock生成了兩個Condition,每個Condition儲存對應方法的Node節點連結串列,一個put,一個take。
在新增到waiter連結串列中後,會釋放當前的鎖,也就是將AQS的state屬性置為0,然後喚醒head節點的next節點代表的執行緒。之後會進入一個while迴圈,當await方法生成的Node節點沒有被加入到head,tail所代表的連結串列中,那麼會阻塞當前執行緒。正常情況下,執行await方法後當前執行緒會被阻塞,直到其對應的在waiter連結串列中的Node節點被新增到head,tail的連結串列中。
當前執行緒通過await方法阻塞了,那麼就需要其他執行緒執行同一個Condition的signal方法來喚醒當前執行緒。比如put方法被阻塞,那麼需要其他執行緒呼叫put對應Condition的signal。在await方法中會釋放持有的鎖,同時喚醒head節點的next對應的執行緒,一般來說此執行緒最終會呼叫await對應Condition的signal方法。
public class ConditionObject implements Condition, java.io.Serializable {
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 將當前執行緒封裝為Node,新增到waiter連結串列中
Node node = addConditionWaiter();
// 釋放鎖,喚醒AQS內head的next節點所代表的執行緒
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果addConditionWaiter()方法生成的Node不在head,tail連結串列中,正常情況下是的
while (!isOnSyncQueue(node)) {
// 阻塞當前執行緒,直至addConditionWaiter()生成的Node成為了firstWaiter,且被signal()喚醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
}
在await方法中,通過當前執行緒生成另外一個Node,被加入到waiter連結串列中,此時當前執行緒對應的Node可能有以下兩種情形:
- 當前執行緒是中途插隊搶到鎖的,所以未被阻塞過,也就是說其不存在head,tail的連結串列中。當前執行緒僅僅對應一個Node,在waiter連結串列上。
- 當前執行緒是阻塞喚醒後搶到鎖的,那麼在執行當前執行緒時,其對應的Node應該就是head節點。在await方法內喚醒head的next節點,也就是此節點的next對應的執行緒。如果next執行緒搶到鎖了,那麼會將next設定為新的head;如果沒有搶到鎖,也就是被中途插入的執行緒搶到鎖了,在中途插入的執行緒釋放鎖後,再次喚醒head的next執行緒。
綜上兩種情形,當await方法執行時,當前執行緒重新生成了一個Node,轉移到了waiter連結串列上。可能還有另一個Node在head,tail連結串列上,是head,但是已經執行過了,不會再次被喚醒
在Condition的signal方法中,其會將firstWaiter節點的Node新增到head,tail連結串列中,賦值為新的tail,然後喚醒Node節點對應的執行緒,同時將firstWaiter指向之前first的nextWaiter,也就是下一個Node。
public class ConditionObject implements Condition, java.io.Serializable {
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 喚醒firstWaiter所代表的執行緒
doSignal(first);
}
private void doSignal(Node first) {
do {
// 將firstWaiter 指向 其 nextWaiter, 也就是向後移動一位
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將firstWaiter指向的Node新增到head,tail的連結串列中,設定為新的tail節點
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 喚醒firstWaiter關聯的執行緒,也就是上文中await()方法阻塞的一個執行緒
LockSupport.unpark(node.thread);
return true;
}
}
在signal方法中,將await方法生成的Node從waiter連結串列上重新轉移至head,tail連結串列上設定為新的tail
在await( ) 方法內 被阻塞的執行緒被喚醒後,在while迴圈內,再次判斷當前Node是否在head,tail連結串列中,會返回true,也就是能夠跳出while迴圈:
// 判斷await生成的Node是否被加入到head,tail的連結串列中
while (!isOnSyncQueue(node)) {
// 阻塞當前執行緒,直至addConditionWaiter()生成的Node成為了firstWaiter,且被signal()喚醒
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
接下去執行acquireQueued()方法, 嘗試獲得鎖:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當前Node的prev,也就是前一個節點
final Node p = node.predecessor();
// 如果前一個節點是head,那麼當前執行緒嘗試去獲取鎖
if (p == head && tryAcquire(arg)) {
// 如果獲取成功了,那麼將當前節點設定為新的head,結束流程
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不滿足上述條件,那麼阻塞當前執行緒,直至被其他執行緒用signal再次喚醒
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果await代表的執行緒被重新喚醒後,有下列兩種情形:
- 如果其位置不是head的next節點,那麼會被重新阻塞,直至其位置前移到head的next,被head的unlock喚醒
- 如果其位置是head的next,那麼其會重新嘗試去搶鎖,如果沒搶到,那麼阻塞,等待其他執行緒的unlock喚醒;如果搶到了,那麼設定自設為新的head,繼續執行。
如果await的Node位置是head的next,那麼signal會喚醒此Node對應的執行緒,而signal( ) 外圍lock.unlock( ) 方法也能喚醒此Node,也就是一個執行緒的signal和unlock兩次喚醒head的next節點,這中間存在著多種可能性,感興趣的可以自己思考下,不過因為同一時間 只有一個執行緒能夠獲得鎖,基本不會出現執行緒安全問題,這裡不再討論。
await,signal的簡要流程圖如下:
以上就是ArrayBlockingQueue裡阻塞方法put和take的原始碼流程了,寫的不是很好,邏輯不是很清晰,如果有什麼問題可以在部落格下留言,我會盡量回復。
下面做一個總結:
- 一個lock可以new多個Condition,也就是上圖中會有多個waiter連結串列,每個Condition的await和signal都只操作自身的waiter連結串列節點,但他們共用同一個Lock的head,tail連結串列
- 一般只有head節點Thread在執行,其他節點都被阻塞,也就是await和signal執行的執行緒都是head節點所在的執行緒
- await方法會將head的Thread封裝成另一個Node,追加到waiter連結串列的末尾,然後釋放鎖,喚醒head的next節點。流程就相當於將head節點轉移到waiter連結串列的lastWaiter,喚醒head的next
- signal方法會將firstWaiter節點轉移到head,tail連結串列,追加到末尾,同時喚醒firstWaiter節點所在的執行緒
- lock.unlock()方法會釋放鎖,然後喚醒head的next節點所在的執行緒