1. 程式人生 > >二十三、併發程式設計之深入解析Condition原始碼

二十三、併發程式設計之深入解析Condition原始碼

一、Condition簡介

1、Object的wait和notify/notifyAll方法與Condition區別

任何一個java物件都繼承於Object類,線上程間實現通訊的往往會應用到Object的幾個方法,比如wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()幾個方法實現等待/通知機制。在java Lock體系下也有方法實現等待/通知機制。從整體上來看Object的wait/notify是與物件監視器配合完成執行緒間的等待/通知機制,而Condition是與Lock配合完成等待通知機制,前者是java底層級別的,後者是語言級別的,具有更高的可控制性和擴充套件性。

兩者除了在使用方式上不同外,在功能特性上還是有很多的不同:

  • 1.Condition能夠支援不響應中斷,而Object的wait/notify不支援;
  • 2.Condition能夠支援多個等待佇列(new多個Condition物件),而Object的wait/notify只能支援一個;
  • 3.Condition能夠支援超時時間的設定,而Object的wait/notify不支援

2、參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:

1.針對Object的wait方法

  • 1.void await() throws InterruptedException:當前執行緒進入等待狀態,如果其他執行緒呼叫condition的signal或者signalAll方法並且當前執行緒獲取Lock從await方法返回,如果在等待狀態中被中斷會丟擲被中斷異常;
  • 2.long awaitNanos(long nanosTimeout):當前執行緒進入等待狀態直到被通知,中斷或者超時;
  • 3.boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支援自定義時間單位
  • 4.boolean awaitUntil(Date deadline) throws InterruptedException:當前執行緒進入等待狀態直到被通知,中斷或者到了某個時間

2.針對Object的notify/notifyAll方法

  • 1.void signal():喚醒一個等待在condition上的執行緒,將該執行緒從等待佇列中轉移到同步佇列中,如果在同步佇列中能夠競爭到Lock則可以從等待方法中返回。
  • 2.void signalAll():與1的區別在於能夠喚醒所有等待在condition上的執行緒

二、Condition實現原理分析

1、等待佇列

建立一個condition物件是通過lock.newCondition(),而這個方法實際上是會new出一個ConditionObject物件,該類是AQS(AbstractQueuedSynchronizer)的一個內部類。condition要和lock配合使用的,也就是condition和Lock是繫結在一起的,而lock的實現原理又依賴於AQS。在鎖機制的實現上,AQS內部維護了一個同步佇列,如果是獨佔式鎖的話,所有獲取鎖失敗的執行緒的尾插入到同步佇列,同樣的,condition內部也是使用同樣的方式,內部維護了一個 等待佇列,所有呼叫condition.await方法的執行緒會加入到等待佇列中,並且執行緒狀態轉換為等待狀態。另外注意到ConditionObject中有兩個成員變數:

private transient Node firstWaiter;//等待佇列頭指標
private transient Node lastWaiter;//等待佇列尾指標

這樣我們就可以看出來ConditionObject通過持有等待佇列的頭尾指標來管理等待佇列。主要注意的是Node類複用了在AQS中的Node類,Node類有這樣一個屬性:

//後繼節點
Node nextWaiter;

進一步說明,等待佇列是一個單向佇列,而AQS同步佇列是一個雙向佇列。

接下來我們用一個demo,新建了10個執行緒,沒有執行緒先獲取鎖,然後呼叫condition.await方法釋放鎖將當前執行緒加入到等待佇列中,通過debug控制當走到第10個執行緒的時候檢視firstWaiter即等待佇列中的頭結點。

public static void main(String[] args) {
    for (int i = 0; i < 10; i++) {
        Thread thread = new Thread(() -> {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        });
        thread.start();
    }
}

debug模式下情景圖如下:
在這裡插入圖片描述
從這個圖我們可以很清楚的看到這樣幾點:

  1. 呼叫condition.await方法後執行緒依次尾插入到等待佇列中,如圖佇列中的執行緒引用依次為Thread-0,Thread-1,Thread-2…Thread-8;
  2. 等待佇列是一個單向佇列。通過我們的猜想然後進行實驗驗證,我們可以得出等待佇列的示意圖如下圖所示:2. 等待佇列是一個單向佇列。

