1. 程式人生 > 實用技巧 >深入理解 JUC:AQS 佇列同步器

深入理解 JUC:AQS 佇列同步器

AbstractQueuedSynchronizer 簡稱 AQS,可能我們幾乎不會直接去使用它,但它卻是 JUC 的核心基礎元件,支撐著 java 鎖和同步器的實現,例如 ReentrantLock、ReentrantReadWriteLock、CountDownLatch,以及 Semaphore 等。大神Doug Lea在設計 JUC 包時希望能夠抽象一個基礎且通用的元件以支撐上層模組的實現,AQS 應運而生。

AQS 本質上是一個 FIFO 的雙向佇列,執行緒被包裝成結點的形式,基於自旋機制在佇列中等待獲取資源(這裡的資源可以簡單理解為物件鎖)。AQS 在設計上實現了兩類佇列,即同步佇列和條件佇列,其中同步佇列服務於執行緒阻塞等待獲取資源,而條件佇列則服務於執行緒因某個條件不滿足而進入等待狀態。條件佇列中的執行緒實際上已經獲取到了資源,但是沒有能夠繼續執行下去的條件,所以被打入條件佇列並釋放持有的資源,以讓渡其它執行緒執行,如果未來某個時刻條件得以滿足,則該執行緒會被從條件佇列轉移到同步佇列,繼續參與競爭資源,以繼續向下執行。

本文我們主要分析 AQS 的設計與實現,包括 LockSupport 工具類、同步佇列、條件佇列,以及 AQS 資源獲取和釋放的通用過程。AQS 採用模板方法設計模式,具體獲取資源和釋放資源的過程都交由子類實現,對於這些方法的分析將留到後面分析具體子類的文章中再展開。

LockSupport 工具類

LockSupport 工具類是 JUC 的基礎元件,主要作用是用來阻塞和喚醒執行緒,底層依賴於 Unsafe 類實現。LockSupport 主要定義類 2 類方法:park 和 unpark,其中 park 方法用於阻塞當前執行緒,而 unpark 方法用於喚醒處於阻塞狀態的指定執行緒。

下面的示例演示了 park 和 unpark 方法的基本使用:

Thread thread = new Thread(() -> {
    System.out.println("Thread start: " + Thread.currentThread().getName());
    LockSupport.park(); // 阻塞自己
    System.out.println("Thread end: " + Thread.currentThread().getName());
});

thread.setName("A");
thread.start();

System.out.println("Main thread sleep 3 second: " + Thread.currentThread().getId());
TimeUnit.SECONDS.sleep(3);
LockSupport.unpark(thread); // 喚醒執行緒 A

執行緒 A 在啟動之後呼叫了LockSupport#park方法將自己阻塞,主執行緒在休息 3 秒之後呼叫LockSupport#unpark方法執行緒 A 喚醒。執行結果:

Thread start: A
Main thread sleep 3 second: 1
Thread end: A

LockSupport 針對 park 方法提供了多種實現,如下:

public static void park()
public static void park(Object blocker)
public static void parkNanos(long nanos)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(long deadline)
public static void parkUntil(Object blocker, long deadline)

由方法命名不難看出,parkNanos 和 parkUntil 都屬於 park 方法的超時版本,區別在於 parkNanos 方法接收一個納秒單位的時間值,用於指定阻塞的時間長度,例如當設定nanos=3000000000時,執行緒將阻塞 3 秒後甦醒,而 parkUntil 方法則接收一個時間戳,引數 deadline 用於指定阻塞的到期時間。

所有的 park 方法都提供了包含Object blocker引數的過載版本,引數 blocker 指代導致當前執行緒阻塞等待的鎖物件,方便問題排查和系統監控,而在 LockSupport 最開始被設計時卻忽視了這一點,導致線上程 dump 時無法提供阻塞物件的相關資訊,這一點在 java 6 中得以改進。實際開發中如果使用到了 LockSupport 工具類,推薦使用帶 blocker 引數的版本。

下面以LockSupport#park(java.lang.Object)方法為例來看一下具體的實現,如下:

public static void park(Object blocker) {
    // 獲取當前執行緒物件
    Thread t = Thread.currentThread();
    // 記錄當前執行緒阻塞等待的鎖物件(設定執行緒物件的 parkBlocker 為引數指定的 blocker 物件)
    setBlocker(t, blocker);
    // 阻塞執行緒
    UNSAFE.park(false, 0L);
    // 執行緒恢復執行,清除 parkBlocker 引數記錄的鎖物件
    setBlocker(t, null);
}

具體實現比較簡單,阻塞執行緒的操作依賴於 Unsafe 類實現。上述方法會呼叫LockSupport#setBlocker方法基於 Unsafe 類將引數指定的 blocker 物件記錄到當前執行緒物件的Thread#parkBlocker欄位中,然後進入阻塞狀態,並在被喚醒之後清空對應的Thread#parkBlocker欄位。

當一個執行緒呼叫 park 方法進入阻塞狀態之後,會在滿足以下 3 個條件之一時從阻塞狀態中甦醒:

  1. 其它執行緒呼叫 unpark 方法喚醒當前執行緒。
  2. 其它執行緒中斷了當前執行緒的阻塞狀態。
  3. 方法 park 因為一些不合邏輯的原因退出。

