1. 程式人生 > 程式設計 >AbstractQueuedSynchronizer 原理分析

AbstractQueuedSynchronizer 原理分析

AQS 簡介

什麼是AQS

AQS ,AbstractQueuedSynchronizer ,即佇列同步器。它是構建鎖或者其他同步元件的基礎框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等),J.U.C 併發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。

它是 J.U.C 併發包中的核心基礎元件。

AQS 優勢

AQS 解決了在實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO 同步佇列。

基於 AQS 來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。

在基於 AQS 構建的同步器中,只能在一個時刻發生阻塞,從而降低上下文切換的開銷,提高了吞吐量。同時在設計 AQS 時充分考慮了可伸縮性,因此 J.U.C 中,所有基於 AQS 構建的同步器均可以獲得這個優勢。

同步狀態

AQS 的主要使用方式是繼承,子類通過繼承同步器,並實現它的抽象方法來管理同步狀態。

AQS 使用一個 int 型別的成員變數 state 來表示同步狀態:

當 state > 0 時,表示已經獲取了鎖。
當 state = 0 時,表示釋放了鎖。
複製程式碼

它提供了三個方法,來對同步狀態 state 進行操作,並且 AQS 可以確保對 state 的操作是安全的:

#getState()
#setState(int newState)
#compareAndSetState(int expect,int update)
複製程式碼

同步佇列

AQS 通過內建的 FIFO 同步佇列來完成資源獲取執行緒的排隊工作:

如果當前執行緒獲取同步狀態失敗(鎖)時,AQS 則會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒 當同步狀態釋放時,則會把節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。

主要內建方法

AQS 主要提供瞭如下方法:

  1. #getState():返回同步狀態的當前值
  2. #setState(int newState):設定當前同步狀態。
  3. #compareAndSetState(int expect,int update):使用 CAS 設定當前狀態,該方法能夠保證狀態設定的原子性。
  4. 【可重寫】#tryAcquire(int arg):獨佔式獲取同步狀態,獲取同步狀態成功後,其他執行緒需要等待該執行緒釋放同步狀態才能獲取同步狀態。
  5. 【可重寫】#tryRelease(int arg):獨佔式釋放同步狀態。
  6. 【可重寫】#tryAcquireShared(int arg):共享式獲取同步狀態,返回值大於等於 0 ,則表示獲取成功;否則,獲取失敗。
  7. 【可重寫】#tryReleaseShared(int arg):共享式釋放同步狀態。
  8. 【可重寫】#isHeldExclusively():當前同步器是否在獨佔式模式下被執行緒佔用,一般該方法表示是否被當前執行緒所獨佔。
  9. acquire(int arg):獨佔式獲取同步狀態。如果當前執行緒獲取同步狀態成功,則由該方法返回;否則,將會進入同步佇列等待。該方法將會呼叫可重寫的 #tryAcquire(int arg) 方法;不響應中斷
  10. #acquireInterruptibly(int arg):與 #acquire(int arg) 相同,但是該方法響應中斷。當前執行緒為獲取到同步狀態而進入到同步佇列中,如果當前執行緒被中斷,則該方法會丟擲InterruptedException 異常並返回。
  11. #tryAcquireNanos(int arg,long nanos):超時獲取同步狀態。如果當前執行緒在 nanos 時間內沒有獲取到同步狀態,那麼將會返回 false ,已經獲取則返回 true 。
  12. #acquireShared(int arg):共享式獲取同步狀態,如果當前執行緒未獲取到同步狀態,將會進入同步佇列等待,與獨佔式的主要區別是在同一時刻可以有多個執行緒獲取到同步狀態;
  13. #acquireSharedInterruptibly(int arg):共享式獲取同步狀態,響應中斷。
  14. #tryAcquireSharedNanos(int arg,long nanosTimeout):共享式獲取同步狀態,增加超時限制。
  15. #release(int arg):獨佔式釋放同步狀態,該方法會在釋放同步狀態之後,將同步佇列中第一個節點包含的執行緒喚醒。
  16. #releaseShared(int arg):共享式釋放同步狀態。

從上面的方法看下來,基本上可以分成 3 類:

獨佔式獲取與釋放同步狀態
共享式獲取與釋放同步狀態
查詢同步佇列中的等待執行緒情況
複製程式碼

CLH 同步佇列

CLH簡介

CLH 同步佇列是一個 FIFO 雙向佇列,AQS 依賴它來完成同步狀態的管理:

當前執行緒如果獲取同步狀態失敗時,AQS則會將當前執行緒已經等待狀態等資訊構造成一個節點(Node)並將其加入到CLH同步佇列,同時會阻塞當前執行緒

