1. 程式人生 > >透過 ReentrantLock 分析 AQS 的實現原理

透過 ReentrantLock 分析 AQS 的實現原理

對於 Java 開發者來說,都會碰到多執行緒訪問公共資源的情況,這時候,往往都是通過加鎖來保證訪問資源結果的正確性。在 java 中通常採用下面兩種方式來解決加鎖得問題:

  1. synchronized 關鍵字;

  2. Java.util.concurrent.locks 包中的 locks 包下面的鎖(Lock 介面和 ReentrantLock 等實現類);

synchronized 是 java 底層支援的,而 concurrent 包則是 jdk 實現。關於 synchronized 的原理可以閱讀 再有人問你synchronized是什麼,就把這篇文章發給他。

Lock 介面

Lock 是一個介面,方法定義如下

// 如果鎖可用就獲得鎖,如果鎖不可用就阻塞直到鎖釋放
void lock()

// 和 lock()方法相似, 但阻塞的執行緒可中斷,丟擲 java.lang.InterruptedException異常
void lockInterruptibly() 

// 非阻塞獲取鎖;嘗試獲取鎖,如果成功返回true
boolean tryLock()

// 帶有超時時間的獲取鎖方法
boolean tryLock(long timeout, TimeUnit timeUnit) 

// 釋放鎖
void unlock() 

Lock 的實現

實現 Lock 介面的類有很多,以下為幾個常見的鎖實現

  • ReentrantLock:表示重入鎖,它是唯一一個實現了 Lock 介面的類。重入鎖指的是執行緒在獲得鎖之後,再次獲取該鎖不需要阻塞,而是直接關聯一次計數器增加重入次數

  • ReentrantReadWriteLock:重入讀寫鎖,它實現了 ReadWriteLock 介面,在這個類中維護了兩個鎖,一個是 ReadLock,一個是 WriteLock,他們都分別實現了 Lock 介面。讀寫鎖是一種適合讀多寫少的場景下解決執行緒安全問題的工具,基本原則是:讀和讀不互斥、讀和寫互斥、寫和寫互斥。也就是說涉及到影響資料變化的操作都會存在互斥。

  • StampedLock: stampedLock 是 JDK8 引入的新的鎖機制,可以簡單認為是讀寫鎖的一個改進版本,讀寫鎖雖然通過分離讀和寫的功能使得讀和讀之間可以完全併發,但是讀和寫是有衝突的,如果大量的讀執行緒存在,可能會引起寫執行緒的飢餓。stampedLock 是一種樂觀的讀策略,使得樂觀鎖完全不會阻塞寫執行緒

AQS (AbstractQueuedSynchronizer) 

AQS 的全稱為(AbstractQueuedSynchronizer),這個類也是在 java.util.concurrent.locks 下面。這是一個抽象類,採用設計模式中的模板模式來設計的,內部提供了一系列公共的方法,主要是通過繼承的方式來使用,它本身沒有實現任何的同步介面,僅僅是定義了同步狀態的獲取以及釋放的方法來提供自定義的同步元件。

可以這麼說,只要搞懂了AQS,那麼 J.U.C 中絕大部分的 API 都能輕鬆掌握。

下圖是 AQS 的子類:

可以看到,AQS 還是有很多子類的。下面將詳細講解下 AQS。

AQS 原理概述

AQS 解決了多執行緒訪問共享資源安全性的問題。其原理圖可以表示如下:

AQS 利用了一個 volatile 型別的 int 變數 state 來表示同步狀態,當其他執行緒訪問帶有鎖的共享資源的時候,會被阻塞,然後會被放入 FIFO 的 CLH (Craig, Landin, and Hagersten)

佇列中,等待在此被喚醒。當獲取鎖的執行緒釋放鎖以後,會從佇列中喚醒一個阻塞的節點(執行緒)。由此確保了每個執行緒有序訪問共享資源,避免出現數據不一致的情況。

AQS 框架圖

下面通過一張架構圖來整體瞭解一下 AQS 框架:

  • 上圖中有顏色的為 Method,無顏色的為 Attribution。

  • 總的來說,AQS 框架共分為五層,自上而下由淺入深,從 AQS 對外暴露的 API 到底層基礎資料。

  • 當有自定義同步器接入時,只需重寫第一層所需要的部分方法即可,不需要關注底層具體的實現流程。當自定義同步器進行加鎖或者解鎖操作時,先經過第一層的API進入 AQS 內部方法,然後經過第二層進行鎖的獲取,接著對於獲取鎖失敗的流程,進入第三層和第四層的等待佇列處理,而這些處理方式均依賴於第五層的基礎資料提供層。

CLH 佇列

前面提到了 AQS 使用內建的 FIFO 佇列來完成獲取資源執行緒的排隊工作。既然是佇列,就是由很多節點(Node)組成的,下面來看下 Node 的資料構成。