執行緒在從 park 方法中返回時並不會攜帶具體的返回原因,呼叫者需要自行檢測,例如再次檢查之前呼叫 park 方法的條件是否仍然滿足以予以推測。

方法LockSupport#unpark的實現同樣基於 Unsafe 類實現,不同於 park 的多版本實現,LockSupport 針對 unpark 方法僅提供了單一實現,如下:

public static void unpark(Thread thread) {
    if (thread != null) {
        UNSAFE.unpark(thread);
    }
}

需要注意的一點是,如果事先針對某個執行緒呼叫了 unpark 方法,則該執行緒繼續呼叫 park 方法並不會進入阻塞狀態,而是會立即返回,並且 park 方法是不可重入的。

同步佇列

同步佇列的作用在於管理競爭資源的執行緒,當一個執行緒競爭資源失敗會被記錄到同步佇列的末端,並以自旋的方式迴圈檢查能夠成功獲取到資源。AQS 的同步佇列基於 CLH(Craig, Landin, and Hagersten) 鎖思想進行設計和實現。CLH 鎖是一種基於連結串列的可擴充套件、高效能,且具備公平性的自旋鎖。執行緒以連結串列結點的形式進行組織,在等待期間相互獨立的執行自旋,並不斷輪詢前驅結點的狀態,如果發現前驅結點上的執行緒釋放了資源則嘗試獲取。

CLH 鎖是 AQS 佇列同步器實現的基礎,AQS 以內部類 Node 的形式定義了同步佇列結點,包括下一小節介紹的條件佇列,同樣以 Node 定義結點。Node 的欄位定義如下:

static final class Node {

    /** 模式定義 */

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** 執行緒狀態 */

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 執行緒等待狀態 */
    volatile int waitStatus;

    /** 前驅結點 */
    volatile Node prev;
    /** 後置結點 */
    volatile Node next;

    /** 持有的執行緒物件 */
    volatile Thread thread;

    /** 對於獨佔模式而言,指向下一個處於 CONDITION 等待狀態的結點;對於共享模式而言,則為 SHARED 結點 */
    Node nextWaiter;

    // ... 省略方法定義
}

由上述欄位定義可以看出,位於 CLH 連結串列中的執行緒以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨佔模式。共享模式與獨佔模式的主要區別在於,同一時刻獨佔模式只能有一個執行緒獲取到資源,而共享模式在同一時刻可以有多個執行緒獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個執行緒同時獲取到讀鎖資源,而寫操作同一時刻只能有一個執行緒獲取到寫鎖資源,其它執行緒在嘗試獲取資源時都會被阻塞。

AQS 的 CLH 鎖為處於 CLH 連結串列中的執行緒定義了 4 種狀態,包括 CANCELLED、SIGNAL、CONDITION,以及 PROPAGATE,並以Node#waitStatus欄位進行記錄。這 4 種狀態的含義分別為:

  • CANCELLED:表示當前執行緒處於取消狀態,一般是因為等待超時或者被中斷,處於取消狀態的執行緒不會再參與到競爭中,並一直保持該狀態。
  • SIGNAL:表示當前結點後繼結點上的執行緒正在等待被喚醒,如果當前執行緒釋放了持有的資源或者被取消,需要喚醒後繼結點上的執行緒。
  • CONDITION:表示當前執行緒正在等待某個條件,當某個執行緒在呼叫了Condition#signal方法後,當前結點將會被從條件佇列轉移到同步佇列中,參與競爭資源。
  • PROPAGATE:處於該狀態的執行緒在釋放共享資源,或接收到釋放共享資源的訊號時需要通知後繼結點,以防止通知丟失。

一個結點在被建立時,欄位Node#waitStatus的初始值為 0,表示結點上的執行緒不位於上述任何狀態。

Node 類在方法定義上除了基本的構造方法外,僅定義了Node#isSharedNode#predecessor兩個方法,其中前者用於返回當前結點是否以共享模式在等待,後者用於返回當前結點的前驅結點。

介紹完了佇列結點的定義,那麼同步佇列具體如何實現呢?這還需要依賴於 AbstractQueuedSynchronizer 類中的兩個欄位定義,即:

private transient volatile Node head;
private transient volatile Node tail;

其中 head 表示同步佇列的頭結點,而 tail 則表示同步佇列的尾結點,具體組織形式如下圖:

當呼叫 AQS 的 acquire 方法獲取資源時,如果資源不足則當前執行緒會被封裝成 Node 結點新增到同步佇列的末端,頭結點 head 用於記錄當前正在持有資源的執行緒結點,而 head 的後繼結點就是下一個將要被排程的執行緒結點,當 release 方法被呼叫時,該結點上的執行緒將被喚醒,繼續獲取資源。

關於同步佇列結點入佇列、出佇列的實現先不展開,留到後面分析 AQS 資源獲取與釋放的過程時一併分析。

條件佇列

除了上面介紹的同步佇列,在 AQS 中還定義了一個條件佇列。內部類 ConditionObject 實現了條件佇列的組織形式,包含一個起始結點(firstWaiter)和一個末尾結點(lastWaiter),並同樣以上面介紹的 Node 類定義結點,如下:

public class ConditionObject implements Condition, Serializable {

