1. 程式人生 > >【併發程式設計】 圖文深入解析Java顯示鎖底層原始碼 —— 加解鎖是如何實現的

【併發程式設計】 圖文深入解析Java顯示鎖底層原始碼 —— 加解鎖是如何實現的

一、瞭解 AbstractQueuedSynchronizer(AQS)

1、AQS 簡介

AbstractQueuedSynchronizer 是大師 Doug Lea 編寫的一個併發程式設計類,位於 java.util.concurrent.locks,是 CountdownLatch、Semaphore、ReentrantLock、ReentrantReadWriteLock、ThreadPoolExecutor 中重要的組成部分,他們中關於 “鎖” 的部分與 AQS 息息相關。

借用一下原始碼中的說法,AbstractQueuedSynchronizer 基於一個 FIFO 佇列 提供了一套阻塞鎖和同步相關的實現。該類被設計成為很多同步容器 synchronizers

的底層實現,它使用了一個原子int private volatile int state; 來表示當前狀態。當在 AQSacquired (獲取資源) 或被 release (釋放資源)時,需要依據這個 state 來進行判斷。所以子類需要定義方法來修改這個狀態,該狀態的含義由我們自由定製。(翻譯的不好...)

2、實現最簡單的 AQS

我們來看一個最簡單的例子,我們有一個類 Sync 繼承了 AbstractQueuedSynchronizer,並重寫了其 tryAcquiretryRelease 方法。實現非常簡單,我們通過呼叫父類的 compareAndSetState()

以及 setState() 來完成,簡單來說(不是特別準確),就是 tryAcquire 返回 true,代表獲取鎖成功,否則就會阻塞。而 tryRelease 則負責鎖的釋放。

在例子中:將 state 設定為 100 代表當前狀態為無鎖,1 則代表已經有某個執行緒獲取了該鎖。當然這個 state 表達的含義是怎麼樣的,完全是我們定義的,實際上鎖定或者無鎖是 100 還是 200 還是 -100,都沒有什麼關係。

/**
 * Created by Anur IjuoKaruKas on 2019/5/7
 */
public class Mutex extends AbstractQueuedSynchronizer {

    public static class Sync extends AbstractQueuedSynchronizer {

        public Sync() {
            setState(100); // set the initial state, being unlocked.
        }

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = compareAndSetState(100, 1);
            print("嘗試獲取鎖" + (result ? "成功" : "失敗"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            setState(100);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(0);
    }

    public void unLock() {
        sync.release(0);
    }

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();
        mutex.lock();

        Thread thread = new Thread(() -> {
            print("呼叫 mutex.lock() 之前");
            mutex.lock();
            print("呼叫 mutex.lock() 之後");
        });

        thread.start();

        print("main 執行緒 Sleep 之前");
        Thread.sleep(5000);
        print("main 執行緒 Sleep 之後");
        mutex.unLock();
    }

    public static void print(String print) {
        System.out.println(String.format("時間 - %s\t\t%s\t\t%s", new Date(), Thread.currentThread(), print));
    }
}

========================================= 輸出
時間 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		嘗試獲取鎖成功
時間 - Fri May 24 15:44:19 CST 2019		Thread[main,5,main]		main 執行緒 Sleep 之前
時間 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		呼叫 mutex.lock() 之前
時間 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 15:44:19 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 15:44:24 CST 2019		Thread[main,5,main]		main 執行緒 Sleep 之後
時間 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖成功
時間 - Fri May 24 15:44:24 CST 2019		Thread[Thread-0,5,main]		呼叫 mutex.lock() 之後

我們可以看到,程式碼符合我們的預期:在 main 函式所線上程呼叫 mutex.unLock(); 釋放鎖之前,子執行緒是一直阻塞的,呼叫 mutex.lock() 之後 的日誌輸出發生在 main 執行緒 Sleep 之後 之後。

通過重寫 tryAcquiretryRelease 方法,以及呼叫 acquirerelease 方法,我們很容易就實現了一個鎖,當然這個鎖有一堆問題... 我們只是通過這個小例子,來建立對 AQS 一個簡單的瞭解。

看到這裡,有些細心的小夥伴可能會想了,既然鎖是由 tryAcquire 控制的,那和 state 又有什麼關係呢? 我們完全可以定義一個自定義變數,比如 signfalse 代表無鎖,true 代表鎖定,好像也可以實現這段邏輯啊?這個時候就需要引出我們神奇的 compareAndSetCAS操作了。

3、AQS 繞不過的話題: CAS Compare And Swap

前面說到,我們暫時認為 :tryAcquire 返回 true,代表獲取到鎖,反之只要 tryAcquire 返回 flase,執行緒就會被阻塞(不準確,後面會細說)。實際上這裡有一個 隱含條件,我們必須做到:


  • ※ 無論何時,都只能有一個執行緒 tryAcquire 成功,且在某個執行緒 tryAcquire 成功之後,並在其 release 釋放鎖之前,任何執行緒進行 tryAcquire 都將返回 false

是的,就是併發問題!

下面這個例子我們簡單使用一個自定義變數 sign 來實現 tryAcquire,看看會發生什麼:

        private boolean sign;

        @Override
        protected boolean tryAcquire(int ignore) {
            boolean result = false;
            if (!sign) {
                sign = true;
                result =  true;
            }
            print("嘗試獲取鎖" + (result ? "成功" : "失敗"));
            return result;
        }

        @Override
        protected boolean tryRelease(int ignore) {
            sign = false;
            return true;
        }
========================================= 輸出
時間 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		嘗試獲取鎖成功
時間 - Fri May 24 18:03:12 CST 2019		Thread[main,5,main]		main 執行緒 Sleep 之前
時間 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		呼叫 mutex.lock() 之前
時間 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 18:03:12 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖失敗
時間 - Fri May 24 18:03:17 CST 2019		Thread[main,5,main]		main 執行緒 Sleep 之後
時間 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		嘗試獲取鎖成功
時間 - Fri May 24 18:03:17 CST 2019		Thread[Thread-0,5,main]		呼叫 mutex.lock() 之後

看起來好像沒問題,在這個 demo 中也得到了和第一個 DEMO 一樣的預期的結果。然而事情並沒有那麼簡單,新寫的這個 tryAcquire 實現是一個 "CompareThenSet" 操作,在併發的情況下,會出現不可預期的情況

  • 執行緒A 進來,發現 signfalse
  • 執行緒B 同時進來,也發現 signfalse
  • 兩者同時將 sign 修改為 true,問題就來了。

到底是 執行緒A 獲取到了鎖,還是 執行緒B 呢?(實際上都獲取到了)

我們改一下 Main 方法,我們使用 100 個執行緒併發執行 mutex.lock(); 獲取鎖成功則會輸出語句 print("獲取鎖成功");,執行,發現,竟然有兩個執行緒同時獲取到了鎖。有兩個執行緒同時將 sign 修改為了 true

    public static void main(String[] args) throws InterruptedException {
        Mutex mutex = new Mutex();

        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            threads.add(new Thread(() -> {
                mutex.lock();
                print("獲取鎖成功");
            }));
        }

        ExecutorService executorService = Executors.newFixedThreadPool(100);
        threads.forEach(executorService::submit);
        Thread.sleep(1000);
    }

如果我們使用 AQS 幫我們寫好的 compareAndSetState 則沒有這個問題。

Java9 之前,底層實現是呼叫 unsafe 包的 compareAndSwapInt 來實現的:

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

而在 Java9 之後,則是使用 VarHandle 來實現 VarHandleunSafe 的一個替代方案,本文不多贅述,後面會有文章講到這個 ~ 。

    // VarHandle mechanics
    private static final VarHandle STATE;

---------------------------------------------

    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }

這裡簡單的說一下 CAS,即 CompareAndSwapCAS 可原子性地比較並替換一個值,樂觀鎖中一個典型的實現便是使用 CAS 來完成的。對併發程式設計有所瞭解的小夥伴應該都知道 CAS,一般情況下,Compare(比較)Swap(交換) 至少是兩個原子操作(實際上是更多個原子操作,主要看編譯成多少條機器碼)。CAS 則保證了 CompareSwap 為一個原子操作。