// java.util.concurrent.locks.AbstractQueuedSynchronizer 
static final class Node { /** 共享模式 */ static final Node SHARED = new Node(); /** 獨佔模式 */ static final Node EXCLUSIVE = null; /** 取消等待,比如執行緒等待超時或者被中斷 */ static final int CANCELLED = 1; /** 執行緒需要 unpark 操作來喚醒 */ static final int SIGNAL = -1; /** 執行緒處於 condition 等待 */ static final int CONDITION = -2; /** 共享模式下使用,表示下一次共享模式獲取同步狀態時會被無條件傳播下去 */ static final int PROPAGATE = -3;      // 當前執行緒在佇列中的等待狀態 volatile int waitStatus; // 前驅節點 volatile Node prev; // 後繼節點 volatile Node next; /** 獲取同步狀態的執行緒 */ volatile Thread thread; // 指向下一個處於 CONDITION 的節點 Node nextWaiter; // 如果是共享模式返回true final boolean isShared() { return nextWaiter == SHARED; } /** 返回前驅節點,沒有就丟擲NPE */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } /** Establishes initial head or SHARED marker. */ Node() {} /** 將執行緒構成一個 node 新增到佇列中,通過呼叫 addWaiter 使用. */ Node(Node nextWaiter) { this.nextWaiter = nextWaiter; U.putObject(this, THREAD, Thread.currentThread()); } /** 在 condition 佇列使用,通過呼叫 addConditionWaiter 使用. */ Node(int waitStatus) {        // 通過 unsafe 類以及對應的 Node 屬性在記憶體中的偏移量來修改對應例項的屬性值。 U.putInt(this, WAITSTATUS, waitStatus); U.putObject(this, THREAD, Thread.currentThread()); } /** CASes waitStatus field. */ final boolean compareAndSetWaitStatus(int expect, int update) { return U.compareAndSwapInt(this, WAITSTATUS, expect, update); } /** CASes next field. */ final boolean compareAndSetNext(Node expect, Node update) { return U.compareAndSwapObject(this, NEXT, expect, update); } private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); private static final long NEXT; static final long PREV; private static final long THREAD; private static final long WAITSTATUS; static { try {          //獲取 Node 的屬性 next 在記憶體中的偏移量,下面同理 NEXT = U.objectFieldOffset (Node.class.getDeclaredField("next")); PREV = U.objectFieldOffset (Node.class.getDeclaredField("prev")); THREAD = U.objectFieldOffset (Node.class.getDeclaredField("thread")); WAITSTATUS = U.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); } catch (ReflectiveOperationException e) { throw new Error(e); } } }

對於 Node 類,可以發現其內部操作都是通過 Unsafe 類來保證是原子性操作。同時內部部分變數都是採用 volatile 來修飾,確保該變數對其他執行緒也是可見的。此外,還可以得出存在兩種不同模式,一種是獨佔模式,一種是共享模式。

再看看 AQS 中兩個跟 Node 類相關的屬性:

  // java.util.concurrent.locks.AbstractQueuedSynchronizer 
// 頭結點 private transient volatile Node head; // 尾節點 private transient volatile Node tail;

整個結構如下圖所示:

入隊操作

如上圖瞭解了同步佇列的結構, 我們在分析其入列操作在簡單不過。無非就是將 tail(使用 CAS 保證原子操作)指向新節點,新節點的 prev 指向佇列中最後一節點(舊的 tail 節點),原佇列中最後一節點的 next 節點指向新節點以此來建立聯絡,來張圖幫助大家理解。

  

出隊操作

同步佇列(CLH)遵循 FIFO,首節點是獲取同步狀態的節點,首節點的執行緒釋放同步狀態後,將會喚醒它的後繼節點(next),而後繼節點將會在獲取同步狀態成功時將自己設定為首節點,這個過程非常簡單。如下圖

 

 

設定首節點是通過獲取同步狀態成功的執行緒來完成的(獲取同步狀態是通過 CAS 來完成),只能有一個執行緒能夠獲取到同步狀態,因此設定頭節點的操作並不需要 CAS 來保證,只需要將首節點設定為其原首節點的後繼節點並斷開原首節點的 next(等待 GC 回收)應用即可。

同步狀態 state

在瞭解資料結構後,接下來了解一下 AQS 的同步狀態 —— State。AQS 中維護了一個名為 state 的欄位,意為同步狀態,是由 Volatile 修飾的,用於展示當前臨界資源的獲鎖情況。

// java.util.concurrent.locks.AbstractQueuedSynchronizer
private volatile int state;
  • 當state=0時,表示無鎖狀態