        /** 指向條件佇列中的起始結點 */
        private transient Node firstWaiter;
        /** 指向條件佇列的末尾結點 */
        private transient Node lastWaiter;

        // ... 省略方法定義

}

前面在分析 Node 內部類的時候,可以看到 Node 類還定義了一個Node#nextWaiter欄位,用於指向條件佇列中的下一個等待結點。由此我們可以描繪出條件佇列的組織形式如下:

ConditionObject 類實現了 Condition 介面,該介面定義了與 Lock 鎖相關的執行緒通訊方法,主要分為 await 和 signal 兩大類。

當執行緒呼叫 await 方法時,該執行緒會被包裝成結點新增到條件佇列的末端,並釋放持有的資源。當條件得以滿足時,方法 signal 可以將條件佇列中的一個或全部的執行緒結點從條件佇列轉移到同步佇列以參與競爭資源。應用可以建立多個 ConditionObject 物件,每個物件都對應一個條件佇列,對於同一個條件佇列而言,其中的執行緒所等待的條件是相同的。

Condition 介面的定義如下:

public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;

    void signal();
    void signalAll();
}

等待:await

下面來分析一下 ConditionObject 類針對 Condition 介面方法的實現,首先來看一下ConditionObject#await方法,該方法用於將當前執行緒新增到條件佇列中進行等待,同時支援響應中斷。方法實現如下:

public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        // 立即響應中斷
        throw new InterruptedException();
    }
    // 將當前執行緒新增到等待佇列末尾,等待狀態為 CONDITION
    Node node = this.addConditionWaiter();
    // 釋放當前執行緒持有的資源
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 如果當前結點位於條件佇列中,則迴圈
        // 阻塞當前執行緒
        LockSupport.park(this);
        // 如果執行緒在阻塞期間被中斷,則退出迴圈
        if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }
    // 如果在同步佇列中等待期間被中斷,且之前的中斷狀態不為 THROW_IE
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }
    if (node.nextWaiter != null) {
        // 清除條件佇列中所有狀態不為 CONDITION 的結點
        this.unlinkCancelledWaiters();
    }
    // 如果等待期間被中斷,則響應中斷
    if (interruptMode != 0) {
        this.reportInterruptAfterWait(interruptMode);
    }
}

因為ConditionObject#await方法支援響應中斷,所以在方法一開始會先檢查一下當前執行緒是否被中斷,如果是則丟擲 InterruptedException 異常,否則繼續將當前執行緒加入到條件佇列中進行等待。整體執行流程可以概括為:

  1. 將當前執行緒加入到條件佇列末端,並設定等待狀態為 CONDITION;
  2. 釋放當前執行緒所持有的資源,避免飢餓或死鎖;
  3. 基於自旋機制在條件佇列中等待,直到被其它執行緒轉移到同步佇列,或者等待期間被中斷;
  4. 如果等待期間被中斷,則響應中斷。

ConditionObject 定義了兩種中斷響應方式,即:REINTERRUPTTHROW_IE。如果是REINTERRUPT,則執行緒會呼叫Thread#interrupt方法中斷自己;如果是THROW_IE,則執行緒會直接丟擲 InterruptedException 異常。

下面繼續分析一下支撐ConditionObject#await執行的其它幾個方法,包括 addConditionWaiter、fullyRelease、isOnSyncQueue,以及 unlinkCancelledWaiters。

方法ConditionObject#addConditionWaiter用於將當前執行緒包裝成 Node 結點物件新增到條件佇列的末端,期間會執行清除條件佇列中處於取消狀態(等待狀態不為 CONDITION)的執行緒結點。方法實現如下:

private Node addConditionWaiter() {
    // 獲取條件佇列的末尾結點
    Node t = lastWaiter;
    // 如果末尾結點狀態不為 CONDITION,表示對應的執行緒已經取消了等待,需要執行清理操作
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清除條件佇列中所有狀態不為 CONDITION 的結點
        this.unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 構建當前執行緒對應的 Node 結點,等待狀態為 CONDITION,並新增到條件佇列末尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }
    lastWaiter = node;
    return node;
}

將當前執行緒物件新增到條件佇列中的過程本質上是一個簡單的連結串列插入操作,在執行插入操作之前,上述方法會先對條件佇列執行一遍清理操作,清除那些狀態不為 CONDITION 的結點。具體實現位於ConditionObject#unlinkCancelledWaiters方法中:

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null; // 記錄上一個不被刪除的結點
    while (t != null) {
        Node next = t.nextWaiter;
        // 如果結點上的執行緒等待狀態不為 CONDITION,則刪除對應結點
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null) {
                firstWaiter = next;
            } else {
                trail.nextWaiter = next;
            }
            if (next == null) {
                lastWaiter = trail;
            }
        } else {
            trail = t;
        }
        t = next;
    }
}

方法AbstractQueuedSynchronizer#fullyRelease用於釋放當前執行緒持有的資源,這也是非常容易理解的,畢竟當前執行緒即將進入等待狀態,如果持有的資源不被釋放,將可能導致程式最終被餓死,或者死鎖。方法的實現如下:

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 獲取當前執行緒的同步狀態,可以理解為持有的資源數量
        int savedState = this.getState();
        // 嘗試釋放當前執行緒持有的資源
        if (this.release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 釋放資源失敗
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed) {
            // 如果釋放資源失敗,則取消當前執行緒
            node.waitStatus = Node.CANCELLED;
        }
    }
}

