1. 程式人生 > >ArrayBlockinQueue及Condition原始碼解析

ArrayBlockinQueue及Condition原始碼解析

在進入ArrayBlockingQueue原始碼之前,我們先看看AbstractQueuedSynchronizer類的內部結構: AbstractQueuedSynchronizer內部結構 其有一個重要的靜態內部類Node,將一個執行緒包裝成一個Node,每個Node會持有prev,next,nextWaiter 節點的引用。

AbstractQueuedSynchronizer裡的state屬性就表示被加鎖的次數,如果state>0,則表示當前鎖物件被其他執行緒持有。 AbstractQueuedSynchronizer和其內部類ConditionObject都維護了一條Node的連結串列,且都持有連結串列的頭尾節點,AbstractQueuedSynchronizer通過prev,next 來實現雙向的連結串列;而ConditionObject藉助nextWaiter實現單向的連結串列。

ArrayBlockingQueue在例項化是會呼叫如下建構函式:


    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
	public ArrayBlockingQueue(int capacity, boolean fair) {
	    if (capacity <= 0)
	        throw new IllegalArgumentException();
	    this.items = new Object
[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

預設情況下使用可重入的排他的非公平鎖,同時通過lock建立兩個Condition物件,notEmpty和notFull共用同一個鎖。

接下來我們看看ArrayBlockingQueue的兩個阻塞方法,put(Object)和take() :

	public void put(E e) throws InterruptedException {
        checkNotNull
(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }

put和take方法都在lock的包圍之中,也就是說同一時間只能有一個執行緒能夠執行put或take方法,向容器內加入或者取出元素。

如果某個執行緒執行put後者take方法,搶到了鎖,那麼會將AQS的state屬性通過CAS增加1,然後進入try程式碼塊。

在高併發下,同一時間肯定有大量執行緒線下執行put或者take方法,但只有一個能夠獲取到鎖,其他的執行緒都會被阻塞。對於未阻塞直接搶到鎖的執行緒,其不會加入到Node連結串列中;對於未搶到鎖而被阻塞的執行緒,會被封裝為Node物件,新增到AQS的成員變數中,以連結串列的形式儲存在head和tail之間。

阻塞佇列方法連結串列圖

每個被阻塞的執行緒封裝成一個Node,通過CAS設定為連結串列的新tail,其prev節點為之前的tail,他們在連結串列上的順序就代表著他們搶鎖失敗的順序。

在最開始時,第一個執行緒沒有搶到鎖,先建立一個空的Node,賦值給head,然後將此執行緒封裝為一個Node,賦值給tail,維護head和tail之間的引用,之後沒有搶到鎖的執行緒按順序依次向連結串列末尾新增,賦值為tail。

當第一個直接搶到鎖的執行緒執行unlock方法時,其會喚醒head節點的next,也就是上圖中的put( )方法所線上程,此時這個執行緒被喚醒,重新執行搶鎖操作,有如下兩種可能結果:

  1. 搶鎖失敗:因為是非公平鎖,中途有一個額外的執行緒插進來,搶到了鎖,head的next節點所表示的執行緒搶鎖再次失敗,被再次阻塞,等待新插進來的執行緒執行unlock來再次喚醒,重複上述流程
  2. 搶鎖成功:head的next節點將自身設定為新的head節點,之後其執行unlock方法時,也喚醒自身的next節點,重複上述流程

可重入的非公平鎖就是按照上述流程來實現執行緒的互斥的,當加入了Condition之後,又有了一些額外的變化。

當順利的搶到鎖,進入try程式碼塊之後,可能因為某些原因導致不滿足後續執行條件,因而執行condition的await方法。在await方法中,首先會將當前執行緒再次封裝成一個Node節點,新增到Condition物件的屬性中,如果firstWaiter為null,那麼賦值給firstWaiter;如果firstWaiter有值,那麼將此Node定義為新的lastWaiter,之前的lastWaiter.next = Node。

在這裡插入圖片描述

多個執行緒呼叫同一個Condition物件的await方法,會按照呼叫順序依次新增到Condition物件的Node節點連結串列中,ArrayBlockingQueue裡的lock生成了兩個Condition,每個Condition儲存對應方法的Node節點連結串列,一個put,一個take。

在新增到waiter連結串列中後,會釋放當前的鎖,也就是將AQS的state屬性置為0,然後喚醒head節點的next節點代表的執行緒。之後會進入一個while迴圈,當await方法生成的Node節點沒有被加入到head,tail所代表的連結串列中,那麼會阻塞當前執行緒。正常情況下,執行await方法後當前執行緒會被阻塞,直到其對應的在waiter連結串列中的Node節點被新增到head,tail的連結串列中。

當前執行緒通過await方法阻塞了,那麼就需要其他執行緒執行同一個Condition的signal方法來喚醒當前執行緒。比如put方法被阻塞,那麼需要其他執行緒呼叫put對應Condition的signal。在await方法中會釋放持有的鎖,同時喚醒head節點的next對應的執行緒,一般來說此執行緒最終會呼叫await對應Condition的signal方法。

public class ConditionObject implements Condition, java.io.Serializable {

	public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 將當前執行緒封裝為Node,新增到waiter連結串列中
            Node node = addConditionWaiter();
            // 釋放鎖,喚醒AQS內head的next節點所代表的執行緒
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 如果addConditionWaiter()方法生成的Node不在head,tail連結串列中,正常情況下是的
            while (!isOnSyncQueue(node)) {
            	// 阻塞當前執行緒,直至addConditionWaiter()生成的Node成為了firstWaiter,且被signal()喚醒
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

}

在await方法中,通過當前執行緒生成另外一個Node,被加入到waiter連結串列中,此時當前執行緒對應的Node可能有以下兩種情形:

  1. 當前執行緒是中途插隊搶到鎖的,所以未被阻塞過,也就是說其不存在head,tail的連結串列中。當前執行緒僅僅對應一個Node,在waiter連結串列上。
  2. 當前執行緒是阻塞喚醒後搶到鎖的,那麼在執行當前執行緒時,其對應的Node應該就是head節點。在await方法內喚醒head的next節點,也就是此節點的next對應的執行緒。如果next執行緒搶到鎖了,那麼會將next設定為新的head;如果沒有搶到鎖,也就是被中途插入的執行緒搶到鎖了,在中途插入的執行緒釋放鎖後,再次喚醒head的next執行緒。

綜上兩種情形,當await方法執行時,當前執行緒重新生成了一個Node,轉移到了waiter連結串列上。可能還有另一個Node在head,tail連結串列上,是head,但是已經執行過了,不會再次被喚醒

在Condition的signal方法中,其會將firstWaiter節點的Node新增到head,tail連結串列中,賦值為新的tail,然後喚醒Node節點對應的執行緒,同時將firstWaiter指向之前first的nextWaiter,也就是下一個Node。

public class ConditionObject implements Condition, java.io.Serializable {

	public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
            	// 喚醒firstWaiter所代表的執行緒
                doSignal(first);
    }

	private void doSignal(Node first) {
            do {
            	// 將firstWaiter 指向 其 nextWaiter, 也就是向後移動一位
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

	final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 將firstWaiter指向的Node新增到head,tail的連結串列中,設定為新的tail節點
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        	// 喚醒firstWaiter關聯的執行緒,也就是上文中await()方法阻塞的一個執行緒
            LockSupport.unpark(node.thread);
        return true;
    }
}

在signal方法中,將await方法生成的Node從waiter連結串列上重新轉移至head,tail連結串列上設定為新的tail

在await( ) 方法內 被阻塞的執行緒被喚醒後,在while迴圈內,再次判斷當前Node是否在head,tail連結串列中,會返回true,也就是能夠跳出while迴圈:

		// 判斷await生成的Node是否被加入到head,tail的連結串列中
		while (!isOnSyncQueue(node)) {
         	// 阻塞當前執行緒,直至addConditionWaiter()生成的Node成為了firstWaiter,且被signal()喚醒
             LockSupport.park(this);
             if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                 break;
         }

接下去執行acquireQueued()方法, 嘗試獲得鎖:

	final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	// 獲取當前Node的prev,也就是前一個節點
                final Node p = node.predecessor();
                // 如果前一個節點是head,那麼當前執行緒嘗試去獲取鎖
                if (p == head && tryAcquire(arg)) {
                	// 如果獲取成功了,那麼將當前節點設定為新的head,結束流程
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不滿足上述條件,那麼阻塞當前執行緒,直至被其他執行緒用signal再次喚醒
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

如果await代表的執行緒被重新喚醒後,有下列兩種情形:

  1. 如果其位置不是head的next節點,那麼會被重新阻塞,直至其位置前移到head的next,被head的unlock喚醒
  2. 如果其位置是head的next,那麼其會重新嘗試去搶鎖,如果沒搶到,那麼阻塞,等待其他執行緒的unlock喚醒;如果搶到了,那麼設定自設為新的head,繼續執行。

如果await的Node位置是head的next,那麼signal會喚醒此Node對應的執行緒,而signal( ) 外圍lock.unlock( ) 方法也能喚醒此Node,也就是一個執行緒的signal和unlock兩次喚醒head的next節點,這中間存在著多種可能性,感興趣的可以自己思考下,不過因為同一時間 只有一個執行緒能夠獲得鎖,基本不會出現執行緒安全問題,這裡不再討論。

await,signal的簡要流程圖如下:

await,signal的流程圖

以上就是ArrayBlockingQueue裡阻塞方法put和take的原始碼流程了,寫的不是很好,邏輯不是很清晰,如果有什麼問題可以在部落格下留言,我會盡量回復。

下面做一個總結:

  1. 一個lock可以new多個Condition,也就是上圖中會有多個waiter連結串列,每個Condition的await和signal都只操作自身的waiter連結串列節點,但他們共用同一個Lock的head,tail連結串列
  2. 一般只有head節點Thread在執行,其他節點都被阻塞,也就是await和signal執行的執行緒都是head節點所在的執行緒
  3. await方法會將head的Thread封裝成另一個Node,追加到waiter連結串列的末尾,然後釋放鎖,喚醒head的next節點。流程就相當於將head節點轉移到waiter連結串列的lastWaiter,喚醒head的next
  4. signal方法會將firstWaiter節點轉移到head,tail連結串列,追加到末尾,同時喚醒firstWaiter節點所在的執行緒
  5. lock.unlock()方法會釋放鎖,然後喚醒head的next節點所在的執行緒