  • 當state>0時,表示已經有執行緒獲得了鎖,也就是 state=1,但是因為 ReentrantLock 允許重入,所以同一個執行緒多次獲得同步鎖的時候,state 會遞增,比如重入5次,那麼state=5。 而在釋放鎖的時候,同樣需要釋放 5 次直到 state=0 其他執行緒才有資格獲得鎖

下面提供了幾個訪問這個欄位的方法:

// java.util.concurrent.locks.AbstractQueuedSynchronizer 
// 獲取State的值 protected final int getState() // 設定State的值 protected final void setState(int newState) // 使用CAS方式更新State protected final boolean compareAndSetState(int expect, int update)

這幾個方法都是 Final 修飾的,說明子類中無法重寫它們。我們可以通過修改 State 欄位表示的同步狀態來實現多執行緒的獨佔模式和共享模式(加鎖過程)。

                  

對於我們自定義的同步工具,需要自定義獲取同步狀態和釋放狀態的方式,也就是 AQS 架構圖中的第一層:API 層。

需要注意的是:不同的 AQS 實現,state 所表達的含義是不一樣的。

清楚了 AQS 的基本架構以後,我們來分析一下 AQS 的實現原理,仍然以 ReentrantLock 為模型。

 ReentrantLock 實現原理分析

特性概覽

ReentrantLock 意思為可重入鎖,指的是一個執行緒能夠對一個臨界資源重複加鎖。為了幫助大家更好地理解 ReentrantLock 的特性,我們先將 ReentrantLock 跟常用的 Synchronized 進行比較,其特性如下(藍色部分為本篇文章主要剖析的點):

下面通過虛擬碼,進行更加直觀的比較:

// **************************Synchronized的使用方式**************************
// 1.用於程式碼塊
synchronized (this) {}
// 2.用於物件
synchronized (object) {}
// 3.用於方法
public synchronized void test () {}
// 4.可重入
for (int i = 0; i < 100; i++) {
    synchronized (this) {}
}
// **************************ReentrantLock的使用方式**************************
public void test () throw Exception {
    // 1.初始化選擇公平鎖、非公平鎖
    ReentrantLock lock = new ReentrantLock(true);
    // 2.可用於程式碼塊
    lock.lock();
    try {
        try {
            // 3.支援多種加鎖方式,比較靈活; 具有可重入特性
            if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
        } finally {
            // 4.手動釋放鎖
            lock.unlock()
        }
    } finally {
        lock.unlock();
    }
}

ReentrantLock 的時序圖

呼叫 ReentrantLock 中的 lock() 方法,原始碼的呼叫過程採用時序圖來展現:

從圖上可以看出來,當鎖獲取失敗時,會呼叫 addWaiter() 方法將當前執行緒封裝成 Node 節點加入到 AQS 佇列,基於這個思路,我們來分析 AQS 的原始碼實現。ReentrantLock 與 AQS 之間的關係

首先來看看 ReentrantLock 的構造方法,它的構造方法有兩個,如下所示:

    // 預設是非公平鎖    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

   // true 是公平鎖
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

可以發現,建構函式中引用了兩個內部類,分別是 FairSync (公平鎖) 和 NonfairSync (非公平鎖)。並且都是 Sync 的子類。

// 非公平鎖
static final class NonfairSync extends Sync {

}

// 公平鎖
static final class FairSync extends Sync {

} 

從這裡也可以發現 Sync 類的重要性,而前面的截圖也說明了 Sync 又是 AbstractQueuedSynchronizer 的子類,到這裡,他們之間的關係就浮出水面了:

對於 FairSync 與 NonfairSync :

  • 公平鎖 表示所有執行緒嚴格按照 FIFO 來獲取鎖

  • 非公平鎖 表示可以存在搶佔鎖的功能,也就是說不管當前佇列上是否存在其他執行緒等待,新執行緒都有機會搶佔鎖

公平鎖和非公平鎖的實現上的差異,會在文章後面做一個解釋,接下來的分析仍然以非公平鎖作為主要分析邏輯。


 

Lock 方法

對於 ReentrantLock 預設是 NonfairSync,我們以這個為例瞭解其背後的原理。

    
  // java.util.concurrent.locks.ReentrantLock#NonfairSync
  static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android. // @ReservedStackAccess final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

看 lock 方法程式碼的含義:

  • 若通過 CAS 設定變數 State(同步狀態)成功,也就是獲取鎖成功,則將當前執行緒設定為獨佔執行緒。

  • 若通過 CAS 設定變數 State(同步狀態)失敗,也就是獲取鎖失敗,則進入 Acquire 方法進行後續處理。

compareAndSetState 的程式碼實現邏輯如下