如果資源釋放失敗,則上述方法會將當前執行緒的狀態設定為 CANCELLED,以退出等待狀態。

方法AbstractQueuedSynchronizer#isOnSyncQueue用於檢測當前結點是否位於同步佇列中,方法實現如下:

final boolean isOnSyncQueue(Node node) {
    // 如果結點位於等待佇列,或是頭結點則返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null) {
        return false;
    }
    // If has successor, it must be on queue
    if (node.next != null) {
        return true;
    }

    /*
     * node.prev can be non-null, but not yet on queue because the CAS to place it on queue can fail.
     * So we have to traverse from tail to make sure it actually made it. It will always be near the tail in calls to this method,
     * and unless the CAS failed (which is unlikely), it will be there, so we hardly ever traverse much.
     */

    // 從後往前檢測目標結點是否位於同步佇列中
    return this.findNodeFromTail(node);
}

如果一個執行緒所等待的條件被滿足,則觸發條件滿足的執行緒會將等待該條件的一個或全部執行緒結點從條件佇列轉移到同步佇列,此時,這些執行緒將從ConditionObject#await方法中退出,以參與競爭資源。

方法ConditionObject#awaitNanosConditionObject#awaitUntilConditionObject#await(long, TimeUnit)在上面介紹的ConditionObject#await方法的基礎上引入了超時機制,當一個執行緒在條件佇列中等待的時間超過設定值時,執行緒結點將被從條件佇列轉移到同步佇列,參與競爭資源。其它執行過程與ConditionObject#await方法相同,故不再展開。

下面來分析一下ConditionObject#awaitUninterruptibly方法,由方法命名可以看出該方法相對於ConditionObject#await方法的區別在於在等待期間不響應中斷。方法實現如下:

public final void awaitUninterruptibly() {
    // 將當前執行緒新增到等待佇列末尾,等待狀態為 CONDITION
    Node node = this.addConditionWaiter();
    // 釋放當前執行緒持有的資源
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    // 如果當前結點位於條件佇列中,則迴圈
    while (!isOnSyncQueue(node)) {
        // 阻塞當前執行緒
        LockSupport.park(this);
        if (Thread.interrupted()) {
            // 標識執行緒等待期間被中斷,但不立即響應
            interrupted = true;
        }
    }
    // 自旋獲取資源,返回 true 則說明等待期間被中斷過
    if (acquireQueued(node, savedState) || interrupted) {
        // 響應中斷
        selfInterrupt();
    }
}

如果執行緒在等待期間被中斷,則上述方法會用一個欄位進行記錄,並在最後集中處理,而不會因為中斷而退出等待狀態。

通知:signal

呼叫 await 方法會將執行緒物件自身加入到條件佇列中進行等待,而 signal 通知方法則用於將一個或全部的等待執行緒從條件佇列轉移到同步佇列,以參與競爭資源。ConditionObject 定義了兩個通知方法:signal 和 signalAll,前者用於將條件佇列的頭結點(也就是等待時間最長的結點)從條件佇列轉移到同步佇列,後者用於將條件佇列中所有處於等待狀態的結點從條件佇列轉移到同步佇列。下面分別來分析一下這兩個方法的實現。

方法ConditionObject#signal的實現如下:

public final void signal() {
    // 先檢測當前執行緒是否獲取到了鎖,否則不允許繼續執行
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // 獲取條件佇列頭結點,即等待時間最長的結點
    Node first = firstWaiter;
    if (first != null) {
        // 將頭結點從條件佇列轉移到同步佇列,參與競爭資源
        this.doSignal(first);
    }
}

呼叫ConditionObject#signal方法的執行緒必須位於臨界區,也就是必須先持有獨佔鎖,所以上述方法一開始會對這一條件進行校驗,方法AbstractQueuedSynchronizer#isHeldExclusively是一個模板方法,交由子類來實現。如果滿足執行條件,則上述方法會呼叫ConditionObject#doSignal方法將條件佇列的頭結點從條件佇列轉移到同步佇列。

private void doSignal(Node first) {
    // 從前往後遍歷,直到遇到第一個不為 null 的結點,並將其從條件佇列轉移到同步佇列
    do {
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

// AbstractQueuedSynchronizer#transferForSignal
final boolean transferForSignal(Node node) {
    // 更新當前結點的等待狀態:CONDITION -> 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 更新失敗,說明對應的結點上的執行緒已經被取消
        return false;
    }

    /*
     * Splice onto queue and try to set waitStatus of predecessor to indicate that thread is (probably) waiting.
     * If cancelled or attempt to set waitStatus fails, wake up to resync (in which case the waitStatus can be transiently and harmlessly wrong).
     */

    // 將結點新增到同步佇列末端,並返回該結點的前驅結點
    Node p = this.enq(node);
    int ws = p.waitStatus;
    // 如果前驅結點被取消,或者設定前驅結點的狀態為 SIGNAL 失敗,則喚醒當前結點上的執行緒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) {
        LockSupport.unpark(node.thread);
    }
    return true;
}

