1. 程式人生 > >JUC原始碼分析(二)-ReentrantLock原始碼分析

JUC原始碼分析(二)-ReentrantLock原始碼分析

背景介紹

ReentrantLock是JUC中的重要的類,其靜態內部類Sync繼承AQS,關於AQS的分析可見JUC-AbstractQueuedSynchronizer(AQS)原始碼分析,Sync重寫了tryAcquire()和tryRelease()完成加鎖功能,靜態內部類NonfairSyncFairSync分別實現非公平鎖和公平鎖.

ReentrantLock語義

我們知道AQS的語義是由子類重寫tryAcquire()和tryRelease()完成的,在Sync中,

  1. state=0,表示鎖資源可用
  2. state=1,表示鎖資源已被另一個執行緒獲取,當前執行緒獲取鎖資源失敗
  3. 由於ReentrantLock支援可重入鎖,所以state>1時,state數值即為鎖被重入的次數
    /**
     * 獲取鎖
     * 1.若沒有另一執行緒佔有鎖,則設定鎖持有數為1並直接返回
     * 2.若當前執行緒已經佔有鎖,則將鎖持有數加1並直接返回
     * 3.若鎖被另一執行緒佔有,則當前執行緒掛起直到可以獲得鎖
     */
    public void lock() {
        //呼叫sync.lock(),公平鎖與非公平鎖的實現不同
        sync.lock();
    }
    /**
     * 嘗試釋放鎖
     * 1.若當前執行緒佔有鎖,則將鎖持有數減1,若鎖持有數為0則釋放鎖
     * 2.若當前執行緒沒有佔有鎖,則丟擲IllegalMonitorStateException
     */
