1. 程式人生 > 其它 >【死磕 Java 併發】—– J.U.C 之 AQS:同步狀態的獲取與釋放

【死磕 Java 併發】—– J.U.C 之 AQS:同步狀態的獲取與釋放

摘要: 原創出處 http://cmsblogs.com/?p=2197 「小明哥」歡迎轉載,保留摘要,謝謝!

在前面提到過,AQS 是構建 Java 同步元件的基礎,我們期待它能夠成為實現大部分同步需求的基礎。

AQS 的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態。對於子類而言,它並沒有太多的活要做,AQS 已經提供了大量的模板方法來實現同步,主要是分為三類:

  • 獨佔式獲取和釋放同步狀態

  • 共享式獲取和釋放同步狀態

  • 查詢同步佇列中的等待執行緒情況。

自定義子類使用 AQS 提供的模板方法,就可以實現自己的同步語義

1. 獨佔式

獨佔式,同一時刻,僅有一個執行緒持有同步狀態

1.1 獨佔式同步狀態獲取

#acquire(int arg) 方法,為 AQS 提供的模板方法。該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感。也就是說,由於執行緒獲取同步狀態失敗而加入到 CLH 同步佇列中,後續對該執行緒進行中斷操作時,執行緒不會從 CLH 同步佇列中移除。程式碼如下

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

第 2 行:呼叫 #tryAcquire(int arg) 方法,去嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。若獲取成功,#acquire(int arg) 方法,直接返回,不用執行緒阻塞,自旋直到獲得同步狀態成功。

#tryAcquire(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

直接丟擲 UnsupportedOperationException 異常。

第 3 行:如果 #tryAcquire(int arg) 方法返回 false ,即獲取同步狀態失敗,則呼叫 #addWaiter(Node mode) 方法,將當前執行緒加入到 CLH 同步佇列尾部。並且, mode 方法引數為 Node.EXCLUSIVE ,表示獨佔模式。

第 3 行:呼叫 boolean #acquireQueued(Node node, int arg) 方法,自旋直到獲得同步狀態成功。詳細解析,見 「1.1.1 acquireQueued」 中。另外,該方法的返回值型別為 boolean ,當返回 true 時,表示在這個過程中,發生過執行緒中斷。但是呢,這個方法又會清理執行緒中斷的標識,所以在種情況下,需要呼叫【第 4 行】的 #selfInterrupt() 方法,恢復執行緒中斷的標識,程式碼如下:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
1.1.1 acquireQueued

boolean #acquireQueued(Node node, int arg) 方法,為一個自旋的過程,也就是說,當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。

流程圖如下:

程式碼如下:

 final boolean acquireQueued(final Node node, int arg) {
        // 記錄是否獲取同步狀態成功
        boolean failed = true;
        try {
            // 記錄過程中,是否發生執行緒中斷
            boolean interrupted = false;
          /*
           * 自旋過程,其實就是一個死迴圈而已
           */
            for (;;) {
                // 當前執行緒的前驅節點
                final Node p = node.predecessor();
                // 當前執行緒的前驅節點是頭結點,且同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                 // 獲取失敗,執行緒等待--具體後面介紹
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 獲取同步狀態發生異常,取消獲取。
            if (failed)
                cancelAcquire(node);
        }
    }
1.1.2 shouldParkAfterFailedAcquire
 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
         // 獲得前一個節點的等待狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • 第 4 至 9 行:等待狀態為 Node.SIGNAL 時,表示 pred 的下一個節點 node 的執行緒需要阻塞等待。在 pred 的執行緒釋放同步狀態時,會對 node 的執行緒進行喚醒通知。所以,【第 9 行】返回 true ,表明當前執行緒可以被 park安全的阻塞等待。