方法ConditionObject#doSignal會從前往後遍歷條件佇列,尋找第一個不為 null 的結點,並應用AbstractQueuedSynchronizer#transferForSignal方法嘗試將其從條件佇列轉移到同步佇列。

在入同步佇列之前,方法AbstractQueuedSynchronizer#transferForSignal會基於 CAS 機制清除結點的 CONDITION 狀態,如果清除失敗則說明該結點上的執行緒已被取消,此時ConditionObject#doSignal方法會繼續尋找下一個可以被喚醒的結點。如果清除結點狀態成功,則接下來會將該結點新增到同步佇列的末端,同時依據前驅結點的狀態決定是否喚醒當前結點上的執行緒。

繼續來看ConditionObject#signalAll方法的實現,相對於上面介紹的ConditionObject#signal方法,該方法的特點在於它會喚醒條件佇列中所有不為 null 的等待結點。方法實現如下:

public final void signalAll() {
    if (!isHeldExclusively()) {
        // 先檢測當前執行緒是否獲取到了鎖,否則不允許繼續執行
        throw new IllegalMonitorStateException();
    }
    // 獲取條件佇列頭結點
    Node first = firstWaiter;
    if (first != null) {
        // 將所有結點從條件佇列轉移到同步佇列,參與競爭資源
        this.doSignalAll(first);
    }
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

實際上理解了ConditionObject#doSignal的執行機制,再理解ConditionObject#signalAll的執行機制也是水到渠成的事情。

資源的獲取與釋放

前面的小節我們分析了 LockSupport 工具類,以及 AQS 同步佇列和條件佇列的設計與實現,這些都是支撐 AQS 執行的基礎元件,本小節我們將正式開始分析 AQS 的實現機制。

AQS 對應的 AbstractQueuedSynchronizer 實現類,在屬性定義上主要包含 4 個欄位(如下),其中 exclusiveOwnerThread 由父類 AbstractOwnableSynchronizer 繼承而來,用於記錄當前持有獨佔鎖的執行緒物件,而 head 和 tail 欄位分別指向同步佇列的頭結點和尾結點:

private transient Thread exclusiveOwnerThread;

private transient volatile Node head;
private transient volatile Node tail;

private volatile int state;

欄位 state 用於描述同步狀態,對於不同的實現類來說具備不同的用途:

  • 對於 ReentrantLock 而言,表示當前執行緒獲取鎖的重入次數。
  • 對於 ReentrantReadWriteLock 而言,高 16 位表示獲取讀鎖的重入次數,低 16 位表示獲取寫鎖的重入次數。
  • 對於 Semaphore 而言,表示當前可用的訊號個數。
  • 對於 CountDownLatch 而言,表示計數器當前的值。

具體細節我們將在後面分析相應元件實現機制的文章中再展開說明。

AbstractQueuedSynchronizer 是一個抽象類,在方法設計上引入了模板方法設計模式,下面的程式碼塊中列出了所有需要子類依據自身執行機制針對性實現的模板方法:

protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg)
protected boolean isHeldExclusively()

這裡先簡單說明一下各個方法的作用,具體實現留到後面分析各個基於 AQS 實現元件的文章中再進一步分析:

  • tryAcquire:嘗試以獨佔模式獲取資源,如果獲取成功則返回 true,否則返回 false。
  • tryRelease:嘗試以獨佔模式釋放資源,如果釋放成功則返回 true,否則返回 false。
  • tryAcquireShared:嘗試以共享模式獲取資源,如果返回正數則說明獲取成功,且還有可用的剩餘資源;如果返回 0 則說明獲取成功,但是沒有可用的剩餘資源;如果返回負數則說明獲取資源失敗。
  • tryReleaseShared:嘗試以共享模式釋放資源,如果釋放成功則返回 true,否則返回 false。
  • isHeldExclusively:判斷當前執行緒是否正在獨佔資源,如果是則返回 true,否則返回 false。

AbstractQueuedSynchronizer 中的方法實現按照功能劃分可以分為兩大類,即獲取資源(acquire)和釋放資源(release),同時區分獨佔模式和共享模式。下面的小節中主要對獲取和釋放資源的方法區分獨佔模式和共享模式進行分析。

獨佔獲取資源

針對獨佔模式獲取資源,AbstractQueuedSynchronizer 定義了多個版本的 acquire 方法實現,包括:acquire、acquireInterruptibly,以及 tryAcquireNanos,其中 acquireInterruptibly 是 acquire 的中斷版本,在等待獲取資源期間支援響應中斷請求,tryAcquireNanos 除了支援響應中斷以外,還引入了超時等待機制。

下面主要分析一下AbstractQueuedSynchronizer#acquire的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。方法AbstractQueuedSynchronizer#acquire的實現如下:

public final void acquire(int arg) {
    if (!this.tryAcquire(arg) // 嘗試獲取資源
            // 如果獲取資源失敗,則將當前執行緒加入到同步佇列的末端(獨佔模式),並基於自旋機制等待獲取資源
            && this.acquireQueued(this.addWaiter(Node.EXCLUSIVE), arg)) {
        // 等待獲取資源期間曾被中斷過,在獲取資源成功之後再響應中斷
        selfInterrupt();
    }
}