我們可以得出等待佇列的示意圖如下圖所示:
在這裡插入圖片描述
同時還有一點需要注意的是:我們可以多次呼叫lock.newCondition()方法建立多個condition物件,也就是一個lock可以持有多個等待佇列。而在之前利用Object的方式實際上是指在物件Object物件監視器上只能擁有一個同步佇列和一個等待佇列,而併發包中的Lock擁有一個同步佇列和多個等待佇列。示意圖如下:
在這裡插入圖片描述
如圖所示,ConditionObject是AQS的內部類,因此每個ConditionObject能夠訪問到AQS提供的方法,相當於每個Condition都擁有所屬同步器的引用。

2、await實現原理

當呼叫condition.await()方法後會使得當前獲取lock的執行緒進入到等待佇列,如果該執行緒能夠從await()方法返回的話一定是該執行緒獲取了與condition相關聯的lock。

1.await()方法原始碼

public final void await() throws InterruptedException {
    if (Thread.interrupted())//執行緒中斷丟擲異常
        throw new InterruptedException();
    // 將當前執行緒包裝成Node,尾插入到等待佇列中
    Node node = addConditionWaiter();
    // 釋放當前執行緒所佔用的lock,在釋放的過程中會喚醒同步佇列中的下一個節點
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {//如果執行緒沒有進入同步佇列
        // 當前執行緒進入到等待狀態
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 自旋等待獲取到同步狀態(即獲取到lock)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //處理被中斷的情況
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

我們都知道噹噹前執行緒呼叫condition.await()方法後,會使得當前執行緒釋放lock然後加入到等待佇列中,直至被signal/signalAll後會使得當前執行緒從等待佇列中移至到同步佇列中去,直到獲得了lock後才會從await方法返回,或者在等待時被中斷會做中斷處理。

2.將當前執行緒新增到等待佇列中去

在第1步中呼叫addConditionWaiter將當前執行緒新增到等待佇列中,該方法原始碼為:

private Node addConditionWaiter() {
    Node t = lastWaiter;//尾節點
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();//清除等待節點
        t = lastWaiter;
    }
    //將當前執行緒包裝成Node
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)//佇列為空
        firstWaiter = node;//設定node為頭節點
    else//佇列不為空。從佇列尾部插入
        //尾插入
        t.nextWaiter = node;
    //更新lastWaiter
    lastWaiter = node;
    return node;
}

將當前節點包裝成Node,如果等待佇列的firstWaiter為null的話(等待佇列為空佇列),則將firstWaiter指向當前的Node,否則,更新lastWaiter(尾節點)即可。就是通過尾插入的方式將當前執行緒封裝的Node插入到等待佇列中即可,同時可以看出等待佇列是一個不帶頭結點的鏈式佇列,學習AQS知道同步佇列是一個帶頭結點的鏈式佇列,這是兩者的一個區別。

3. 釋放鎖的過程