當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。

Node

Node 是 AbstractQueuedSynchronizer 的內部靜態類。

static final class Node {

    // 共享
    static final Node SHARED = new Node();
    // 獨佔
    static final Node EXCLUSIVE = null;

    /**
     * 因為超時或者中斷,節點會被設定為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態
     */
    static final int CANCELLED =  1;
    /**
     * 後繼節點的執行緒處於等待狀態,而當前節點的執行緒如果釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的執行緒得以執行
     */
    static final int SIGNAL    = -1;
    /**
     * 節點在等待佇列中,節點執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()後,該節點將會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享式同步狀態獲取,將會無條件地傳播下去
     */
    static final int PROPAGATE = -3;

    /** 等待狀態 */
    volatile int waitStatus;

    /** 前驅節點,當節點新增到同步佇列時被設定(尾部新增) */
    volatile Node prev;

    /** 後繼節點 */
    volatile Node next;

    /** 等待佇列中的後續節點。如果當前節點是共享的,那麼欄位將是一個 SHARED 常量,也就是說節點型別(獨佔和共享)和等待佇列中的後續節點共用同一個欄位 */
    Node nextWaiter;
    
    /** 獲取同步狀態的執行緒 */
    volatile Thread thread;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() { // Used to establish initial head or SHARED marker
    }

    Node(Thread thread,Node mode) { // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread,int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
    
}
複製程式碼
  1. waitStatus 欄位

等待狀態,用來控制執行緒的阻塞和喚醒,並且可以避免不必要的呼叫LockSupport的 #park(...) 和 #unpark(...) 方法。。目前有 4 種:CANCELLED SIGNAL CONDITION PROPAGATE 。實際上,有第 5 種,INITAL ,值為 0 ,初始狀態。每個等待狀態代表的含義,它不僅僅指的是 Node 自己的執行緒的等待狀態,也可以是下一個節點的執行緒的等待狀態

  1. CLH 同步佇列

head 和 tail 欄位,是 AbstractQueuedSynchronizer 的欄位,分別指向同步佇列的頭和尾。再配合上 prev 和 next 欄位,快速定位到同步佇列的頭尾。

prev 和 next 欄位,分別指向 Node 節點的前一個和後一個 Node 節點,從而實現鏈式雙向佇列。

  1. thread 欄位,Node 節點對應的執行緒 Thread 。

  2. nextWaiter 欄位,Node 節點獲取同步狀態的模型( Mode )。#tryAcquire(int args) 和 #tryAcquireShared(int args) 方法,分別是獨佔式和共享式獲取同步狀態。在獲取失敗時,它們都會呼叫 #addWaiter(Node mode) 方法入隊。而 nextWaiter 就是用來表示是哪種模式:

     SHARED 靜態 + 不可變欄位,列舉共享模式。
     EXCLUSIVE 靜態 + 不可變欄位,列舉獨佔模式。
     #isShared() 方法,判斷是否為共享式獲取同步狀態。
    複製程式碼
  3. #predecessor() 方法,獲得 Node 節點的前一個 Node 節點。在方法的內部,Node p = prev 的本地拷貝,是為了避免併發情況下,prev 判斷完 == null 時,恰好被修改,從而保證執行緒安全。

  4. 構造方法有 3 個,分別是:

     #Node() 方法:用於 SHARED 的建立。
     
     #Node(Thread thread,Node mode) 方法:用於 #addWaiter(Node mode) 方法。
     從 mode 方法引數中,我們也可以看出它代表獲取同步狀態的模式。
     
     #Node(Thread thread,int waitStatus) 方法,用於 #addConditionWaiter() 方法。
    複製程式碼

入列

CLH 佇列入列很簡單: tail 指向新節點。 新節點的 prev 指向當前最後的節點。 當前最後一個節點的 next 指向當前節點。

但是,實際上,入隊邏輯實現的 #addWaiter(Node) 方法,需要考慮併發的情況。它通過 CAS 的方式,來保證正確的新增 Node 。程式碼如下:

private Node addWaiter(Node mode) {
    // 新建節點
    Node node = new Node(Thread.currentThread(),mode);
    // 記錄原尾節點
    Node pred = tail;
    // 快速嘗試,新增新節點為尾節點
    //當原尾節點非空,才執行快速嘗試的邏輯. 在下面的 #enq(Node node) 方法中,我們會看到,首節點未初始化的時,head 和 tail 都為空。
    if (pred != null) {
        // 設定新 Node 節點的尾節點為原尾節點
        node.prev = pred;
        // CAS 設定新的尾節點
        if (compareAndSetTail(pred,node)) {
            // 成功,原尾節點的下一個節點為新節點
            pred.next = node;
            return node;
        }
    }
    // 失敗,多次嘗試,直到成功
    enq(node);
    return node;
}
複製程式碼
  1. 建立新節點 node 。在建立的構造方法,mode 方法引數,傳遞獲取同步狀態的模式。
  2. 記錄原尾節點 tail 。
  3. 快速嘗試,新增新節點為尾節點。
  4. enq新增失敗,多次嘗試,直到成功新增。

呼叫 #enq(Node node) 方法,多次嘗試,直到成功新增