方法AbstractQueuedSynchronizer#tryAcquire的功能在前面已經簡單介紹過了,用於嘗試獲取資源,如果獲取資源失敗則會將當前執行緒新增到同步佇列中,基於自旋機制等待獲取資源。

方法AbstractQueuedSynchronizer#addWaiter用於將當前執行緒物件封裝成結點新增到同步佇列末端,並最終返回執行緒結點物件:

private Node addWaiter(Node mode) {
    // 為當前執行緒建立結點物件
    Node node = new Node(Thread.currentThread(), mode);
    // 基於 CAS 機制嘗試快速新增結點到同步佇列末端
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (this.compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 快速新增失敗,繼續嘗試將該結點新增到同步佇列末端,如果同步佇列未被初始化則執行初始化
    this.enq(node);
    // 返回當前執行緒對應的結點物件
    return node;
}

上述方法在新增結點的時候,如果同步佇列已經存在,則嘗試基於 CAS 操作快速將當前結點新增到同步佇列末端。如果新增失敗,或者佇列不存在,則需要再次呼叫AbstractQueuedSynchronizer#enq方法執行新增操作,該方法在判斷佇列不存在時會初始化同步佇列,然後基於 CAS 機制嘗試往同步佇列末端插入執行緒結點。方法實現如下:

private Node enq(final Node node) {
    for (; ; ) {
        // 獲取同步佇列末尾結點
        Node t = tail;
        // 如果結點不存在,則初始化
        if (t == null) { // Must initialize
            if (this.compareAndSetHead(new Node())) {
                tail = head;
            }
        } else {
            // 往末尾追加
            node.prev = t;
            if (this.compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

完成了結點的入同步佇列操作,接下來會呼叫AbstractQueuedSynchronizer#acquireQueued方法基於自旋機制等待獲取資源,在等待期間並不會響應中斷,而是記錄中斷標誌,等待獲取資源成功後延遲響應。方法實現如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 標記自旋過程中是否被中斷
        // 基於自旋機制等待獲取資源
        for (; ; ) {
            // 獲取前驅結點
            final Node p = node.predecessor();
            // 如果前驅結點為頭結點,說明當前結點是排在同步佇列最前面,可以嘗試獲取資源
            if (p == head && this.tryAcquire(arg)) {
                // 獲取資源成功,更新頭結點
                this.setHead(node); // 頭結點一般記錄持有資源的執行緒結點
                p.next = null; // help GC
                failed = false;
                return interrupted; // 自旋過程中是否被中斷
            }
            // 如果還未輪到當前結點,或者獲取資源失敗
            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前執行緒
                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,並在甦醒時檢查中斷狀態
                // 標識等待期間被中斷
                interrupted = true;
            }
        }
    } finally {
        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的程序
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}

上述方法會迴圈檢測當前結點是否已經排在同步佇列的最前端,如果是則呼叫AbstractQueuedSynchronizer#tryAcquire方法嘗試獲取資源,具體獲取資源的過程由子類實現。自旋期間如果還未輪到排程當前執行緒結點,或者嘗試獲取資源失敗,則會呼叫AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire方法檢測是否需要阻塞當前執行緒,具體判定的過程依賴於前驅結點的等待狀態,實現如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅結點狀態
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) {
        // 前驅結點狀態為 SIGNAL,說明當前結點需要被阻塞
        return true;
    }
    if (ws > 0) {
        // 前驅結點處於取消狀態,則一直往前尋找處於等待狀態的結點,並排在其後面
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 前驅結點的狀態為 0 或 PROPAGATE,但是當前結點需要一個被喚醒的訊號,
         * 所以基於 CAS 將前驅結點等待狀態設定為 SIGNAL,在阻塞之前,呼叫者需要重試以再次確認不能獲取到資源。
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

上述方法首先會獲取前驅結點的等待狀態,並依據具體的狀態值進行決策:

  1. 如果前驅結點等待狀態為 SIGNAL,則說明當前結點需要被阻塞,所以直接返回 true;
  2. 否則,如果前驅結點的等待狀態大於 0(即處於取消狀態),則一直往前尋找未被取消的結點,並將當前結點排在其後,這種情況下直接返回 false,再次嘗試獲取一次資源;
  3. 否則,前驅結點的狀態為 0 或 PROPAGATE(不可能為 CONDITION 狀態,因為當前處於同步佇列),因為當前結點需要一個喚醒訊號,所以修改前驅結點的狀態為 SIGNAL,這種情況下同樣返回 false,以再次確認不能獲取到資源。

如果上述檢查返回 true,則接下來會呼叫AbstractQueuedSynchronizer#parkAndCheckInterrupt方法,基於 LockSupport 工具阻塞當前執行緒,並在執行緒甦醒時檢查中斷狀態。如果期間被中斷過則記錄中斷標記,而不立即響應,直到成功獲取到資源,或者期間發生異常退出自旋。方法AbstractQueuedSynchronizer#acquireQueued最終會返回這一中斷標記,並在外圍進行響應。

如果在自旋期間發生異常,則上述方法會執行AbstractQueuedSynchronizer#cancelAcquire以取消當前結點等待獲取資源的程序,包括設定結點的等待狀態為 CANCELLED,喚醒後繼結點等。

獨佔釋放資源

針對獨佔模式釋放資源,AbstractQueuedSynchronizer 定義了單一實現,即AbstractQueuedSynchronizer#release方法,該方法本質上是一個排程的過程,具體釋放資源的操作交由 tryRelease 方法完成,由子類實現。方法AbstractQueuedSynchronizer#release實現如下:

public final boolean release(int arg) {
    // 嘗試釋放資源
    if (this.tryRelease(arg)) {
        Node h = head;
        // 如果釋放資源成功,則嘗試喚醒後繼結點
        if (h != null && h.waitStatus != 0) {
            this.unparkSuccessor(h);
        }
        return true;
    }
    return false;
}

如果 tryRelease 釋放資源成功,則上述方法會嘗試喚醒同步佇列中由後往前距離頭結點最近的一個結點上的執行緒。方法AbstractQueuedSynchronizer#unparkSuccessor的實現如下:

private void unparkSuccessor(Node node) {
    // 獲取當前結點狀態
    int ws = node.waitStatus;
    if (ws < 0) {
        // 如果當前結點未被取消,則基於 CAS 更新結點等待狀態為 0
        compareAndSetWaitStatus(node, ws, 0);
    }

    /*
     * Thread to unpark is held in successor, which is normally just the next node.
     * But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.
     */
    Node s = node.next; // 獲取後繼結點
    // 如果後繼結點為 null,或者被取消
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從後往前尋找距離當前結點最近的一個未被取消的執行緒結點
        for (Node t = tail; t != null && t != node; t = t.prev) {
            if (t.waitStatus <= 0) {
                s = t;
            }
        }
    }
    // 喚醒結點上的執行緒
    if (s != null) {
        LockSupport.unpark(s.thread);
    }
}

選舉待喚醒執行緒結點的過程被設計成從後往前遍歷,尋找距離當前結點最近的未被取消的結點,並呼叫 LockSupport 工具類喚醒結點上的執行緒。

那為什麼要設計成從後往前遍歷同步佇列呢?在Doug Lea大神的論文The java.util.concurrent Synchronizer Framework中給出了答案,摘錄如下:

AnAbstractQueuedSynchronizerqueue node contains anextlink to its successor. But because there are no applicable techniques for lock-free atomic insertion of double-linked listnodes using compareAndSet, this link is not atomically set as part of insertion; it is simply assigned:pred.next = node;after the insertion. This is reflected in all usages. Thenextlink is treated only as an optimized path. If a node's successor does not appear to exist (or appears to be cancelled) via itsnextfield, it is always possible to start at the tail of the list and traverse backwards using thepredfield to accurately check if therereally is one.

也就說對於雙向連結串列而言,沒有不加鎖的原子手段可以保證構造雙向指標的執行緒安全性。回到程式碼中,我們回顧一下往同步佇列中新增結點的執行過程,如下(其中 pred 是末尾結點,而 node 是待插入的結點):

node.prev = pred;
if (this.compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
}

上述方法會將 node 結點的 prev 指標指向 pred 結點,而將 pred 的 next 指標指向 node 結點的過程需要建立在基於 CAS 成功將 node 設定為末端結點的基礎之上,如果這一過程失敗則 next 指標將會斷掉,而選擇從後往前遍歷則始終能夠保證遍歷到頭結點。

共享獲取資源

針對共享模式獲取資源,AbstractQueuedSynchronizer 同樣定義了多個版本的 acquire 方法實現,包括:acquireShared、acquireSharedInterruptibly,以及 tryAcquireSharedNanos,其中 acquireSharedInterruptibly 是 acquireShared 的中斷版本,在等待獲取資源期間支援響應中斷請求,tryAcquireSharedNanos 除了支援響應中斷以外,還引入了超時等待機制。下面同樣主要分析一下AbstractQueuedSynchronizer#acquireShared的實現,理解了該方法的實現機制,也就自然而然理解了另外兩個版本的實現機制。

方法AbstractQueuedSynchronizer#acquireShared的實現如下:

public final void acquireShared(int arg) {
    // 返回負數表示獲取資源失敗
    if (this.tryAcquireShared(arg) < 0) {
        // 將當前執行緒新增到條件佇列,基於自旋等待獲取資源
        this.doAcquireShared(arg);
    }
}

private void doAcquireShared(int arg) {
    // 將當前執行緒加入條件佇列末端,並標記為共享模式
    final Node node = this.addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false; // 標記自旋過程中是否被中斷
        for (; ; ) {
            // 獲取前驅結點
            final Node p = node.predecessor();
            // 如果前驅結點為頭結點,說明當前結點是排在同步佇列最前面,可以嘗試獲取資源
            if (p == head) {
                // 嘗試獲取資源
                int r = this.tryAcquireShared(arg);
                if (r >= 0) {
                    // 獲取資源成功,設定自己為頭結點,並嘗試喚醒後繼結點
                    this.setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) {
                        selfInterrupt();
                    }
                    failed = false;
                    return;
                }
            }
            // 如果還未輪到當前結點,或者獲取資源失敗
            if (shouldParkAfterFailedAcquire(p, node) // 判斷是否需要阻塞當前執行緒
                    && this.parkAndCheckInterrupt()) { // 如果需要,則進入阻塞狀態,並在甦醒時檢查中斷狀態
                // 標識等待期間被中斷
                interrupted = true;
            }
        }
    } finally {
        // 嘗試獲取資源失敗,說明執行異常,取消當前結點獲取資源的程序
        if (failed) {
            this.cancelAcquire(node);
        }
    }
}