  protected final boolean compareAndSetState(int expect, int update) {
        return U.compareAndSwapInt(this, STATE, expect, update);
    }

這段程式碼其實邏輯很簡單,就是通過 CAS 樂觀鎖的方式來做比較並替換。上面這段程式碼的意思是,如果當前記憶體中的 state 的值和預期值 expect 相等,則替換為 update。更新成功返回 true,否則返回 false。這個操作是原子的,不會出現執行緒安全問題。

lock 方法的第一步很好理解,但第二步獲取鎖失敗後,後續的處理策略是怎麼樣的呢?這塊可能會有以下思考:

  • 某個執行緒獲取鎖失敗的後續流程是什麼呢?有以下兩種可能:
  1. 將當前執行緒獲鎖結果設定為失敗,獲取鎖流程結束。這種設計會極大降低系統的併發度,並不滿足我們實際的需求。所以就是 2 這種流程,也就是 AQS 框架的處理流程。

  2. 存在某種排隊等候機制,執行緒繼續等待,仍然保留獲取鎖的可能,獲取鎖流程仍在繼續。

  • 對於問題 1 的第二種情況,既然說到了排隊等候機制,那麼就一定會有某種佇列形成,這樣的佇列是什麼資料結構呢?

  • 處於排隊等候機制中的執行緒,什麼時候可以有機會獲取鎖呢?

  • 如果處於排隊等候機制中的執行緒一直無法獲取鎖,還是需要一直等待嗎,還是有別的策略來解決這一問題?

可以看一下 else 分支的邏輯,acquire 方法:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer 
   public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

Acquire 方法是 AQS 中的核心方法。這裡它幹了三件事情:

  • tryAcquire:會嘗試再次通過 CAS 獲取一次鎖。

  • addWaiter:將當前執行緒加入上面鎖的雙向連結串列(等待佇列)中

  • acquireQueued:通過自旋,判斷當前佇列節點是否可以獲取鎖


 

tryAcquire 方法

下面詳細看下 NonfairSync 的 tryAcquire 方法,該方法會直接呼叫 nonfairTryAcquire 方法,程式碼如下:

     // java.util.concurrent.locks.ReentrantLock 
     final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();
       // c=0 說明此時沒有獲取沒有執行緒佔有鎖 if (c == 0) {
          // CAS 操作去獲取鎖 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 已經獲取鎖了,可以繼續重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

簡單來說上面的方法主要就是看能不能獲取到鎖,不能獲取到就返回 false,然後就會呼叫 addWaiter 新增到等待佇列中,具體程式碼如下:

    // java.util.concurrent.locks.AbstractQueuedSynchronizer 
  private Node addWaiter(Node mode) { Node node = new Node(mode); // 死迴圈 for (;;) { Node oldTail = tail; if (oldTail != null) { // 通過unsafe 類來對 Node.prev 節點賦值 U.putObject(node, Node.PREV, oldTail); // 更新 tail 節點為 node,該操作對其他執行緒是可見的,確保每次只有一個執行緒可以更新成功 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } } // cas 設定 tail 節點 private final boolean compareAndSetTail(Node expect, Node update) { return U.compareAndSwapObject(this, TAIL, expect, update); } // 初始化 head 和 tail 節點 private final void initializeSyncQueue() { Node h; if (U.compareAndSwapObject(this, HEAD, null, (h = new Node()))) tail = h; }

addWaiter(Node node) 方法通過採用死迴圈方案,確保將該節點設定尾成尾節點。

  • 如果為尾節點不為空,需要將新節點新增到 oldTail 的 next 節點,同時將新節點的 prev 節點指向 oldTail;

  • 如果當前佇列為空,需要進行初始化,此時 head 結點和 tail 節點都是 h =  new Node () 例項;此時 oldTail = h 不為空,node 的 prev 為 oldTail, oldTail 的 next 是 node。

這裡程式碼很簡單,但是卻通過 CAS 操作保證了多個執行緒一起新增節點的時候,只有一個執行緒可以成功。

此外,入隊操作還有個 enq 方法,這個方法和 addWaiter 一樣的,就是返回值不一樣,具體如下:

   // java.util.concurrent.locks.AbstractQueuedSynchronizer  
  private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { U.putObject(node, Node.PREV, oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); } } }

但請注意,初始化的頭結點並不是當前執行緒節點,而是呼叫了無參建構函式的節點。如果經歷了初始化或者併發導致佇列中有元素,則與之前的方法相同。

acquireQueued