二、深入理解 AbstractQueuedSynchronizer(AQS) 資源鎖定與解鎖正向流程

上文說到,我們暫時認為 :tryAcquire 返回 true,代表獲取到鎖,反之只要 tryAcquire 返回 flase,執行緒就會被阻塞。

AQS 當然沒有這麼簡單,但我們可以先看看加鎖時呼叫的 acquire 方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

我們發現,tryAcquire 只是第一重判斷,如果 tryAcquire 失敗,緊接著還有另一個核心邏輯 acquireQueued。在簡介裡,我們說,AQS 除了使用一個 原子state 來作為狀態判斷以外,還有一個 FIFO 佇列,此佇列就和 acquireQueued 方法息息相關。另外,AQS 所控制的資源訪問,還可以是共享的,或者獨佔的(addWaiter 引數 Node.EXCLUSIVE)。

以下的分析我們以一個簡單的 獨佔式非公平 AQS 實現: java.util.concurrent.locks.ReentrantLock.NonfairSync 來深入解析。獨佔式很好理解,大部分的鎖實現都只允許一個執行緒在同一時間獲取到鎖定的資源。

1、TryAcquire 與 TryRelease 的標準寫法及其優化

先看看 NonfairSynctryAcuire 是怎麼實現及優化的。首先,NonfairSync 中將 state == 0 定義為無鎖狀態。

  1. 競爭優化: 如果當前無鎖(state == 0),再呼叫 CAS。這實際上對效能是一個很好的優化,假設當前取 state 不為 0,實際上 CompareAndSetState 成功的概率也很小,這也可以避免同一時間內,過多的執行緒去併發修改 state 這個狀態。
  2. 重入設計: 試想如果我們不判斷當前執行緒是否持有鎖,就去進行 CAS 操作,會發生什麼?毫無疑問是 CAS 失敗,這會間接導致死鎖。這裡我們可以看到,重入以後,有一個 int nextc = c + acquires; 操作,這是方便我們記錄到底套了幾層鎖用的,如果沒有這個機制,我們將無法精確的控制加鎖和解鎖的層級,難免會出現一些意料之外的情況。簡單來說:lock 幾次,就要 unLock 幾次。當然我們也可以做到 aquire 多次,一次性 release 掉,或者反過來,取決於怎麼我們實現 tryAquiretryRlease 方法。
  3. 偏向優化: 這個優化實際上很簡單,如果說要獲取鎖的執行緒就是鎖的持有執行緒,我們無需去進行任何 CAS 操作,返回 true 即可。
        @ReservedStackAccess
        final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 避免過多的執行緒競爭 CAS 操作
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);// 如果 CAS 操作成功,則將當前執行緒儲存起來,重入和解鎖時用於判斷。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires; // 重入優化,每次加鎖相當於 `state++`
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true; // 偏向優化
            }
            return false;
        }



        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) { // 每次解鎖相當於 `state--` 直到 state == 0 ,代表可釋放鎖了
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2、AcquireQueued 解析

①、addWaiter階段

如果 tryAquire 失敗,就會進入 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))addWaiter 方法建立了一個新的 Node 例項,Node 例項中主要儲存了當前執行緒資訊,並將 nextWaiter 賦值為 Node.EXCLUSIVE, 這個 nextWaiter 後面再談,它主要用於執行緒排程、以及獨佔模式、共享模式的區分,我們可以先不管它。

操作比較簡單,原理是將 node 塞入雙向連結串列尾端,也就是前面提到的 FIFO佇列。就是利用 CAS 操作將新建立的、帶有本執行緒資訊的 node 設定為雙向連結串列新的 tail,並且修改兩者的 ‘指標’ prevnext

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
      Node node = new Node(mode);

        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue(); // 初始化雙向連結串列,就是建立一個新的空 node,並且頭尾都是此 node。
                                       // 這個 node 除了拿來標記連結串列從哪裡開始,沒有什麼別的意義。
            }
        }
②、acquireQueued階段(自旋)

