1. 程式人生 > 其它 >Java 併發程式設計(三)鎖與 AQS

Java 併發程式設計(三)鎖與 AQS

本文 JDK 對應的版本為 JDK 13


由於傳統的 synchronized 關鍵字提供的內建鎖存在的一些缺點,自 JDK 1.5 開始提供了 Lock 介面來提供內建鎖不具備的功能。顯式鎖的出現不是為了替代 synchronized提供的內建鎖,而是當內建鎖的機制不適用時,作為一種可選的高階功能


內建鎖與顯式鎖

內建鎖於顯式鎖的比較如下表:

類別 synchronized Lock
存在層次 Java的關鍵字 是一個類
鎖的釋放 1、以獲取鎖的執行緒執行完同步程式碼,釋放鎖
2、執行緒執行發生異常,jvm會讓執行緒釋放鎖
在finally中必須釋放鎖,
不然容易造成執行緒死鎖
鎖的獲取 假設A執行緒獲得鎖,B執行緒等待。
如果A執行緒阻塞,B執行緒會一直等待
Lock有多個鎖獲取的方式
鎖狀態 無法判斷 可以判斷
鎖型別 可重入
不可中斷
非公平
可重入
可判斷
可公平(兩者皆可)
效能 少量同步 大量同步

### 顯式鎖的基本使用

Lock 的定義如下:

public interface Lock {
    // 顯式地獲取鎖
    void lock();
    // 可中斷地獲取鎖,與 lock() 方法的不同之處在於在鎖的獲取過程可以被中斷
    void lockInterruptibly() throws InterruptedException;
    // 以非阻塞的方式獲取鎖,呼叫該方法將會立即返回,如果成功獲取到鎖則返回 true,否則返回 false
    boolean tryLock();
    /* 帶時間引數的 tryLock,
       有三種情況:在規定時間內獲取到了鎖;在規定的時間內執行緒被中斷了;在規定的時間內沒有獲取到鎖
    */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 釋放鎖
    void unlock();
    /* 
    	獲取 “等待/通知” 元件,該元件和當前的執行緒繫結,當前的執行緒只有獲取到了鎖,
    	才能呼叫該元件的 wait 方法,而呼叫之後,當前執行緒將會釋放鎖
    */
    Condition newCondition();
}

常用的 Lock 的實現類為 java.util.concurrent.locks.ReentrantLock,使用的示例如下:

private final static Lock lock = new ReentrantLock();
static int value = 0;

static class Demo implements Runnable {
    @Override
    public void run() {
        lock.lock();
        try {
            value++;
        } finally { // 一定要講解鎖操作放入到 finally 中,否則有可能會造成死鎖
            lock.unlock();
        }
    }
}

ReentrantLock 是基於 java.util.concurrent.locks.AbstractQueuedSynchronizer 的具體子類來實現同步的,這個類也被稱為 AQS,是 JUC 中實現 Lock 最為核心的部分

AQS

構建同步類

使用 AQS 構建同步類時獲取鎖和釋放鎖的標準形式如下:[1]

boolean acquire() throws InterruptedException {
    while (當前狀態不允許獲取操作) {
        if (需要阻塞獲取請求) {
            如果當前執行緒不在佇列中,則將其插入佇列
            阻塞當前執行緒
        } else {
            返回失敗
        }
    }
    
    可能更新同步器的狀態
    如果執行緒位於佇列中,則將其移出佇列
    返回成功
}

void release () {
    更新同步器狀態
    if (新的狀態允許某個阻塞的執行緒獲取成功) {
        解除佇列中一個或多個執行緒的阻塞狀態
    }
}

對於支援獨佔式的同步器,需要實現一些 protected 修飾的方法,包括 tryAcquiretryReleaseisHeldExclusively等;

對於支援共享式的同步器,應該實現的方法有 tryAcquireSharedtryReleaseShared

AQSacquireacquireSharedreleasereleaseShared 等方法都將呼叫這些方法在子類中帶有的字首 try 的版本來判斷某個操作能否被執行。