上述方法與AbstractQueuedSynchronizer#acquire的實現邏輯大同小異,區別在於執行緒在被封裝成結點之後,是以共享(SHARED)模式在同步佇列中進行等待。這裡我們重點關注一下AbstractQueuedSynchronizer#setHeadAndPropagate方法的實現,當結點上的執行緒成功獲取到資源會觸發執行該方法,以嘗試喚醒後繼結點。實現如下:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 記錄之前的頭結點
    this.setHead(node); // 頭結點一般記錄持有資源的執行緒結點
    /*
     * 如果滿足以下條件,嘗試喚醒後繼結點:
     *
     * 1. 存在剩餘可用的資源;
     * 2. 後繼結點處於等待狀態,或後繼結點為空
     *
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *   or was recorded (as h.waitStatus either before or after setHead) by a previous operation
     *   (note: this uses sign-check of waitStatus because PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *   or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause unnecessary wake-ups,
     * but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
     */
    if (propagate > 0 // 存在剩餘可用的資源
            || h == null || h.waitStatus < 0 // 此時 h 是之前的頭結點
            || (h = head) == null || h.waitStatus < 0) { // 此時 h 已經更新為當前頭結點
        Node s = node.next;
        // 如果後繼結點以共享模式在等待,或者後繼結點未知,則嘗試喚醒後繼結點
        if (s == null || s.isShared()) {
            this.doReleaseShared();
        }
    }
}