將新增到佇列中的 Node 作為引數傳入 acquireQueued 方法,這裡面會做搶佔鎖的操作:

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
          // 獲取前一個節點,為空,丟擲 NPE final Node p = node.predecessor();
          // p==head 說明 node 是佇列中的第一位,這時候還會再去獲取一次鎖 if (p == head && tryAcquire(arg)) {
            // 獲取鎖成功後,node 變成 head 節點,凡是 head 節點,其 thread 和 pre 都為空,next 保持不變。 setHead(node); p.next = null; // help GC
            // 注意這個中斷記錄是在獲取鎖之後才會被返回的,也就是說獲取鎖之後,才有資格處理中斷 return interrupted; }
          // 獲取鎖失敗,說明p為頭節點且當前沒有獲取到鎖(可能是非公平鎖被搶佔了)
          // 或者是p不為頭結點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節點的waitStatus為-1),防止無限迴圈浪費資源。具體兩個方法下面細細分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
            // 說明在這個過程中發生過中斷,需要補上 interrupted = true; } } catch (Throwable t) { cancelAcquire(node); throw t; } }

總的來說,一個執行緒獲取鎖失敗了,被放入等待佇列,acquireQueued 會把放入佇列中的執行緒不斷去獲取鎖,直到獲取成功或者不再需要獲取(中斷)。

下面來看獲取失敗後的處理,具體在看下面的程式碼: 

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     // 獲取頭結點的節點狀態 int ws = pred.waitStatus;
     // 當前 prev node 的執行緒需要被 unpark 喚醒,也就是當前 node 可以接受 park 操作 if (ws == Node.SIGNAL)        // This node has already set status asking a release to signal it, so it can safely park. return true;
     // 前節點處於取消狀態,跳過,獲取再前一個的節點狀態 if (ws > 0) { do {
         // 這裡將取消狀態的節點刪除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0);
       // 同時設定下一個節點為 node pred.next = node; } else {
       // 設定前任節點等待狀態為 SIGNAL pred.compareAndSetWaitStatus(ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 如果之前中斷了,為 true,並清除中斷標誌 return Thread.interrupted(); }

如果 shouldParkAfterFailedAcquire 返回了true,則會執行:parkAndCheckInterrupt()方法,它是通過 LockSupport.park(this) 將當前執行緒掛起到 WATING 狀態,它需要等待一箇中斷、unpark 方法來喚醒它,通過這樣一種 FIFO 的機制的等待,來實現了 Lock 的操作。

LockSupport 類是 Java6 引入的一個類,提供了基本的執行緒同步原語。LockSupport 實際上是呼叫了 Unsafe 類裡的函式,歸結到 Unsafe 裡,只有兩個函式:

public native void unpark(Thread jthread);  
public native void park(boolean isAbsolute, long time);  

unpark 函式為執行緒提供“許可( permit )”,執行緒呼叫 park 函式則等待“許可”。這個有點像訊號量,但是這個“許可”是不能疊加的,“許可”是一次性的。
permit相當於0/1的開關,預設是 0,呼叫一次 unpark 就加 1 變成了 1。呼叫一次 park 會消費 permit,又會變成 0,變成 0 不會影響原有執行緒的執行。 如果再呼叫一次 park 會阻塞,因為 permit 已經是 0 了。直到 permit 變成 1。這時呼叫 unpark 會把 permit 設定為 1 。每個執行緒都有一個相關的 permit,permit 最多隻有一個,重複呼叫 unpark 不會累積。

這裡需要說明的一點就是:acquireQueued 方法內部是一個死迴圈, shouldParkAfterFailedAcquire 和  parkAndCheckInterrupt 也都在這裡面。這裡對這個邏輯再整理下:

  1.  acquireQueued 本意是通過無限迴圈讓佇列中的第一個節點嘗試去獲取鎖;當一個 node 被加入到佇列中的時候,就會促發這個無限迴圈;

  2. 如果等待佇列中的第一個節點獲取到鎖了,就會退出迴圈;

  3. 如果 node 是第一個加入等待佇列的,此時 node 的 prev 節點是 head ( new Node() ),node 會先去獲取鎖,失敗後,因為 prev 的 waitStatus = 0,這時候將其 waitStatus 設定為 -1,然後再次迴圈,再獲取鎖失敗就會呼叫 parkAndCheckInterrupt 阻塞當前執行緒;

  4. shouldParkAfterFailedAcquire 過程中會將佇列中處於 CANCELLED = 1 的節點刪除。也就是說每新增一個節點,獲取鎖失敗後,都可能會對佇列做一遍整理;

  5. 被加入佇列後的執行緒是不會響應中斷的。當node 獲取鎖之後,如果執行緒在等待中被中斷過,需要將這個中斷補上,這樣執行緒就可以響應中斷操作,比如此時被取消了。

cancelAcquire 方法