在同步器的子類中,可以根據其獲取操作和釋放操作的語義,使用 getStatesetState以及 compareAndSetState 來檢查和更新狀態,並根據返回的狀態值來告知基類 “獲取” 和 “釋放” 同步的操作是否是成功的。


原始碼解析

AQS 的類結構圖如下:

類屬性分析

  • AQS 例項物件的屬性

    AQS 中存在非 static 的欄位如下(static 欄位沒有分析的必要):

    // 頭節點,即當前持有鎖的執行緒
    private transient volatile Node head;
    
    // 阻塞佇列的尾結點,每個新的節點進來都會插入到尾部
    private transient volatile Node tail;
    
    /*
    	代表鎖的狀態,0 表示沒有被佔用,大於 0 表示有執行緒持有當前的鎖
    	這個值可以大於 1,因為鎖是可重入的,每次重入時都會將這個值 +1
    */
    private volatile int state;
    
    /*
    	這個屬性繼承自 AbstractOwnableSynchronizer,
    	表示當前持有獨佔鎖的執行緒
    */
    private transient Thread exclusiveOwnerThread;
    
  • 佇列節點物件的屬性

    static final class Node {
        // 標記當前的節點處於共享模式
        static final Node SHARED = new Node();
        // 表示當前的節點處於獨佔模式
        static final Node EXCLUSIVE = null;
        // 這個值表示當前節點的執行緒已經被取消了
        static final int CANCELLED =  1;
        // 表示當前節點的下一個節點需要被喚醒
        static final int SIGNAL    = -1;
        // 表示當前節點在等待一個條件
        static final int CONDITION = -2;
        // 表示下一個 acquireShared 應當無條件地傳播
        static final int PROPAGATE = -3;
        
        /* 
        	當前節點的等待狀態,取值為上面的 
        	CANCELLED、SIGNAL、CONDITION、PROPAGATE 或者 0
        */
        volatile int waitStatus;
        // 當前節點的前節點
        volatile Node prev;
        // 當前節點的下一個節點
        volatile Node next;
        // 當前節點儲存的執行緒
        volatile Thread thread;
        // 連結到下一個等待條件的節點(條件佇列),或者是特殊值為 SHARED 的節點
        Node nextWaiter;
    }
    

最後得到的阻塞佇列如下圖所示:

[2]

注意,這裡的阻塞佇列不包含頭結點 head