因為當前結點已經獲取到資源,所以需要將當前結點記錄到頭結點中。此外,如果滿足以下 2 種情況之一,還需要喚醒後繼結點:

  1. 引數propagate > 0,即存在可用的剩餘資源;
  2. 前任頭結點或當前頭結點不存在,或指明後繼結點需要被喚醒。

如果滿足上述條件之一,且後繼結點狀態未知或以共享模式在等待,則呼叫AbstractQueuedSynchronizer#doReleaseShared方法喚醒後繼結點,關於該方法的實現留到下一小節進行分析。

共享釋放資源

針對共享模式釋放資源,AbstractQueuedSynchronizer 同樣定義了單一實現,即AbstractQueuedSynchronizer#releaseShared方法,該方法本質上也是一個排程的過程,具體釋放資源的操作交由 tryReleaseShared 方法完成,由子類實現。方法AbstractQueuedSynchronizer#releaseShared實現如下:

public final boolean releaseShared(int arg) {
    // 嘗試釋放資源
    if (this.tryReleaseShared(arg)) {
        // 釋放資源成功,喚醒後繼結點
        this.doReleaseShared();
        return true;
    }
    return false;
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other in-progress acquires/releases.
     * This proceeds in the usual way of trying to unparkSuccessor of head if it needs signal.
     * But if it does not, status is set to PROPAGATE to ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added while we are doing this.
     * Also, unlike other uses of unparkSuccessor, we need to know if CAS to reset status fails, if so rechecking.
     */
    for (; ; ) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 如果頭結點狀態為 SIGNAL,則在喚醒後繼結點之前嘗試清除當前結點的狀態
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                    // loop to recheck cases
                    continue;
                }
                // 喚醒後繼結點
                this.unparkSuccessor(h);
            }
            /*
             * 如果後繼結點暫時不需要被喚醒,則基於 CAS 嘗試將目標結點的 waitStatus 由 0 修改為 PROPAGATE,
             * 以保證後續由喚醒通知到來時,能夠將通知傳遞下去
             */
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                // loop on failed CAS
                continue;
            }
        }
        // 如果頭結點未變更,則說明期間持有鎖的執行緒未發生變化,能夠走到這一步說明前面的操作已經成功完成
        if (h == head) {
            break;
        }
        // 如果頭結點發生變更,則說明期間持有鎖的執行緒發生了變化,需要重試以保證喚醒動作的成功執行
    }
}

如果釋放資源成功,需要依據頭結點當下等待狀態分別處理:

  1. 如果頭結點的等待狀態為 SIGNAL,則表明後繼結點需要被喚醒,在執行喚醒操作之前需要清除等待狀態。
  2. 如果頭結點狀態為 0,則表示後繼結點不需要被喚醒,此時需要將結點狀態修改為 PROPAGATE,以保證後續接收到喚醒通知時能夠將通知傳遞下去。

總結

本文我們分析了 AQS 的設計與實現。理解了 AQS 的執行機制也就理解了 java 的 Lock 鎖是如何實現執行緒的阻塞、喚醒、等待和通知機制的,所以理解 AQS 也是我們後面分析 Lock 鎖和同步器實現的基礎。

從下一篇開始,我們將介紹 JUC 中基於 AQS 實現的元件,包括 ReentrantLock、ReentrantReadWriteLock、CountDownLatch,以及 Semaphore 等,去分析 AQS 中定義的模板方法是如何在這些元件中進行實現的。

更多精彩內容,歡迎訪問個人主頁https://www.sxyya.com