如果在獲取鎖的過程中,發生了錯誤,就會響應  cancelAcquire(node) 方法。下面具體看下方法的原始碼,看看它做了啥:

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // 過濾掉那些被取消的節點
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 獲取過濾後的前驅節點的後繼節點
        Node predNext = pred.next;

     // 把當前node的狀態設定為CANCELLED 
        node.waitStatus = Node.CANCELLED;
      // 如果當前節點是尾節點,將從後往前的第一個非取消狀態的節點設定為尾節點 
   // 如果更新成功,將tail的後繼節點設定為null,更新失敗,說明 node 後面還有其他節點,node 不是尾接點 if (node == tail && compareAndSetTail(node, pred)) { pred.compareAndSetNext(predNext, null); } else {        // 如果當前節點不是head的後繼節點,1:判斷當前節點前驅節點的是否為SIGNAL,2:如果不是,則把前驅節點設定為SINGAL看是否成功
       // 如果1和2中有一個為true,再判斷當前節點的執行緒是否為null
// 如果上述條件都滿足,把當前節點的前驅節點的後繼指標指向當前節點的後繼節點 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next; if (next != null && next.waitStatus <= 0) pred.compareAndSetNext(predNext, next);
          // 走到這裡,已經把 node 從 next 佇列裡面刪除了,但是保留了 prev 指標 } else {
          // 如果當前節點是head的後繼節點,或者上述條件不滿足,那就喚醒當前節點的後繼節點 unparkSuccessor(node); }
       // 這裡修改了 node 的next 指標,但是保證了 prev 指標的不變 node.next = node; // help GC } }

當前的流程:獲取當前節點的前驅節點,如果前驅節點的狀態是 CANCELLED,那就一直往前遍歷,找到第一個 waitStatus <= 0 的節點,將找到的 Pred 節點和當前 Node 關聯,將當前Node 設定為 CANCELLED。

根據當前節點的位置,考慮以下三種情況:

  1. 當前節點是尾節點。

  2. 當前節點是Head的後繼節點。

  3. 當前節點不是Head的後繼節點,也不是尾節點。

根據上述第二條,我們來分析每一種情況的流程。

當前節點是尾節點。

當前節點是 Head 的後繼節點。

當前節點不是 Head 的後繼節點,也不是尾節點。

通過上面的流程,我們對於 CANCELLED 節點狀態的產生和變化已經有了大致的瞭解,但是為什麼所有的變化都是對 Next 指標進行了操作,而沒有對 Prev 指標進行操作呢?什麼情況下會對 Prev 指標進行操作?

執行 cancelAcquire 的時候,當前節點的前置節點可能已經從佇列中出去了(已經執行過 Try 程式碼塊中的 shouldParkAfterFailedAcquire 方法了),如果此時修改 Prev指標,有可能會導致 Prev 指向另一個已經移除佇列的 Node,因此這塊變化 Prev 指標不安全。 shouldParkAfterFailedAcquire 方法中,會執行下面的程式碼,其實就是在處理 Prev 指標。shouldParkAfterFailedAcquire 是獲取鎖失敗的情況下才會執行,進入該方法後,說明共享資源已被獲取,當前節點之前的節點都不會出現變化,因此這個時候變更 Prev 指標比較安全。