入隊成功後,進入 acquireQueued 方法,拋開執行緒被 interrupt 的情況acquireQueued 的程式碼其實也很簡單,我們不看 interrupt 相關邏輯,其實邏輯還是很簡單的。這是一個無限迴圈(或者說自旋),只要沒有 tryAcquire 成功,就會一直迴圈下去,邏輯如下:

  1. 如果上一個節點是 FIFO 佇列頭,則進行一次 tryAquire,如果成功,則跳出迴圈。
  2. 檢測是否需要阻塞,如果需要阻塞,則阻塞等待喚醒,parkAndCheckInterrupt 便是阻塞直到被喚醒(或者被 interrupt ,暫時先不考慮這個情況)。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquire()parkAndCheckInterrupt() 都很好理解,前者就是去嘗試一下獲取鎖定資源,看能否成功。後者則是阻塞直到被喚醒。

③、阻塞階段

我們先說說 shouldParkAfterFailedAcquire,這個判斷是一個挺有意思的設計,後續文章會細說,它和執行緒排程、取消獲取鎖等相關。因為在獲取鎖定資源和釋放鎖定資源的過程中,實際上我們只需要用到兩個狀態,一個是初始狀態 pred.waitStatus == 0,另一個是 pred.waitStatus == SIGNAL == -1

程式碼中我們可以很容易看出,在 CASprev 節點的 waitStatus 設定為 SIGNAL : -1 之前,都將返回 false,如果設定成功,下一次自旋進入該方法就是 true 了,也就是說,會進入 parkAndCheckInterrupt() 方法,阻塞直到被喚醒。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
自旋階段圖解:

阻塞階段圖解:

3、release 解析

①、喚醒 FIFO 的下一個節點

阻塞直到喚醒這個邏輯在鎖定資源、釋放資源 這兩個階段來看十分簡單,最後我們來看看 release 做了什麼,release 除了呼叫了我們自己實現的 tryRelease 之外,其實關鍵的就是這個 unparkSuccessor

tryRelease 上面也說過了,就是改改原子 state,這裡不多贅述。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

程式碼中可以看出,當 FIFO 佇列不為空且頭結點的 waitStatu 被修改過,就會進入 unparkSuccessorunparkSuccessor 傳入了當前 FIFO 的佇列頭,邏輯如下:

  1. 如果當前節點 waitStatus 為負(可能為 SIGNALCONDITION 或者 PROPAGATE),我們這裡簡單先看成只有 SIGNAL 狀態,則 CAS 將其設定為 0。其他幾個狀態我們後面會說到。
  2. 如果 !(s == null || s.waitStatus > 0),也就是說 node.nextwaitStatus <= 0 ,則簡單的直接將其喚醒:LockSupport.unpark(s.thread);
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

②、被喚醒後

被喚醒的執行緒當然不是直接獲得了鎖,它還是會繼續 acquireQueue 進行自旋,邏輯還是和之前一樣,避免小夥伴往上翻程式碼,這裡貼了一份如果 prev 是頭結點,如果 tryAcquire 成功,我們看到其實很簡單,只是將自己設為頭部即可。

                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }


這篇文章只是簡單的說說 AQS 的正向獲取資源,釋放資源流程,後續會繼續解析 waitnotifycondition 等基於 AQS 的執行緒排程解析 ~~ 以及各個鎖是如何實現 AQS 的 ~~

文章皆是基於原始碼一步步分析,沒有參考過多資料,如有錯誤,請指出!!


另外歡迎來 Q 群討論技術相關(目前基本沒人)[左二維碼]~

如果覺得寫得好還可以關注一波訂閱號喲 ~ 部落格和訂閱號同步更新 [右二維碼]~


參考資料:

JDK12 原始碼
Brief introduction to AbstractQueuedSynchronizer by Using a Simple Mutex Example

另外小夥伴可以思考一下:

  1. 如果說階段7:ThreadB 被喚醒後,繼續自旋時,另一個執行緒ThreadC tryAcquire成功了會發生什麼。
  2. 如果說第一個問題了解了,那應該就很清楚為什麼說本文解析的這個鎖叫做:非公平鎖了
  3. 眾所周知,只要是 CAS 操作,都有 ABA 問題,如果說修改 waitStatus 發生了 ABA 問題,會