public void unlock() { //公平鎖與非公平鎖實現一致 sync.release(1); }

其lock()和unlock()均呼叫其內部類Sync完成

公平鎖與非公平鎖

Sync

Sync繼承AQS,釋放鎖的邏輯由公平鎖和非公平鎖共用,如下:

        /**
         * tryRelease()較為簡單,將state設定為state - releases;若state==0,再設定exclusiveOwnerThread即可
         */
        @ReservedStackAccess
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

NonfairSync

        /**
         * 嘗試直接獲取鎖,若失敗則呼叫acquire(1)
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

NonfairSync繼承Sync,tryAcquire()簡單呼叫Sync.nonfairTryAcquire()

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //鎖資源仍可用
                if (compareAndSetState(0, acquires)) {
                    //若使用CAS修改state值成功,即成功獲取鎖
                    //設定獨佔執行緒
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //即使鎖資源不可用,但current即為獨佔執行緒,依然可以修改state值(實現"可重入"功能)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //由於此段程式碼實現可重入功能,即進入此段程式碼執行的執行緒都已獲得鎖,此時setState(nextc)無需原子操作
                setState(nextc);
                return true;
            }
            return false;
        }

FairSync

        /**
         * 由於CAS不能保證公平性,因此直接呼叫acquire(1)
         */
        final void lock() {
            acquire(1);
        }

FairSync繼承Sync,其tryAcquire()如下

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //僅在此處增加了!hasQueuedPredecessors()的條件以實現公平鎖,如何實現見"辨析"
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

辨析

上文介紹了公平鎖和非公平鎖的程式碼,現在請大家思考一個問題:

  1. AQS的release()會喚醒AQS佇列的第一個執行緒還是喚醒AQS佇列的所有執行緒?

檢視原始碼發現,答案是只會喚醒AQS佇列的第一個執行緒.

既然只會喚醒AQS佇列的第一個執行緒,那麼

  1. 是否說明AQS佇列中只會有一個被喚醒的執行緒去嘗試獲取鎖?
  2. 是否說明被喚醒的執行緒一定可以無競爭的獲取鎖?
  3. 是否說明獲取鎖的順序一定是按照的被掛起在AQS佇列中的順序,即獲取鎖的順序一定是按照請求獲取鎖的先後順序,即獲取鎖的順序天生就是公平的?

問題有點多,先思考一下.

  1. 我們看到第三個問題:獲取鎖的順序天生就是公平的嗎?–>利用反證法可知,答案是不是,否則在ReentrantLock中就不需要有FairSync了.

  2. 再看第一個問題:是否說明AQS佇列中只會有一個被喚醒的執行緒去嘗試獲取鎖?–>答案是是的,因為確實AQS佇列中只有第一個執行緒被喚醒了

  3. 再看一下第二個問題:是否說明被喚醒的執行緒一定可以無競爭的獲取鎖?–>答案是不是,即使AQS佇列中只會有一個被喚醒的執行緒去嘗試獲取鎖,但有可能有一個未被掛起在AQS佇列上的執行緒與剛被喚醒的執行緒爭奪鎖.

設想這樣一種場景

  1. 執行緒1獲取鎖成功,而後執行緒2~5依次嘗試獲取鎖失敗,被掛到AQS佇列上.
  2. 執行緒1釋放鎖,則此時掛在AQS佇列第一位的執行緒2則被喚醒,並且嘗試獲取鎖資源.但與此同時執行緒6也嘗試獲取鎖,便會出現執行緒2與執行緒6競爭鎖資源.

如果是非公平鎖,執行緒6可能會獲取鎖資源,此時執行緒6後申請鎖但卻先獲取鎖,不公平.
如果是公平鎖,我們看下hasQueuedPredecessors()的程式碼

    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • h==t,此時AQS佇列尚未初始化,hasQueuedPredecessors()返回false,任意執行緒都可嘗試獲取鎖
  • h!=t,此時AQS已初始化完畢
    • 對執行緒6而言,此時便屬於此種情況,並且h.next!=null&&s.thread != Thread.currentThread(),此時hasQueuedPredecessors()返回true,執行緒6不能嘗試獲取鎖,將被掛起
    • 對執行緒2而言,h.next!=null&&s.thread == Thread.currentThread(),此時hasQueuedPredecessors()返回false,執行緒2可以嘗試獲取鎖

因此執行緒2線上程6前獲取鎖,保證了公平性.

可中斷與不可中斷

Java 執行緒中的中斷

在計算機領域,中斷在不同場景有著不同的含義.在此討論的中斷僅為java.lang.Thread中與中斷相關的interrupt(),interrupted(),isInterrupted()等方法.
在我看來,Thread的是否中斷就是一個標誌位,只不過是一個內建的統一的標誌位機制.
思考這樣一種場景,有一個執行緒A執行迴圈,迴圈終止的條件由B執行緒控制,那麼執行緒的程式碼可能如下:

while(!stop){
    ...
}

當B執行緒需要通知A執行緒停止迴圈時,只需要設定A.stop=true即可.這種實現方式有一個致命的缺點–>破壞了封裝性,如果B執行緒要通知A執行緒,就一定要讓A.stop對可見,當然可以修改為通過一個公有方法通知A執行緒,但是執行緒間通知中斷資訊是Thread中非常普遍的場景,不如讓java.lang.Thread就實現這個功能.另外,若A執行緒執行一個native方法呢?此時如何讓這個native方法停止執行呢?總不能還要為native方法設定控制方法是否執行的公共方法供其他執行緒呼叫吧.因此這就是Java 執行緒中斷機制的來由.

其實中斷機制也可以當做是一種IPC方式,只是傳遞的資訊已經確定(即停止執行緒執行),因此收到此資訊的執行緒需要儘快停止執行,那收到中斷資訊的執行緒如何停止執行呢?Java中有哪些可以讓程式立馬停止執行的方式呢?
答案是returnException,但是因為return無法區分執行緒是正常執行結束退出還是因為接收中斷資訊而停止執行的,所以在接收中斷資訊後使用Exception(這也是Java引入異常機制的原因,學習的知識又交叉了),也就是InterruptedException,是我們非常熟悉的異常.

當B執行緒呼叫A.interrupt()通知A執行緒中斷資訊後,A執行緒有兩個選擇:

  1. 線上程執行的程式碼中有判斷當前執行緒是否中斷的程式碼,若A執行緒檢測到中斷,則throw new InterruptException,此時我們稱之為可中斷.
  2. 線上程執行的程式碼中沒有判斷當前執行緒是否中斷的程式碼,這樣即使B執行緒通知A執行緒中斷資訊,A執行緒也會繼續執行,此時我們稱之為不可中斷.

lockInterruptibly()

其實上文說的可中斷不可中斷往往是針對方法的.剛好ReentrantLock中有一個可中斷的方法lockInterruptibly(),我們一起看下:

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

方法簽名與我們上文介紹的一致,該方法可能throws InterruptedException

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //首先判斷是否被中斷,若是,丟擲異常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    /**
     * Acquires in exclusive interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //丟擲異常
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
  1. acquireInterruptibly()相比acquire()多了判斷是否中斷的邏輯
  2. doAcquireInterruptibly()就相當於addWaiter()+acquireQueued(),只不過在檢測到被中斷時丟擲異常而不是返回是否被中斷的資訊
  3. acquire()相比acquireInterruptibly()多了selfInterrupt(),是因為在acquireQueued()中檢測是否被中斷後將中斷標記清除了,因此若已被中斷需要執行selfInterrupt()設定中斷標記;這也是acquireQueued()返回一個布林值表示當前執行緒是否被中斷的原因

另:雖然根據上述分析acquireQueued()中的parkAndCheckInterrupt()可以修改為park()而不再檢測是否被中斷,因為即使檢測到被中斷之後,在acquire()中還需要再次設定中斷標記.但是acquireQueued()不僅僅被acquire()呼叫,在JUC–ReentrantLock及Condition原始碼分析中的waitThread:condition.await()被阻塞的後半部分我們可以看到acquireQueued()還會被condition.await()呼叫,此時可以根據acquireQueued()的返回值實現更豐富的中斷處理策略.

定時模式

ReentrantLock提供了一種可中斷的可設定等待時間的獲取鎖的方法,即tryLock(long timeout, TimeUnit unit),如何實現可中斷的獲取鎖的方法已在上文分析,接下來分析如何實現可設定等待時間的獲取鎖的方法.

    /**
     * 可中斷的可設定等待時間的獲取鎖的方法
     */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        //簡單呼叫AQS的相關方法(ReentrantLock的Sync類未重寫此方法)
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    /**
     * 嘗試獲取獨佔鎖,可中斷也可定時
     */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //檢查中斷標記,實現可中斷的方法
        if (Thread.interrupted())
            throw new InterruptedException();
        //1.執行tryAcquire()嘗試獲取鎖
        //2.呼叫doAcquireNanos(arg, nanosTimeout)完成定時功能
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

可以看出,實現可定時是通過呼叫doAcquireNanos(arg, nanosTimeout)完成的

    /**
     * 以獨佔定時模式獲取鎖
     */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //根據nanosTimeout計算deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //1.若執行一次tryAcquire()後就已超時,返回false
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    //2.呼叫定時的掛起方法
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            //若未成功獲取鎖,則將生成的Node從AQS佇列中取下
            if (failed)
                cancelAcquire(node);
        }
    }

由上可知,通過

  1. 呼叫定時的掛起方法(LockSupport.parkNanos(this, nanosTimeout);)
  2. 若剩餘時間不足則返回fasle

來實現定時模式.

總結

  • ReentrantLock通過將加鎖操作委託給內部類Sync完成加鎖語義
  • 公平性:由於每次只能喚醒AQS佇列中的第一個執行緒,所以鎖的競爭只會存在於未在AQS佇列的執行緒和AQS佇列的第一個執行緒之間,公平鎖相比非公平鎖增加了hasQueuedPredecessors()的判斷,確保只有AQS佇列的第一個執行緒才可以嘗試獲取鎖資源,保證了公平性
  • 可中斷:AQS的中斷方法僅僅在方法內部增加了判斷當前執行緒是否中斷的邏輯,若當前執行緒已被中斷,丟擲InterruptedException
  • 定時模式:通過呼叫定時的掛起方法和在程式碼中檢測剩餘時間實現定時模式

參考