具體分析

  • acquire(int arg)

    該方法位於 java.util.concurrent.locks.AbstractQueuedSynchronizer 中,具體對應的原始碼如下:

    public final void acquire(int arg) {
      /*
      	如果 tryAcquire(arg) 成功了(即嘗試獲取鎖成功了),那麼就直接獲取到了鎖
      	否則,就需要呼叫 acquireQueued 方法將這個執行緒放入到阻塞佇列中
      */
      if (!tryAcquire(arg) &&
          // 如果嘗試獲取鎖沒有成功,那麼久將當前的執行緒掛起,放入到阻塞佇列中
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    

    tryAcquire(arg) 對應的原始碼如下:

    // AbstractQueuedSynchronizer 中定義的。。。
    protected boolean tryAcquire(int arg) { // 在 AbstractQueuedSynchronizer 中定義的模版方法,需要具體的子類來實現
      throw new UnsupportedOperationException();
    }
    

    為了簡化這個過程,以 ReentrantLockFairSync 為例檢視具體的實現:

    // ReentrantLock.FairSync。。。
    @ReservedStackAccess
    /*
    	嘗試直接獲取鎖,返回值為 boolean,表示是否獲取到鎖
    	返回為 true: 1.沒有執行緒在等待鎖 2.重入鎖,執行緒本來就持有鎖,因此可以再次獲取當前的鎖
    */
    protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) { // state 為 0 表示此時沒有執行緒持有鎖
        /*
        	當前的鎖為公平鎖(FairSync),因此即使當前鎖是可以獲取的,
        	但是需要首先檢查是否已經有別的執行緒在等待這個鎖
        */
        if (!hasQueuedPredecessors() &&
            /*
            	如果沒有執行緒在等待,那麼則嘗試使用 CAS 修改狀態獲取鎖,如果成功,則獲取到當前的鎖
            	如果使用 CAS 獲取鎖失敗,那麼就說明幾乎在同一時刻有個執行緒搶先獲取了這個鎖
            */
            compareAndSetState(0, acquires)) {
          // 到這裡就已經獲取到鎖了,標記一下當前的鎖,表示已經被當前的執行緒佔用了
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      /*
      	如果已經有執行緒持有了當前的鎖,那麼首先需要檢測一下是不是當前執行緒持有的鎖
      	如果是當前執行緒持有的鎖,那麼就是一個重入鎖,需要對 state 變數 +1
      	否則,當前的鎖已經被其它執行緒持有了,獲取失敗
      */
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
    

    現在再回到 acquire 方法,如果 trAcquire(arg) 成功獲取到了鎖,那麼就是成功獲取到了鎖,直接返回即可;如果 tryAcquire(arg) 獲取鎖失敗了,則再執行 acquireQueued 方法將當前執行緒放入到阻塞佇列尾部

    在那之前,首先會執行 acquireQueued 方法中呼叫的 addWaiter(Node.EXCLUSIVE) 方法,具體的原始碼如下:

    // AbstractQueuedSynchronizer
    
    /*
    	這個方法的作用是將當前的執行緒結合給定的 mode 組合成為一個 Node,以便插入到阻塞佇列的末尾
    	結合當前的上下文,傳入的 mode 為 Node.EXCLUSIVE,即獨佔鎖的模式
    */
    private Node addWaiter(Node mode) {
      Node node = new Node(mode);
    
      for (;;) { // 注意這裡的永真迴圈。。。
        Node oldTail = tail;
        /*
        	如果尾結點不為 null,則使用 CAS 的方式將 node 插入到阻塞佇列的尾部
        */
        if (oldTail != null) {
          node.setPrevRelaxed(oldTail); // 設定當前 node 的前驅節點為原先的 tail 節點
          if (compareAndSetTail(oldTail, node)) { // CAS 的方式設定尾結點
            oldTail.next = node;
            return node; // 返回當前的節點 
          }
        } else {
          // 如果當前的阻塞佇列為空的話,那麼首先需要初始化阻塞佇列
          initializeSyncQueue();
        }
      }
    }
    
    // 初始化阻塞佇列對應的原始碼如下
    private final void initializeSyncQueue() {
      Node h;
      // 依舊是使用 CAS 的方式,這裡的 h 的初始化為延遲初始化
      if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
    }
    

    之後就是執行 acquireQueued 方法了,對應的原始碼如下:

    // AbstractQueuedSynchronizer
    
    /*
    	此時的引數 node 已經經過 addWaiter 的處理,已經被新增到阻塞佇列的末尾了
    	如果 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 呼叫之後返回 true,那麼就會執行 acquire(int arg) 方法中的 selfInterrupt() 方法
    
    	這個方法是比較關鍵的部分,是真正處理執行緒掛起,然後被喚醒去獲取鎖,都在這個方法中定義
    */
    final boolean acquireQueued(final Node node, int arg) {
      boolean interrupted = false;
      try {
        for (;;) { // 注意這裡的永真迴圈
          // predecessor() 返回的是當前 node 節點的前驅節點
          final Node p = node.predecessor();
    
          /*
          	p == head 表示當前的節點雖然已經進入到了阻塞佇列,但是是阻塞佇列中的第一個元素(阻塞佇列不包含 head 節點)
          	因此當前的節點可以嘗試著獲取一下鎖,這是由於當前的節點是阻塞佇列的第一個節點,而 head 節點又是延遲初始化的,在這種情況下是有可能獲取到鎖的
          */
          if (p == head && tryAcquire(arg)) {
            setHead(node);
            p.next = null; // help GC
            return interrupted;
          }
    
          /*
          	如果執行到這個位置,則說明 node 要麼就不是隊頭元素,要麼就是嘗試獲取鎖失敗
          */
          if (shouldParkAfterFailedAcquire(p, node))
            interrupted |= parkAndCheckInterrupt();
        }
      } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
          selfInterrupt();
        throw t;
      }
    }
    
    // parkAndCheckInterrupt() 對應的原始碼
    /*
    	該方法的主要任務是掛起當前執行緒,使得當前執行緒在此等待被喚醒
    */
    private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this); // 該方法用於掛起當前執行緒
      return Thread.interrupted();
    }
    

    shouldParkAfterFailedAcquire(p, node) 對應的原始碼如下:

    // AbstractQueuedSynchronizer
    
    /*
    	這個方法的主要任務是判斷當前沒有搶到鎖的執行緒是否需要阻塞
    	第一個引數表示當前節點的前驅節點,第二個引數表示當前執行緒的節點
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      // 前驅節點正常,則需要阻塞當前執行緒節點
      if (ws == Node.SIGNAL)
        /*
        * This node has already set status asking a release
        * to signal it, so it can safely park.
        */
        return true;
      
      /*
      	前驅節點的狀態值大於 0 表示前驅節點取消了排隊
      	如果當前的節點被阻塞了,喚醒它的為它的前驅節點,因此為了使得能夠正常工作,
      	需要將當前節點的前驅節點設定為一個正常的節點,使得當前的節點能夠被正常地喚醒
      */
      if (ws > 0) {
        /*
        * Predecessor was cancelled. Skip over predecessors and
        * indicate retry.
        */
        do {
          node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
      } else {
        /*
        * waitStatus must be 0 or PROPAGATE.  Indicate that we
        * need a signal, but don't park yet.  Caller will need to
        * retry to make sure it cannot acquire before parking.
        */
        
        /*
        	如果不滿足以上兩個條件,那麼當前的 ws 的狀態就只能為 0, -2, -3 了
        	在當前的上下文環境中,ws 的狀態為 0,因此這裡就是將當前節點的前驅節點的 ws 值設定為 Node.SIGNAL
        */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
      }
      
      /*
      	本次執行到此處會返回 false,而 acquireQueued 中的永真迴圈將會再次進入這個方法
      	由於上面的一系列操作,當前節點的前驅節點一定是正常的 Node.SIGNAL,因此會在第一個 if 語句中直接返回 true
      */
      return false;
    }
    
  • release(int arg)

    該方法用於釋放當前獲取到的鎖,對應的具體的原始碼如下:

    // AbstractQueuedSynchronizer
    
    // 釋放在獨佔模式中獲取到的鎖
    public final boolean release(int arg) {
      if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
          unparkSuccessor(h);
        return true;
      }
      return false;
    }
    

    tryRelease(arg) 對應的原始碼如下:

    // AbstractQueuedSynchronizer
    
    // 很明顯,這也是一個模版方法,需要具體子類來定義對應的實現
    protected boolean tryRelease(int arg) {
      throw new UnsupportedOperationException();
    }
    

    依舊以 ReentrantLock 為例,檢視一下 tryRelease(int arg) 的具體實現

    // ReentrantLock.Sync
    
    @ReservedStackAccess
    protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    
      boolean free = false; // 是否已經完全釋放鎖的標記
      
      // 如果 c > 0,則說明獲取的鎖是一個重入鎖,還沒有完全釋放
      if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
    

    再回到 release(int arg) 方法中,如果是已經完全釋放了鎖,則執行後面的 return false 語句,執行結束。如果沒有完全釋放鎖,那麼則會繼續執行 unparkSuccessor(h) 方法,對應的原始碼如下:

    // AbstractQueuedSynchronizer
    
    // 喚醒後繼節點
    private void unparkSuccessor(Node node) {
      /*
      * If status is negative (i.e., possibly needing signal) try
      * to clear in anticipation of signalling.  It is OK if this
      * fails or if status is changed by waiting thread.
      */
      int ws = node.waitStatus;
      if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
    
      /*
      * Thread to unpark is held in successor, which is normally
      * just the next node.  But if cancelled or apparently null,
      * traverse backwards from tail to find the actual
      * non-cancelled successor.
      */
      
      /*
      	喚醒後繼節點,但是可能後繼節點取消了等待(即 waitStatus = Node.CANCELLED)
      	在這種情況下,將會從隊尾向前查詢,找到最靠近 head 的 waitStatus < 0 的節點
      */
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
        s = null;
        // 從隊尾開始向前查詢,找到第一個合適的節點
        for (Node p = tail; p != node && p != null; p = p.prev)
          if (p.waitStatus <= 0) // 可能排在前面的節點取消的可能性更大
            s = p;
      }
      
      if (s != null) // 喚醒這個合適的節點對應的執行緒
        LockSupport.unpark(s.thread);
    }
    

    在釋放了所有的鎖之後,喚醒後繼的一個還沒有被取消的執行緒節點,然後喚醒它,喚醒之後的節點將恢復原來在 parkAndCheckInterrupt() 中的執行狀態

    private final boolean parkAndCheckInterrupt() {
      LockSupport.park(this); // 被喚醒後將繼續執行後面的程式碼
      return Thread.interrupted(); // 此時應當是沒有被中斷的
    }
    

    再回到原先的 acquireQueued(node, arg) 方法,此時由於 head 已經釋放了鎖,而當前的 node 節點是距離 head 最近的一個有效的執行緒節點,因此它能夠獲取到鎖,執行緒在獲取鎖之後再繼續執行對應的程式碼邏輯