 private Node enq(final Node node) {
     // 多次嘗試,直到成功為止
     for (;;) {
         // 記錄原尾節點
         Node t = tail;
         // 原尾節點不存在,建立首尾節點都為 new Node()
         if (t == null) {
             if (compareAndSetHead(new Node()))
                 tail = head;
         // 原尾節點存在,新增新節點為尾節點
         } else {
             //設定為尾節點
             node.prev = t;
             // CAS 設定新的尾節點
             if (compareAndSetTail(t,node)) {
                 // 成功,原尾節點的下一個節點為新節點
                 t.next = node;
                 return t;
             }
         }
     }
 }
複製程式碼
  1. “死”迴圈,多次嘗試,直到成功新增為止
  2. 記錄原尾節點 t 。和 #addWaiter(Node node) 方法的相同。
  3. 原尾節點存在,新增新節點為尾節點。和 #addWaiter(Node node) 方法的相同。
  4. 原尾節點不存在,則首節點也不存在了,建立首尾節點都為 new Node() 。
  5. #compareAndSetHead(Node update) 方法,使用 Unsafe 來 CAS 設定尾節點 head 為新節點。

出列

CLH 同步佇列遵循 FIFO,首節點的執行緒釋放同步狀態後,將會喚醒它的下一個節點(Node.next)。而後繼節點將會在獲取同步狀態成功時,將自己設定為首節點( head )。

這個過程非常簡單,head 執行該節點並斷開原首節點的 next 和當前節點的 prev 即可。注意,在這個過程是不需要使用 CAS 來保證的,因為只有一個執行緒,能夠成功獲取到同步狀態。

setHead(Node node) 方法,實現上述的出列邏輯。程式碼如下:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}
複製程式碼

AQS:同步狀態的獲取與釋放

AQS 的設計模式採用的模板方法模式,子類通過繼承的方式,實現它的抽象方法來管理同步狀態。

對於子類而言,它並沒有太多的活要做,AQS 已經提供了大量的模板方法來實現同步,主要是分為三類:

獨佔式獲取和釋放同步狀態
共享式獲取和釋放同步狀態
查詢同步佇列中的等待執行緒情況。
複製程式碼

自定義子類使用 AQS 提供的模板方法,就可以實現自己的同步語義。

獨佔式

獨佔式,同一時刻,僅有一個執行緒持有同步狀態。

獨佔式同步狀態獲取

acquire(int arg)

acquire(int arg) 方法,為 AQS 提供的模板方法。該方法為獨佔式獲取同步狀態,但是該方法對中斷不敏感。也就是說,由於執行緒獲取同步狀態失敗而加入到 CLH 同步佇列中,後續對該執行緒進行中斷操作時,執行緒不會從 CLH 同步佇列中移除。程式碼如下:

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
         selfInterrupt();
 }
複製程式碼

呼叫 #tryAcquire(int arg) 方法,去嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。

若tryAcquire獲取成功,則acquire(int arg) 方法直接返回,不用執行緒阻塞

若 tryAcquire 獲取失敗呼叫 addWaiter(Node mode) 方法,將當前執行緒加入到 CLH 同步佇列尾部,並且, mode 方法引數為 Node.EXCLUSIVE ,表示獨佔模式。然後呼叫 boolean #acquireQueued(Node node,int arg) 方法,自旋直到獲得同步狀態成功。

另外,該 acquireQueued 方法的返回值型別為 boolean ,當返回 true 時,表示在這個過程中,發生過執行緒中斷。但是呢,這個方法又會清理執行緒中斷的標識,所以在種情況下,需要呼叫 #selfInterrupt() 方法,恢復執行緒中斷的標識,程式碼如下:

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
複製程式碼

tryAcquire(int arg)

tryAcquire(int arg)方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。AQS裡程式碼如下:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
複製程式碼

直接丟擲 UnsupportedOperationException 異常。

acquireQueued