do {
    node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

 

unparkSuccessor

下面看下 unparkSuccessor 的邏輯:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
       // 將傳入的引數node的等待狀態變為 0
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
       // 從後往前尋找那些沒有被取消的執行緒
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

 unparkSuccessor 的作用如下:

  • 如果其下一個節點為空,或者其等待狀態是取消狀態,那麼就從後往前找,找到一個等待狀態 <=0 的,然後將其喚醒;

  • 如果下一個節點不為空,且等待狀態 <=0,將其喚醒。

這個方法的找到一個需要喚醒的節點,看下後面怎麼處理:

  // LockSupport 
  public static void unpark(Thread thread) {
        if (thread != null)
            U.unpark(thread);
    }

發現也是通過 unSafe 類來處理的。這裡呼叫了 unpark 方法,那肯定有地方呼叫了 park 方法,這個是在  parkAndCheckInterrupt 裡呼叫的。


 FairSync lock 公平鎖

到這裡,NonfairSync lock 的邏輯就講完了 。那 FairSync lock 是如何保證公平的呢?且看程式碼:

// java.util.concurrent.locks.ReentrantLock    
   static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 加鎖 final void lock() { acquire(1); }    // protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();
       // 當前沒有執行緒獲取鎖 if (c == 0) {
          // 當前執行緒處於 head 之後,或者佇列為空,就會去呼叫 CAS 獲取鎖,否則是沒有機會獲取鎖的 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } }
       // 當前執行緒就是獨佔執行緒,可重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

可見對於公平鎖,新加入的節點有以下幾種操作:

  1. node 能獲取鎖的情況有兩種:1 是當前沒有執行緒持有鎖,並且佇列為空,或者 node 是 head 的下一個節點;2 是 node 本身持有鎖,可重入。

  2. 在情況 1 後的 node,都將會被加入到佇列中去;

這裡就可以看出來,公平鎖完全是按照先來後到的順序進行排列等候的,不會給你機會去通過 CAS 操作獲取鎖的。對於非公平鎖,每個執行緒去獲取鎖的時候都有機會去嘗試獲取鎖的,成功鎖就是你的,不成功就加入到佇列中去。


 

unLock 方法 

講完了 lock 方法以後,接下去講 unLock 方法了。來看下 unlock 的邏輯:

  public void unlock() {
        sync.release(1);
    }

   // 釋放鎖
   public final boolean release(int arg) {
     // true 表示成功釋放,就會喚醒下一個執行緒 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }   protected final boolean tryRelease(int releases) { int c = getState() - releases;
    // 確保是當前執行緒,非當前執行緒 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); }
    // 更新同步狀態 setState(c); return free; }

unlock 的邏輯比較好理解,就是釋放鎖,更新同步狀態,然後喚醒下一個等待執行緒。

其中 tryRelease 動作可以認為就是一個設定鎖狀態的操作,而且是將狀態減掉傳入的引數值(引數是 1 ),如果結果狀態為 0,就將排它鎖的 Owner 設定為 null,以使得其它的執行緒有機會進行執行。

在排它鎖中,加鎖的時候狀態會增加 1(當然可以自己修改這個值),在解鎖的時候減掉 1,同一個鎖,在可以重入後,可能會被疊加為 2、3、4 這些值,只有 unlock() 的次數與 lock() 的次數對應才會將 Owner 執行緒設定為空,而且也只有這種情況下才會返回 true。

hasQueuedPredecessors 是公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法。如果返回 False,說明當前執行緒可以爭取共享資源;如果返回 True,說明佇列中存在有效節點,當前執行緒必須加入到等待佇列中。

// java.util.concurrent.locks.ReentrantLock

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

看到這裡,我們理解一下h != t && ((s = h.next) == null || s.thread != Thread.currentThread());為什麼要判斷的頭結點的下一個節點?第一個節點儲存的資料是什麼?

雙向連結串列中,第一個節點為虛節點,其實並不儲存任何資訊,只是佔位,這個可以從列表的第一次初始化也可以看出來。

真正的第一個有資料的節點,是在第二個節點開始的。當h != t時: 如果(s = h.next) == null,等待佇列正在有執行緒進行初始化,但只是進行到了Tail指向Head,沒有將Head指向Tail,此時佇列中有元素,需要返回 True。

  • 如果(s = h.next) != null,說明此時佇列中至少有一個有效節點。

  • 如果此時s.thread == Thread.currentThread(),說明等待佇列的第一個有效節點中的執行緒與當前執行緒相同,那麼當前執行緒是可以獲取資源的;

  • 如果s.thread != Thread.currentThread(),說明等待佇列的第一個有效節點執行緒與當前執行緒不同,當前執行緒必須加入進等待佇列。

對於 unparkSuccessor 邏輯前面講過了,就是喚醒下一個節點去獲取鎖。當然在喚醒過程中,對於非公平鎖,其他執行緒是有機會去搶佔的

到這裡,就把加鎖和解鎖的邏輯都講完了。


 

Lock 和 unLock 總結

以非公平鎖為例,這裡主要闡述一下非公平鎖與 AQS 之間方法的關聯之處,具體每一處核心方法的作用都已經在上文闡述清楚了。

為了幫助大家理解 ReentrantLock 和 AQS 之間方法的互動過程,以非公平鎖為例,將加鎖和解鎖的互動流程單獨拎出來強調一下,以便於對後續內容的理解。

加鎖:

  • 通過 ReentrantLock 的加鎖方法 Lock 進行加鎖操作。

  • 會呼叫到內部類 Sync 的 Lock 方法,由於 Sync#lock 是抽象方法,根據 ReentrantLock 初始化選擇的公平鎖和非公平鎖,執行相關內部類的 Lock 方法,本質上都會執行 AQS 的 Acquire 方法。

  • AQS 的 Acquire 方法會執行 tryAcquire 方法,但是由於 tryAcquire 需要自定義同步器實現,因此執行了 ReentrantLock 中的 tryAcquire 方法,由於 ReentrantLock 是通過公平鎖和非公平鎖內部類實現的 tryAcquire 方法,因此會根據鎖型別不同,執行不同的 tryAcquire。

  • tryAcquire 是獲取鎖邏輯,獲取失敗後,會執行框架 AQS 的後續邏輯,跟 ReentrantLock 自定義同步器無關。

解鎖:

  • 通過 ReentrantLock 的解鎖方法 Unlock 進行解鎖。

  • Unlock 會呼叫內部類 Sync 的 Release 方法,該方法繼承於 AQS。

  • Release 中會呼叫 tryRelease 方法,tryRelease 需要自定義同步器實現,tryRelease 只在 ReentrantLock 中的 Sync 實現,因此可以看出,釋放鎖的過程,並不區分是否為公平鎖。

  • 釋放成功後,所有處理由 AQS 框架完成,與自定義同步器無關。

通過上面的描述,大概可以總結出ReentrantLock加鎖解鎖時API層核心方法的對映關係。

 

到這裡,基本就講完了。


 

關於 Lock 及 AQS 的一些補充:

1、 Lock 的操作不僅僅侷限於 lock()/unlock(),因為這樣執行緒可能進入 WAITING 狀態,這個時候如果沒有 unpark() 就沒法喚醒它,可能會一直“睡”下去,可以嘗試用 tryLock()、tryLock(long , TimeUnit) 來做一些嘗試加鎖或超時來滿足某些特定場景的需要。例如有些時候發現嘗試加鎖無法加上,先釋放已經成功對其它物件新增的鎖,過一小會再來嘗試,這樣在某些場合下可以避免“死鎖”哦。

看下相關程式碼:

    // ReentrantLock
    public boolean tryLock() {
     // 呼叫的是非公平鎖來搶佔鎖 return sync.nonfairTryAcquire(1); } // ReentrantLock public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
     // 超過一定時間後再去獲取鎖 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }// AQS
   // 拿不到鎖時,等一段時間再拿不到就退出 private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
     // 時間 <=0 直接返回 if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout;
     // 將當前執行緒加入到佇列中 final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
          // 這裡如果當前執行緒是第一個有效節點,直接嘗試去獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {
            // 時間到了之後,就退出等待佇列 cancelAcquire(node); return false; }
          // 需要等待,並且時長大於 1000L if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
            // 阻塞一定時間,再去獲取鎖 LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

2、 lockInterruptibly() 它允許丟擲 InterruptException 異常,也就是當外部發起了中斷操作,程式內部有可能會丟擲這種異常,但是並不是絕對會丟擲異常的。

    // ReentrantLock     
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // AQS
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
     // 如果發生了中斷,就丟擲中斷異常 if (Thread.interrupted()) throw new InterruptedException();
     // if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } // 可中斷的 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor();
          // 再次看能不能獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; }
          // park 前發現中斷了,丟擲中斷錯誤 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } }

可以發現,基本上可以中斷的點,都會去判斷執行緒是否有中斷標誌,有的話,直接丟擲中斷異常,但是在加入佇列過程,和獲取鎖的過程是不響應中斷的,只有之前之後會做中斷判斷。

3、 newCondition() 操作,是返回一個 Condition 的物件,Condition 只是一個介面,它要求實現 await()、awaitUninterruptibly()、awaitNanos(long)、await(long , TimeUnit)、awaitUntil(Date)、signal()、signalAll() 方法,AbstractQueuedSynchronizer 中有一個內部類叫做 ConditionObject 實現了這個介面,它也是一個類似於佇列的實現,具體可以參考原始碼。大多數情況下可以直接使用,當然覺得自己比較牛逼的話也可以參考原始碼自己來實現。

4、 在 AQS 的 Node 中有每個 Node 自己的狀態(waitStatus),我們這裡歸納一下,分別包含:

  • SIGNAL 從前面的程式碼狀態轉換可以看得出是前面有執行緒在執行,需要前面執行緒結束後,呼叫 unpark() 方法才能啟用自己,值為:-1

  • CANCELLED 當 AQS 發起取消或 fullyRelease() 時,會是這個狀態。值為 1,也是幾個狀態中唯一一個大於 0 的狀態,所以前面判定狀態大於 0 就基本等價於是 CANCELLED 的意思。

  • CONDITION 執行緒基於 Condition 物件發生了等待,進入了相應的佇列,自然也需要 Condition 物件來啟用,值為 -2。

  • PROPAGATE 讀寫鎖中,當讀鎖最開始沒有獲取到操作許可權,得到後會發起一個 doReleaseShared() 動作,內部也是一個迴圈,當判定後續的節點狀態為 0 時,嘗試通過CAS自旋方式將狀態修改為這個狀態,表示節點可以執行。

  • 狀態 0 初始化狀態,也代表正在嘗試去獲取臨界資源的執行緒所對應的 Node 的狀態。 


 

總結

本文基於 ReentrantLock 非公平鎖的獨佔鎖原始碼來分析了 AQS 的內部實現原理。在獲得同步鎖時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器呼叫 tryRelease(int arg) 方法釋放同步狀態,然後喚醒頭節點的後繼節點。

 

參考文章

從ReentrantLock的實現看AQS的原理及應用

AQS的原理淺析

J.U.C|同步佇列(CLH)

深入分析AQS實現原理

&n