ConditionObject

ConditionObject 一般用於 “生產者—消費者” 的模式中,與基於Objectwait()notifyAll() 實現的通訊機制十分類似。

對應的 ConditionObject 的原始碼如下:

public class ConditionObject implements Condition, java.io.Serializable {
  // 條件佇列的第一個節點
  private transient Node firstWaiter;
  // 條件佇列的最後一個節點
  private transient Node lastWaiter;
}

與前文的阻塞佇列相對應,條件佇列與阻塞佇列的對應關係圖如下所示:

[3]

具體解釋:

  1. 條件佇列和阻塞佇列的節點,都是 Node 的例項物件,因為條件佇列的節點是需要轉移到阻塞佇列中取得
  2. ReentrantLock 的例項物件可以通過多次呼叫 newCondition() 方法來生成新的 Condition 物件(最終由 AQS 的具體子類物件生成)。在 AQS 中,對於 Condition 的具體實現為 ConditionObject,這個物件只有兩個屬性欄位:firstWaiterlastWaiter
  3. 每個 ConditionObject 都有一個自己的條件佇列,執行緒 1 通過呼叫 Condition 物件的 await 方法即可將當前的呼叫執行緒包裝成為 Node 後加入到條件佇列中,然後阻塞在條件佇列中,不再繼續執行後面的程式碼
  4. 呼叫 Condition 物件的 signal() 方法將會觸發一次喚醒事件,與 Objectnotify() 方法類似。此時喚醒的是條件佇列的隊頭節點,喚醒後會將 firstWaiter 的節點移動到阻塞佇列的末尾,然後在阻塞佇列中等待獲取鎖,之後獲取鎖之後才能繼續執行