boolean #acquireQueued(Node node,int arg) 方法,為一個自旋的過程,也就是說,當前執行緒(Node)進入同步佇列後,就會進入一個自旋的過程,每個節點都會自省地觀察,當條件滿足,獲取到同步狀態後,就可以從這個自旋過程中退出,否則會一直執行下去。

流程圖如下:

程式碼如下:


 final boolean acquireQueued(final Node node,int arg) {
     // 記錄是否獲取同步狀態成功
     boolean failed = true;
     try {
         // 記錄過程中,是否發生執行緒中斷
         boolean interrupted = false;
         /*
          * 自旋過程,其實就是一個死迴圈而已
          */
         for (;;) {
             // 當前執行緒的前驅節點
             final Node p = node.predecessor();
             // 當前執行緒的前驅節點是頭結點,且同步狀態成功
             if (p == head && tryAcquire(arg)) {
                 setHead(node);
                 p.next = null; // help GC
                 failed = false;
                 return interrupted;
             }
             // 獲取失敗,執行緒等待--具體後面介紹
             if (shouldParkAfterFailedAcquire(p,node) &&
                     parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         // 獲取同步狀態發生異常,取消獲取。
         if (failed)
             cancelAcquire(node);
     }
 }
複製程式碼
  1. failed 變數,記錄是否獲取同步狀態成功。
  2. interrupted 變數,記錄獲取過程中,是否發生執行緒中斷。
  3. 呼叫 Node#predecessor() 方法,獲得當前執行緒的前一個節點 p 。
  4. p == head 程式碼塊,若滿足,則表示當前執行緒的前一個節點為頭節點,因為 head 是最後一個獲得同步狀態成功的節點,此時呼叫 #tryAcquire(int arg) 方法,嘗試獲得同步狀態
  5. 當前節點( 執行緒 )獲取同步狀態成功:
    • 設定當前節點( 執行緒 )為新的 head 。
    • 設定老的頭節點 p 不再指向下一個節點,讓它自身更快的被 GC 。
    • 標記 failed = false ,表示獲取同步狀態成功。
    • 返回記錄獲取過程中,是否發生執行緒中斷。
  6. 呼叫 #shouldParkAfterFailedAcquire(Node pre,Node node) 方法,判斷獲取失敗後,是否當前執行緒需要阻塞等待。
  7. 呼叫 #cancelAcquire(Node node) 方法,取消獲取同步狀態。

shouldParkAfterFailedAcquire

 private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) {
     // 獲得前一個節點的等待狀態
     int ws = pred.waitStatus;
     if (ws == Node.SIGNAL) //  Node.SIGNAL
         /*
          * This node has already set status asking a release
          * to signal it,so it can safely park.
          */
         return true;
     if (ws > 0) { // Node.CANCEL
         /*
          * Predecessor was cancelled. Skip over predecessors and
          * indicate retry.
          */
         do {
             node.prev = pred = pred.prev;
         } while (pred.waitStatus > 0);
         pred.next = node;
     } else { // 0 或者 Node.PROPAGATE
         /*
          * waitStatus must be 0 or PROPAGATE.  Indicate that we
          * need a signal,but don't park yet.  Caller will need to
          * retry to make sure it cannot acquire before parking.
          */
         compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
     }
     return false;
 }
複製程式碼
  1. pred 和 node 方法引數,傳入時,要求前者必須是後者的前一個節點。
  2. 獲得前一個節點( pre )的等待狀態。下面會根據這個狀態有三種情況的處理。
    • 等待狀態為 Node.SIGNAL 時,表示 pred 的下一個節點 node 的執行緒需要阻塞等待。
    • 在 pred 的執行緒釋放同步狀態時,會對 node 的執行緒進行喚醒通知。所以返回 true ,表明當前執行緒可以被 park,安全的阻塞等待。
    • 等待狀態為 0 或者 Node.PROPAGATE 時,通過 CAS 設定,將狀態修改為 Node.SIGNAL ,即下一次重新執行 #shouldParkAfterFailedAcquire(Node pred,Node node) 方法時,滿足條件。但是,對於本次執行,返回 false 。
    • 另外,等待狀態不會為 Node.CONDITION ,因為它用在 ConditonObject 中。
    • 等待狀態為 NODE.CANCELLED 時,則表明該執行緒的前一個節點已經等待超時或者被中斷了,則需要從 CLH 佇列中將該前一個節點刪除掉,迴圈回溯,直到前一個節點狀態 <= 0 。 對於本次執行,返回 false ,需要下一次再重新執行 #shouldParkAfterFailedAcquire(Node pred,Node node) 方法,看看滿足哪個條件。

整個過程如下圖:

cancelAcquire

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not,in which case,we lost race vs another cancel
    // or signal,so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step,other Nodes can skip past us.
    // Before,we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail,remove ourselves.
    if (node == tail && compareAndSetTail(node,pred)) {
        compareAndSetNext(pred,predNext,null);
    } else {
        // If successor needs signal,try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred,Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred,next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
複製程式碼
  1. 若傳入引數 node 為空。

  2. 將節點的等待執行緒置空。

  3. 獲得 node 節點的前一個節點 pred 。

  4. 獲得 pred 的下一個節點 predNext 。predNext 從表面上看,和 node 是等價的。 但是實際上,存在多執行緒併發的情況,所以我們呼叫 #compareAndSetNext(...) 方法,使用 CAS 的方式,設定 pred 的下一個節點。 如果設定失敗,說明當前執行緒和其它執行緒競爭失敗,不需要做其它邏輯,因為 pred 的下一個節點已經被其它執行緒設定成功。

  5. 設定 node 節點的為取消的等待狀態 Node.CANCELLED 。 這裡可以使用直接寫,而不是 CAS 。 在這個操作之後,其它 Node 節點可以忽略 node 。 Before,we are free of interference from other threads. 如何理解。

  6. 下面開始開始修改 pred 的新的下一個節點,一共分成三種情況。

    • 如果 node 是尾節點,呼叫 #compareAndSetTail(...) 方法,CAS 設定 pred 為新的尾節點。若上述操作成功,呼叫 #compareAndSetNext(...) 方法,CAS 設定 pred 的下一個節點為空( null )。
    • pred 非首節點。pred 的等待狀態為 Node.SIGNAL ,或者可被 CAS 為 Node.SIGNAL 。pred 的執行緒非空。若 node 的 下一個節點 next 的等待狀態非 Node.CANCELLED ,則呼叫 #compareAndSetNext(...) 方法,CAS 設定 pred 的下一個節點為 next 。
    • 如果 pred 為首節點,呼叫 #unparkSuccessor(Node node) 方法,喚醒 node 的下一個節點的執行緒等待。為什麼此處需要喚醒呢?因為,pred 為首節點,node 的下一個節點的阻塞等待,需要 node 釋放同步狀態時進行喚醒。但是,node 取消獲取同步狀態,則不會再出現 node 釋放同步狀態時進行喚醒 node 的下一個節點。因此,需要此處進行喚醒。

獨佔式獲取響應中斷

AQS 提供了acquire(int arg) 方法,以供獨佔式獲取同步狀態,但是該方法對中斷不響應,對執行緒進行中斷操作後,該執行緒會依然位於CLH同步佇列中,等待著獲取同步狀態。

為了響應中斷,AQS 提供了 #acquireInterruptibly(int arg) 方法。該方法在等待獲取同步狀態時,如果當前執行緒被中斷了,會立刻響應中斷,並丟擲 InterruptedException 異常。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製程式碼
  1. 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
  2. 然後,呼叫 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。
  3. 最後,呼叫 #doAcquireInterruptibly(int arg) 方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常。

doAcquireInterruptibly

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p,node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // <1>
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製程式碼

它與 #acquire(int arg) 方法僅有兩個差別:

  1. 方法宣告丟擲 InterruptedException 異常。

  2. 在中斷方法處不再是使用 interrupted 標誌,而是直接丟擲 InterruptedException 異常。

獨佔式超時獲取

AQS 除了提供上面兩個方法外,還提供了一個增強版的方法 #tryAcquireNanos(int arg,long nanos) 。該方法為 #acquireInterruptibly(int arg) 方法的進一步增強,它除了響應中斷外,還有超時控制。即如果當前執行緒沒有在指定時間內獲取同步狀態,則會返回 false ,否則返回 true 。

流程圖如下:

程式碼如下:

public final boolean tryAcquireNanos(int arg,long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg,nanosTimeout);
}
複製程式碼
  1. 首先,校驗該執行緒是否已經中斷了,如果是,則丟擲InterruptedException 異常。
  2. 然後,呼叫 #tryAcquire(int arg) 方法,嘗試獲取同步狀態,如果獲取成功,則直接返回。
  3. 最後,呼叫 #tryAcquireNanos(int arg) 方法,自旋直到獲得同步狀態成功,或執行緒中斷丟擲 InterruptedException 異常,或超過指定時間返回獲取同步狀態失敗。

tryAcquireNanos

static final long spinForTimeoutThreshold = 1000L;

private boolean doAcquireNanos(int arg,long nanosTimeout)
        throws InterruptedException {
    // nanosTimeout <= 0
    if (nanosTimeout <= 0L)
        return false;
    // 超時時間
    final long deadline = System.nanoTime() + nanosTimeout;
    // 新增 Node 節點
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 獲取同步狀態成功
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            /*
             * 獲取失敗,做超時、中斷判斷
             */
            // 重新計算需要休眠的時間
            nanosTimeout = deadline - System.nanoTime();
            // 已經超時,返回false
            if (nanosTimeout <= 0L)
                return false;
            // 如果沒有超時,則等待nanosTimeout納秒
            // 注:該執行緒會直接從LockSupport.parkNanos中返回,
            // LockSupport 為 J.U.C 提供的一個阻塞和喚醒的工具類,後面做詳細介紹
            if (shouldParkAfterFailedAcquire(p,node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this,nanosTimeout);
            // 執行緒是否已經中斷了
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製程式碼

因為是在 #doAcquireInterruptibly(int arg) 方法的基礎上,做了超時控制的增強,所以相同部分,我們直接跳過。

  1. 如果超時時間小於 0 ,直接返回 false ,已經超時。
  2. 計算最終超時時間 deadline 。
  3. 重新計算剩餘可獲取同步狀態的時間 nanosTimeout 。
  4. 如果剩餘時間小於 0 ,直接返回 false ,已經超時。
  5. 如果剩餘時間大於 spinForTimeoutThreshold ,則呼叫 LockSupport#parkNanos(Object blocker,long nanos) 方法,休眠 nanosTimeout 納秒。否則,就不需要休眠了,直接進入快速自旋的過程。原因在於,spinForTimeoutThreshold 已經非常小了,非常短的時間等待無法做到十分精確,如果這時再次進行超時等待,相反會讓 nanosTimeout 的超時從整體上面表現得不是那麼精確。所以,在超時非常短的場景中,AQS 會進行無條件的快速自旋。
  6. 若執行緒已經中斷了,丟擲 InterruptedException 異常。

獨佔式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#release(int arg)方法,釋放同步狀態。程式碼如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
複製程式碼
  1. 呼叫 #tryRelease(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。

  2. tryRelease(int arg) 方法,需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
複製程式碼

直接丟擲 UnsupportedOperationException 異常。 3. 獲得當前的 head ,避免併發問題。

  1. 頭結點不為空,並且頭結點狀態不為 0 ( INITAL 未初始化)。為什麼會出現 0 的情況呢?

  2. 呼叫 #unparkSuccessor(Node node) 方法,喚醒下一個節點的執行緒等待。

總結

  1. 在 AQS 中維護著一個 FIFO 的同步佇列。

  2. 當執行緒獲取同步狀態失敗後,則會加入到這個 CLH 同步佇列的對尾,並一直保持著自旋。

  3. 在 CLH 同步佇列中的執行緒在自旋時,會判斷其前驅節點是否為首節點,如果為首節點則不斷嘗試獲取同步狀態,獲取成功則退出CLH同步佇列。

  4. 當執行緒執行完邏輯後,會釋放同步狀態,釋放後會喚醒其後繼節點。

共享式

共享式與獨佔式的最主要區別在於,同一時刻:

獨佔式只能有一個執行緒獲取同步狀態。
共享式可以有多個執行緒獲取同步狀態。
複製程式碼

例如,讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。例子為 ReentrantReadWriteLock 。

共享式同步狀態獲取

acquireShared(int arg) 方法,對標 #acquire(int arg) 方法。

 public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
 }
複製程式碼

呼叫 #tryAcquireShared(int arg) 方法,嘗試獲取同步狀態,獲取成功則設定鎖狀態並返回大於等於 0 ,否則獲取失敗,返回小於 0 。

若獲取成功,直接返回,不用執行緒阻塞,獲取失敗則自旋直到獲得同步狀態成功。

tryAcquireShared(int arg) 方法

需要自定義同步元件自己實現,該方法必須要保證執行緒安全的獲取同步狀態。程式碼如下:

protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
複製程式碼

直接丟擲 UnsupportedOperationException 異常。

doAcquireShared

 private void doAcquireShared(int arg) {
     // 共享式節點
     final Node node = addWaiter(Node.SHARED);
     boolean failed = true;
     try {
         boolean interrupted = false;
         for (;;) {
             // 前驅節點
             final Node p = node.predecessor();
             // 如果其前驅節點,獲取同步狀態
             if (p == head) {
                 // 嘗試獲取同步
                 int r = tryAcquireShared(arg);
                 if (r >= 0) {
                     setHeadAndPropagate(node,r);
                     p.next = null; // help GC
                     if (interrupted)
                         selfInterrupt();
                     failed = false;
                     return;
                 }
             }
             if (shouldParkAfterFailedAcquire(p,node) &&
                     parkAndCheckInterrupt())
                 interrupted = true;
         }
     } finally {
         if (failed)
             cancelAcquire(node);
     }
 }
複製程式碼

因為和 #acquireQueued(int arg) 方法的基礎上,所以相同部分,直接跳過。

  1. 呼叫 #addWaiter(Node mode) 方法,將當前執行緒加入到 CLH 同步佇列尾部。並且, mode 方法引數為 Node.SHARED ,表示共享模式。

  2. 呼叫 #tryAcquireShared(int arg) 方法,嘗試獲得同步狀態。

  3. 呼叫 #setHeadAndPropagate(Node node,int propagate) 方法,設定新的首節點,並根據條件,喚醒下一個節點。這裡和獨佔式同步狀態獲取很大的不同:通過這樣的方式,不斷喚醒下一個共享式同步狀態, 從而實現同步狀態被多個執行緒的共享獲取。

setHeadAndPropagate

private void setHeadAndPropagate(Node node,int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,*     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,*     or we don't know,because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups,but only when there are multiple
     * racing acquires/releases,so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
複製程式碼
  1. 記錄原來的首節點 h 。
  2. 呼叫 #setHead(Node node) 方法,設定 node 為新的首節點。
  3. propagate > 0 程式碼塊,說明同步狀態還能被其他執行緒獲取。
  4. 判斷原來的或者新的首節點,等待狀態為 Node.PROPAGATE 或者 Node.SIGNAL 時,可以繼續向下喚醒。
  5. 呼叫 Node#isShared() 方法,判斷下一個節點為共享式獲取同步狀態。
  6. 呼叫 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。

共享式獲取響應中斷

acquireSharedInterruptibly(int arg) 方法

程式碼如下:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node,r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p,node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製程式碼

共享式超時獲取

tryAcquireSharedNanos(int arg,long nanosTimeout) 方法

程式碼如下:

public final boolean tryAcquireSharedNanos(int arg,long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg,nanosTimeout);
}

private boolean doAcquireSharedNanos(int arg,long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node,r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p,node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this,nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製程式碼

共享式同步狀態釋放

當執行緒獲取同步狀態後,執行完相應邏輯後,就需要釋放同步狀態。AQS 提供了#releaseShared(int arg)方法,釋放同步狀態。程式碼如下:

 public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true;
     }
     return false;
 }
複製程式碼

呼叫 #tryReleaseShared(int arg) 方法,去嘗試釋放同步狀態,釋放成功則設定鎖狀態並返回 true ,否則獲取失敗,返回 false 。呼叫 #doReleaseShared() 方法,喚醒後續的共享式獲取同步狀態的節點。

tryReleaseShared(int arg) 方法

需要自定義同步元件自己實現,該方法必須要保證執行緒安全的釋放同步狀態。程式碼如下:

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
複製程式碼

直接丟擲 UnsupportedOperationException 異常。

doReleaseShared

 private void doReleaseShared() {
     /*
      * Ensure that a release propagates,even if there are other
      * in-progress acquires/releases.  This proceeds in the usual
      * way of trying to unparkSuccessor of head if it needs
      * signal. But if it does not,status is set to PROPAGATE to
      * ensure that upon release,propagation continues.
      * Additionally,we must loop in case a new node is added
      * while we are doing this. Also,unlike other uses of
      * unparkSuccessor,we need to know if CAS to reset status
      * fails,if so rechecking.
      */
     for (;;) {
         Node h = head;
         if (h != null && h != tail) {
             int ws = h.waitStatus;
             if (ws == Node.SIGNAL) {
                 if (!compareAndSetWaitStatus(h,Node.SIGNAL,0))
                     continue;            // loop to recheck cases
                 unparkSuccessor(h);
             }
             else if (ws == 0 &&
                      !compareAndSetWaitStatus(h,0,Node.PROPAGATE))
                 continue;                // loop on failed CAS
         }
         if (h == head)                   // loop if head changed
             break;
     }
 }

複製程式碼

AQS:阻塞和喚醒執行緒

parkAndCheckInterrupt

線上程獲取同步狀態時,如果獲取失敗,則加入 CLH 同步佇列,通過通過自旋的方式不斷獲取同步狀態,但是在自旋的過程中,則需要判斷當前執行緒是否需要阻塞,其主要方法在acquireQueued(int arg) ,程式碼如下:

// ... 省略前面無關程式碼

if (shouldParkAfterFailedAcquire(p,node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

// ... 省略前面無關程式碼
複製程式碼

通過這段程式碼我們可以看到,在獲取同步狀態失敗後,執行緒並不是立馬進行阻塞,需要檢查該執行緒的狀態,檢查狀態的方法為 #shouldParkAfterFailedAcquire(Node pred,Node node)方法,該方法主要靠前驅節點判斷當前執行緒是否應該被阻塞。

如果 #shouldParkAfterFailedAcquire(Node pred,Node node) 方法返回 true ,則呼叫parkAndCheckInterrupt() 方法,阻塞當前執行緒。程式碼如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
複製程式碼

開始,呼叫 LockSupport#park(Object blocker) 方法,將當前執行緒掛起,此時就進入阻塞等待喚醒的狀態。

然後,線上程被喚醒時,呼叫 Thread#interrupted()方法,返回當前執行緒是否被打斷,並清理打斷狀態。

    public static boolean interrupted() {
        return currentThread().isInterrupted(true);
    }
    private native boolean isInterrupted(boolean ClearInterrupted);
複製程式碼

所以,實際上,執行緒被喚醒有兩種情況:

第一種,當前節點(執行緒)的前序節點釋放同步狀態時,喚醒了該執行緒 。
第二種,當前執行緒被打斷導致喚醒。
複製程式碼

unparkSuccessor

當執行緒釋放同步狀態後,則需要喚醒該執行緒的後繼節點。程式碼如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 喚醒後繼節點
        return true;
    }
    return false;
}
複製程式碼

呼叫 unparkSuccessor(Node node) 方法,喚醒後繼節點:

private void unparkSuccessor(Node node) {
    //當前節點狀態
    int ws = node.waitStatus;
    //當前狀態 < 0 則設定為 0
    if (ws < 0)
        compareAndSetWaitStatus(node,0);

    //當前節點的後繼節點
    Node s = node.next;
    //後繼節點為null或者其狀態 > 0 (超時或者被中斷了)
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從tail節點來找可用節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //喚醒後繼節點
    if (s != null)
        LockSupport.unpark(s.thread);
}
複製程式碼
  1. 可能會存在當前執行緒的後繼節點為 null,例如:超時、被中斷的情況。如果遇到這種情況了,則需要跳過該節點。

  2. 但是,為何是從 tail 尾節點開始,而不是從 node.next 開始呢?原因在於,取消的 node.next.next 指向的是 node.next 自己。如果順序遍歷下去,會導致死迴圈。所以此時,只能採用 tail 回溯的辦法,找到第一個( 不是最新找到的,而是最前序的 )可用的執行緒。

  3. 但是,為什麼取消的 node.next.next 指向的是 node.next 自己呢?在 #cancelAcquire(Node node) 的末尾,node.next = node; 程式碼塊,取消的 node 節點,將其 next 指向了自己。 最後,呼叫 LockSupport的unpark(Thread thread) 方法,喚醒該執行緒。

LockSupport

LockSupport 是用來建立鎖和其他同步類的基本執行緒阻塞原語。

每個使用 LockSupport 的執行緒都會與一個許可與之關聯:

如果該許可可用,並且可在程式中使用,則呼叫 #park(...) 將會立即返回,否則可能阻塞。
如果許可尚不可用,則可以呼叫 #unpark(...) 使其可用。
但是,注意許可不可重入,也就是說只能呼叫一次 park(...) 方法,否則會一直阻塞。
LockSupport 定義了一系列以 park 開頭的方法來阻塞當前執行緒,unpark(Thread thread) 方法來喚醒一個被阻塞的執行緒。
複製程式碼

如下圖所示:

park(Object blocker)

方法的blocker引數,主要是用來標識當前執行緒在等待的物件,該物件主要用於問題排查和系統監控。

park 方法和 unpark(Thread thread) 方法,都是成對出現的。同時 unpark(Thread thread) 方法,必須要在 park 方法執行之後執行。當然,並不是說沒有呼叫 unpark(Thread thread) 方法的執行緒就會一直阻塞

park 有一個方法,它是帶了時間戳的 #parkNanos(long nanos) 方法:為了執行緒排程禁用當前執行緒,最多等待指定的等待時間,除非許可可用。

public static void park() {
    UNSAFE.park(false,0L);
}
複製程式碼

unpark

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}
複製程式碼

實現原理

從上面可以看出,其內部的實現都是通過 sun.misc.Unsafe 來實現的,其定義如下:

// UNSAFE.java
public native void park(boolean var1,long var2);
public native void unpark(Object var1);
複製程式碼