將當前節點插入到等待對列之後,會使當前執行緒釋放lock,由fullyRelease方法實現。
fullyRelease原始碼為:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            //成功釋放同步狀態
            failed = false;
            return savedState;
        } else {
            //不成功釋放同步狀態丟擲異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

呼叫AQS的模板方法release方法釋放AQS的同步狀態並且喚醒在同步佇列中頭結點的後繼節點引用的執行緒,如果釋放成功則正常返回,若失敗的話就丟擲異常。

4.怎樣從await方法退出?

現在回過頭再來看await方法有這樣一段邏輯:

while (!isOnSyncQueue(node)) {//如果執行緒沒有進入同步佇列
    //當前執行緒進入到等待狀態
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}

很顯然,當執行緒第一次呼叫condition.await()方法時,會進入到這個while()迴圈中,然後通過LockSupport.park(this)方法使得當前執行緒進入等待狀態,那麼要想退出這個await方法第一個前提條件自然而然的是要先退出這個while迴圈,出口就只剩下兩個地方:

  • 邏輯走到break退出while迴圈;
  • while迴圈中的邏輯判斷為false。

再看程式碼出現第1種情況的條件是當前等待的執行緒被中斷後程式碼會走到break退出,第二種情況是當前節點被移動到了同步佇列中(即另外執行緒呼叫的condition的signal或者signalAll方法),while中邏輯判斷為false後結束while迴圈。
總結下,就是當前執行緒被中斷或者呼叫condition.signal/condition.signalAll方法當前節點移動到了同步佇列後 ,這是當前執行緒退出await方法的前提條件。當退出while迴圈後就會呼叫acquireQueued(node, savedState),該方法的作用是在自旋過程中執行緒不斷嘗試獲取同步狀態,直至成功(執行緒獲取到lock)。這樣也說明了退出await方法必須是已經獲得了condition引用(關聯)的lock。

5.await方法示意圖如下圖:

在這裡插入圖片描述
如圖,呼叫condition.await方法的執行緒必須是已經獲得了lock,也就是當前執行緒是同步佇列中的頭結點。呼叫該方法後會使得當前執行緒所封裝的Node尾插入到等待佇列中。

5.超時機制的支援

condition還額外支援了超時機制,使用者可呼叫方法awaitNanos,awaitUtil。這兩個方法的實現原理,基本上與AQS中的tryAcquire方法如出一轍。

6.不響應中斷的支援

要想不響應中斷可以呼叫condition.awaitUninterruptibly()方法,該方法的原始碼為:

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();//加入等待佇列
    int savedState = fullyRelease(node);//釋放執行緒所有鎖
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {//如果執行緒沒有進入同步佇列
    	//當前執行緒進入等待狀態
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

這段方法與上面的await方法基本一致,只不過減少了對中斷的處理,並省略了reportInterruptAfterWait方法拋被中斷的異常。

3、signal/signalAll實現原理

呼叫condition的signal或者signalAll方法可以將等待佇列中等待時間最長的節點移動到同步佇列中,使得該節點能夠有機會獲得lock。按照等待佇列是先進先出(FIFO)的,所以等待佇列的頭節點必然會是等待時間最長的節點,也就是每次呼叫condition的signal方法是將頭節點移動到同步佇列中。

1.signal方法原始碼為:

public final void signal() {
    //先檢測當前執行緒是否已經獲取lock
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //獲取等待佇列中第一個節點,之後的操作都是針對這個節點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

signal方法首先會檢測當前執行緒是否已經獲取lock,如果沒有獲取lock會直接丟擲異常,如果獲取的話再得到等待佇列的頭指標引用的節點,之後的操作的doSignal方法也是基於該節點。

2.doSignal方法原始碼為:

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //1. 將頭結點從等待佇列中移除
        first.nextWaiter = null;
        //2. while中transferForSignal方法對頭結點做真正的處理
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

3.transferForSignal 原始碼

具體邏輯請看註釋,真正對頭節點做處理的邏輯在transferForSignal放,該方法原始碼為:

final boolean transferForSignal(Node node) {
    //1. 更新狀態為0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //2.將該節點移入到同步佇列中去
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

關鍵邏輯請看註釋,這段程式碼主要做了兩件事情

  • 將頭結點的狀態更改為CONDITION;
  • 呼叫enq方法,將該節點尾插入到同步佇列中,關於enq方法請看AQS的底層實現這篇文章。

現在我們可以得出結論:呼叫condition的signal的前提條件是當前執行緒已經獲取了lock,該方法會使得等待佇列中的頭節點即等待時間最長的那個節點移入到同步佇列,而移入到同步佇列後才有機會使得等待執行緒被喚醒,即從await方法中的LockSupport.park(this)方法中返回,從而才有機會使得呼叫await方法的執行緒成功退出。

4.signal執行示意圖如下圖:

在這裡插入圖片描述

5.signalAll原始碼

signalAll與signal方法的區別體現在doSignalAll方法上,前面我們已經知道doSignal方法只會對等待佇列的頭節點進行操作,而doSignalAll的原始碼為:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

該方法只不過時間等待佇列中的每一個節點都移入到同步佇列中,即“通知”當前呼叫condition.await()方法的每一個執行緒。

三、await與signal/signalAll的結合思考

文章開篇提到等待/通知機制,通過使用condition提供的await和signal/signalAll方法就可以實現這種機制,await和signal和signalAll方法就像一個開關控制著執行緒A(等待方)和執行緒B(通知方)。它們之間的關係可以用下面一個圖來表現得更加貼切:
在這裡插入圖片描述
如圖,執行緒awaitThread先通過lock.lock()方法獲取鎖成功後呼叫了condition.await方法進入等待佇列,而另一個執行緒signalThread通過lock.lock()方法獲取鎖成功後呼叫了condition.signal或者signalAll方法,使得執行緒awaitThread能夠有機會移入到同步佇列中,當其他執行緒釋放lock後使得執行緒awaitThread能夠有機會獲取lock,從而使得執行緒awaitThread能夠從await方法中退出執行後續操作。如果awaitThread獲取lock失敗會直接進入到同步佇列。