await 方法

await 方法對應的原始碼如下:

// AbstractQueuedSynchronizer.ConditionObject
/*
	丟擲 InterruptedException 表示這個方法是可以被中斷的
	這個方法會被阻塞,直到呼叫 signal 方法(singnal 和 singnalAll)喚醒或者被中斷
*/
public final void await() throws InterruptedException {
    // 按照規範,應該在最開始的位置就首先檢測一次中斷
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 將當前的執行緒封裝成 Node,新增到條件佇列中 
    Node node = addConditionWaiter();
    
    /* 
    	釋放鎖,返回值是釋放鎖之前的 state 值
    	在呼叫 await 方法之前,當前的執行緒肯定是持有鎖的,在這裡需要釋放掉當前持有的鎖
    */
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    /*
    	isOnSyncQueue(node) 返回 true 表示當前的節點已經從條件佇列轉移到阻塞隊列了
    */
    while (!isOnSyncQueue(node)) {
        /* 
        	如果當前的節點不在阻塞佇列中,那麼將當前節點中的執行緒掛起,
        	直到通過呼叫 Condition 物件的 signal* 方法來喚醒它
        */
        LockSupport.park(this);
        // 執行緒被中斷,因此需要退出當前的迴圈
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 進入阻塞佇列之後,等待獲取鎖
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

addConditionWaiter() 對應的原始碼如下:

// AbstractQueuedSynchronizer.ConditionObject

/*
	將當前執行緒包裝成一個 Node,插入的條件佇列末尾
*/
private Node addConditionWaiter() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 當前 ConditionObject 中條件佇列的尾節點
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    /*
    	如果尾結點的執行緒已經被取消了,那麼就清除它
    	注意當前節點所處的佇列為條件佇列,因此每個節點的狀態都應該是 Node.CONDITION
    */
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 該方法會從前到後清除所有的不滿足條件的節點
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    // 建立一個新的 Node,當前的 Node 的 waitStatus 為 Node.CONDITION
    Node node = new Node(Node.CONDITION);

    // 處理初始佇列為空的情況
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;

    lastWaiter = node;
    return node;
}

/*
	清除當前 ConditionObject 的條件佇列中所有 waitStatus 不為 CONDITION 的節點
*/
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    // 單純的連結串列移除節點的操作
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

fullyRelease(node) 對應的原始碼如下:

// AbstractQueuedSynchronizer.ConditionObject
/*
	該方法的主要目的是完全釋放當前節點中執行緒持有的鎖
	之所以是完全釋放,這是因為鎖是可重入的
*/
final int fullyRelease(Node node) {
    try {
        /*
        	由於顯式鎖是可重入的,因此在呼叫 await() 時也必須再恢復到原來的狀態
        	回憶一下 Node 節點中 state 屬性代表的意義,如果 state > 0 表示當前持有的鎖的數量
        	獲取這個鎖的數量,使得在進入阻塞佇列中的 Node 能夠再恢復到原來的狀態
        */
        int savedState = getState();
        if (release(savedState)) // 參見上文有關 release 方法的介紹
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        /* 
        	如果在釋放鎖的過程中失敗了,那麼就將這個節點的狀態設定為 CANCELLED,
        	在之後的處理中會移除這個節點
        */
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

isOnSyncQueue(node) 對應的原始碼:

// AbstractQueuedSynchronizer.ConditionObject

/*
	判斷當前的節點是否是從條件佇列中轉移到了阻塞佇列,並且正在等待被喚醒
*/
final boolean isOnSyncQueue(Node node) {
    /*
    	從條件佇列中移動到阻塞佇列中時,node 的 waitStatus 將會被設定為 0
    	如果 node 的 waitStatus 依舊為 Node.CONDITION,那麼則說明它還在條件佇列中
    	如果 node 的前驅節點為 null,那麼也一定還在等待佇列中(阻塞佇列中每個節點都會有前驅節點)
    */
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

    // 如果 node 都已經存在後繼節點了,那麼肯定在阻塞佇列中了
    if (node.next != null) // If has successor, it must be on queue
        return true;

    /*
    * node.prev can be non-null, but not yet on queue because
    * the CAS to place it on queue can fail. So we have to
    * traverse from tail to make sure it actually made it.  It
    * will always be near the tail in calls to this method, and
    * unless the CAS failed (which is unlikely), it will be
    * there, so we hardly ever traverse much.
    */

    /*
    	由於 CAS 在將條件佇列中的節點移動到阻塞佇列中時可能會失敗,(具體可以檢視 AQS 的入隊方法)
    	此時當前節點的前驅節點不為 null,為了解決這個問題,
    	需要遍歷阻塞佇列來確保當前的節點確實是已經進入到了阻塞佇列
    */
    return findNodeFromTail(node);
}

// 對應的原始碼。。。。
private boolean findNodeFromTail(Node node) {
    // We check for node first, since it's likely to be at or near tail.
    // tail is known to be non-null, so we could re-order to "save"
    // one null check, but we leave it this way to help the VM.
    
    /*
    	從尾結點開始遍歷搜尋節點,檢查是否在阻塞佇列中
    */
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}

signal 方法

signal 方法用於喚醒正在等待的執行緒,在當前的環境下,signal 的主要目的是喚醒在條件佇列中執行緒節點,將它們移動到阻塞佇列中

AQS 中對於 signal() 方法的實現如下:

// AbstractQueuedSynchronizer.ConditionObject
/*
	移動等待了最久的執行緒,將它從條件佇列移動到阻塞佇列
*/
public final void signal() {
    // 呼叫 signal 的執行緒必須持有當前的獨佔鎖
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 一般第一個節點就被視作 “等待最久” 的執行緒
    Node first = firstWaiter;
    // 真正喚醒執行緒
    if (first != null)
        doSignal(first);
}

/*
	從前往後查詢第一個符合條件的節點(有的執行緒可能已經被取消或者被中斷了)
*/
private void doSignal(Node first) {
    do {
        // 移除第一個節點
        /* 
        	如果移除第一個節點之後條件佇列中不再有節點了,那麼需要將 lastWaiter 
        	節點也置為 null
        */
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;

        // 移除該節點和佇列之間的連線關係
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null); // 遍歷佇列,直到找到第一個滿足條件的節點
}

// AbstractQueuedSynchronizer

/*
	將條件佇列中的節點移動到阻塞佇列
	返回 true 表示轉移成功,false 則表示這個節點在呼叫 signal 之前就被取消了
*/
final boolean transferForSignal(Node node) {
    /*
    * If cannot change waitStatus, the node has been cancelled.
    */
    /* 
    	CAS 修改當前節點的 waitStatus如果失敗,說明該節點所在的執行緒已經被取消了
    */
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    /*
    * Splice onto queue and try to set waitStatus of predecessor to
    * indicate that thread is (probably) waiting. If cancelled or
    * attempt to set waitStatus fails, wake up to resync (in which
    * case the waitStatus can be transiently and harmlessly wrong).
    */
    /*
    	這裡的的 p 是 node 在進入阻塞佇列之後的前驅節點
    */
    Node p = enq(node); // 以自旋的方式進入阻塞佇列的隊尾
    int ws = p.waitStatus;
    /*
    	ws > 0 表示 node 在阻塞佇列中的前驅節點取消了等待,直接喚醒 node 對應的執行緒
    	ws <= 0,那麼在進入阻塞佇列的時候需要將 node 的前驅節點設定為 SIGNAL,表示前驅節點會喚醒後繼節點
    */
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

在喚醒執行緒之後,再檢視 await() 方法中的邏輯:

public final void await() throws InterruptedException {
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 當前執行緒被掛起
        // 掛起後的後置處理
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // ……………………………………………………
}

interruptMode 可選的值如下:

  • REINTERRUPT:在 await 方法返回的時候,需要重新設定中斷狀態
  • THROW_IE:代表 await 方法返回的時候,需要丟擲 InterruptedException 異常
  • 0:表示在 await 方法呼叫期間,該執行緒沒有被中斷

執行緒被喚醒之後的第一步操作是呼叫 checkInterruptWhileWaiting(node) 檢查當前的執行緒是否被中斷了,對應的原始碼如下:

// AbstractQueuedSynchronizer.ConditionObject
// 返回對應 interruptMode 中的三個值
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
}

// AbstractQueuedSynchronizer
/*
	只有執行緒被中斷的情況下,才會呼叫此方法
	如果需要的話,將這個已經取消等待的節點轉移到阻塞佇列
	返回 true :如果此執行緒在 signal 呼叫之前被取消
*/
final boolean transferAfterCancelledWait(Node node) {
    /*
    	CAS 將節點狀態設定為 0
    	如果這一步 CAS 成功,則說明是呼叫 signal 方法之前就已經發生了中斷,
    	因為 signal 方法會將條件佇列的首個節點的 waitStatus 置為 0 再移動到阻塞佇列
    	如果不為 0 則說明要麼被取消了,要麼還沒有呼叫 signal 進行處理
    */
    if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
        enq(node); // 可以看到,即使被中斷了,依舊會將這個節點放入到阻塞佇列
        return true;
    }
    
    /*
    * If we lost out to a signal(), then we can't proceed
    * until it finishes its enq().  Cancelling during an
    * incomplete transfer is both rare and transient, so just
    * spin.
    */
    /*
    	如果會走到這,那麼一定是 CAS 設定 node 的 waitStatus 失敗了,
    	即是在呼叫 signal 之後發生的中斷
    	
    	signal 會將節點移動從條件佇列移動到阻塞佇列,但是可能由於某些原因還沒有移動完成,
    	因此在這裡通過自旋的方式等待其完成
    */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

可以看到,即使發生了中斷,依舊會完成將 node 從條件佇列轉移到阻塞佇列


喚醒執行緒後繼續向下走,對應的原始碼如下:

public final void await() throws InterruptedException {
    // 省略部分程式碼
    
    /*
    	當 acquireQueued 方法返回 true 時,說明執行緒已經被中斷了
    	如果此時 interruptMode 為 THROW_IE 的話,說明在呼叫 signal 方法之前就已經被中斷了
    	在這種情況下,將 interruptMode 置為 REINTERRUPT,以便之後重新中斷
    */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
}

繼續向下執行,對應的原始碼:

/* 
	在呼叫 signal 時會斷開當前節點和後繼節點之間的連線,
	如果此時後繼節點不為 null,說明是被中斷的,同樣需要斷開這個節點在條件佇列中的連線
*/
if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
// 處理中斷
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

reportInterruptAfterWait(interruptMode) 對應的原始碼:

// AbstractQueuedSynchronizer.ConditionObject
// 處理中斷
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    // 根據 interruptMode 對中斷進行不同的處理
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

處理中斷

acquireQueued 方法的執行過程中,對於中斷的處理程式碼如下:

if (shouldParkAfterFailedAcquire(p, node))
    interrupted |= parkAndCheckInterrupt();

// 重點在於 parkAndCheckInterrupt 方法
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // 該方法會清除中斷標記
}

acquireQueued 中,只是單純地使用一個變數 interrupted 來標記是否被中斷過,也就是說,在 acquireQueued 中,並不會處理中斷,即使當前的執行緒節點被中斷了,它依舊會嘗試去獲取鎖

具體對於中斷的處理由具體的實現來定義,可以忽略這個中斷,也可以丟擲一個異常

ReentrantLock 對於 lockInterruptibly() 的實現為例,具體的實現程式碼如下:

// ReentrantLock
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1); // 該方法為 AQS 中定義的方法
}

AQS 中對於 acquireInterruptibly 方法的定義如下:

// AbstractQueuedSynchronizer
public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    /*
    	在 parkAndCheckInterrupt() 方法中通過 Thread.interrupted() 
    	方法清除了執行緒的中斷標記,因此不會走這
    */
    if (Thread.interrupted())
        throw new InterruptedException();
    // 繼續往下走
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

// doAcquireInterruptibly 方法的定義如下
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            /*
            	關鍵在這,與不丟擲 InterruptedException 的相比,最大的區別就在於對於中斷的處理,
            	上文的 acquireQueued 則只是將中斷標記返回給呼叫者而不是顯式地丟擲一個異常
            */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node); // 取消該節點去獲取鎖的行為
        throw t; // 傳遞捕獲到的異常
    }
}

cancelAcquire(node) 對應的原始碼如下:

// AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    /*
    	找到符合條件的前驅節點,將不符合條件的前驅節點都清除
    */
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary, although with
    // a possibility that a cancelled node may transiently remain
    // reachable.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;
    
    
    /*
    	一般的連結串列清除節點工作
    */
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}


參考:

[1] 《Java 併發程式設計實戰》

[2] https://javadoop.com/post/AbstractQueuedSynchronizer

[3] https://javadoop.com/post/AbstractQueuedSynchronizer-2