1. 程式人生 > >ReentrantLock 與 AQS 源碼分析

ReentrantLock 與 AQS 源碼分析

str finall unit 不可 就是 功能 初始化 結點 找到

ReentrantLock 與 AQS 源碼分析

1. 基本結構

?? 重入鎖 ReetrantLock,JDK 1.5新增的類,作用與synchronized關鍵字相當,但比synchronized更加靈活。ReetrantLock本身也是一種支持重進入的鎖,即該鎖可以支持一個線程對資源重復加鎖,但是加鎖多少次,就必須解鎖多少次,這樣才可以成功釋放鎖。

1. 繼承

沒有繼承任何類,因為很多操作都使用了組合完成。

2. 實現

Lock, java.io.Serializable
??這裏著重介紹一下 Lock 接口,接口定義了幾個必要的方法,也是在 ReentrantLock 中的重點需要分析的方法。
?? 三類方法:獲取鎖、釋放鎖、獲取條件。

public interface Lock {
    // 阻塞獲取鎖,如果獲取不到鎖就一直等待
    void lock();
    // 可中斷獲取鎖,在獲取鎖的過程可以被中斷,但是 Synchronized 是不可以
    void lockInterruptibly() throws InterruptedException;
    // 非阻塞獲取鎖,沒有獲取到鎖立即返回
    boolean tryLock();
    // 超時獲取鎖,沒獲取到鎖等待一段時間
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    // 解鎖
void unlock(); // 等待喚醒機制的條件 Condition newCondition(); }

從上面可以看到 Synchronized 和 Lock 的一些重要區別:

  1. Lock 的獲取鎖的過程是可以中斷的,Synchronized 不可以,Synchronized 只能在 wait或同步代碼塊執行過程中才可以被中斷。

  2. 由於 Lock 顯示的加鎖,鎖可以橫跨幾個方法,也就是臨界區的位置可以更加自由。

  3. Lock 支持超時獲取鎖。

  4. 後面會看到 Lock 還支持公平及非公平鎖。

  5. 綁定多個 Condition 條件

3. 主要字段

??很好,這個類的字段非常的少,真正起作用的字段只有一個 “鎖” 字段。

    // 同步鎖
    private final Sync sync;

?? 這個鎖(Sync)是一個繼承自 AQS 的抽象內部類,說明一下 AQS (AbstractQueuedSynchronizer) 一般被稱為隊列同步器,他是並發包中的核心組件,絕大多數鎖機制都是采用的這個類來實現的。雖然看到他是一個抽象類,但是你會發現裏面沒有一個方法是抽象方法,他實現了鎖機制中的必要的通用的方法,待會會專門講這個類。不然 ReentrantLock 沒辦法說,ReentrantLock 裏面的鎖操作都是依賴於 AQS。

?? 然後這個鎖是有兩個子類,分別是 NonfairSyncFairSync 從名字上也可以看出這兩個類分別代表了 公平鎖非公平鎖 。何為鎖的公平性? 實際上就是新來的線程需要征用鎖必須要要等到先於他到達的線程獲取並釋放鎖。也就是獲取鎖的過程是按照下來後到的順序進行的,反之就稱為非公平鎖。後面我們會看到其實這兩種鎖不同就在於非公平鎖在新線程創建後首先會直接進行鎖的獲取,如果沒有獲取到會進行一段時間的自旋,始終沒獲取到鎖才進行等待狀態。

?? 一般而言,公平鎖開銷比非公平鎖大,這也是比較符合我們的直觀感受。公平鎖是需要進行排隊的,但在某些場景下,可能更註重時間先後順序,那麽公平鎖自然是很好的選擇。

?? 好總結一下,在 ReentrantLock 中只維護了一個 “鎖” 變量,這個鎖是繼承了 AQS 同步器,然後這個鎖又有兩種派生的鎖:公平鎖,非公平鎖。那麽 ReentrantLock 實現其實就有兩種方式:公平鎖,非公平鎖。

4. 主要方法概覽

  1. ctor-2
  2. lock
  3. lockInterruptibly
  4. tryLock
  5. tryLock(time)
  6. unlock
  7. newCondition

2. 基礎並發組件 AQS

1. 基本字段

1. 重要字段

?? AQS 是維護了一個同步隊列(雙向鏈表),這個隊列裏面線程都是需要競爭鎖的,沒有競爭到的就在同步隊列中等待。headtail 就指向隊列的首尾。state 是一個標誌字段,表示當前有多少線程在臨界區。一般來說 state 只能是 0 或 1 但是由於鎖是可重入的,所以也有大於 1 的情況。

?? 除了一個同步隊列還有 0~n 個等待隊列,等待隊列就是調用了 await 方法的線程,會被掛到調用了 awaitcondition 上面的等待隊列,所以有多少個 condition 就有多少等待隊列。

    //同步隊列頭指針
    private transient volatile Node head;
    // 同步隊列尾指針
    private transient volatile Node tail;
    // 狀態標誌,0 則沒有線程在臨界區,非零表示有 state 個線程在臨界區(由於鎖可重入)
    private volatile int state;

2. Node 節點

??Node 節點也就是上文所提到的 同步隊列等待隊列 中的元素,註意兩個隊列之間的元素類型是一樣的因為他們之間會有相互移動轉換的動作,這兩個隊列中的元素自然是線程,為了方便查找和表示 AQS 將線程封裝到了 Node 節點中,構成雙向隊列。

static final class Node {
        // 共享非 null/獨占為 null  
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        /**
         * 線程狀態
         */
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
       // 雙向鏈表  這兩個指針用於同步隊列構建鏈表使用的   下面還有一個 nextWaiter 是用來構建等待單鏈表隊列
        volatile Node prev;
        volatile Node next;
        // 線程
        volatile Thread thread;
        // 等待隊列單鏈表
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

    }

?? 可以看到上面有一個 waitStatus 屬性,代表了線程當前的狀態,狀態標識就是那些常量。具體如下:

  1. SIGNAL: 正在執行的線程結束釋放鎖或者被取消執行,他必須喚醒後續的狀態為 SIGNAL 節點

  2. CANCELLED: 在同步隊列中等待的線程等待超時或被中斷,需要從同步隊列中取消該Node的結點, 其結點的waitStatus為CANCELLED,即結束狀態,進入該狀態後的結點將不會再變化。

  3. CONDITION: 該標識的結點處於等待隊列中(不是同步隊列),結點的線程等待在Condition上,當其他線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。

  4. PROPAGATE:在共享模式中,該狀態標識結點的線程處於可運行狀態。

  5. 0:代表初始化狀態。

?? 可以看到,Node 裏面的主要字段就是一個狀態標誌位、一個線程的引用、用於構建鏈表的指針。註意,有三個指針,其中前兩個 nextpre 是用來構建同步隊列的(雙向鏈表),後面 nextWaiter 是用來構建等待隊列。所以說雖然同步隊列和等待隊列使用的同一個數據類型,數據結構是不同的,並且在後面我們會看到等待隊列中的節點只有兩種狀態 ConditionCANCELLED 前者表示線程已結束需要從等待隊列中移除,後者表示條件結點等待被喚醒。

??下面畫圖說明一下同步隊列和等待隊列的情況。
等待隊列
技術分享圖片
同步隊列
技術分享圖片

3. ConditionObject

?? 這個內部類是等待喚醒機制的核心,在他上面綁定了一個等待隊列。在這個類中使用了兩個指針( firstWaiter/lastWaiter )指向隊列的首尾。這裏主要看一下 awaitsignalsignalAll 方法。

  1. await
    ?? 當一個線程調用了await()相關的方法,那麽首先構建一個Node節點封裝當前線程的相關信息加入到等待隊列中進行等待,並釋放鎖直到被喚醒(移動到同步隊列)、中斷、超時才被隊列中移出。被喚醒後的第一件事是搶鎖和檢查是否被中斷,然後才是移除隊列。被喚醒時候的狀態應該為 SIGNAL ,而在方法中執行的移除隊列的操作就是移除狀態非 Condition 的節點。
public final void await() throws InterruptedException {
            // 等待可中斷
            if (Thread.interrupted())
                throw new InterruptedException();
            // 加入等待隊列, new 新的 Node 做一個尾插入
            Node node = addConditionWaiter();
            // 釋放當前線程的鎖,失敗則將當前線程設置為取消狀態
            int savedState = fullyRelease(node);

            int interruptMode = 0;
            // 如果沒在同步隊列就讓線程等待也就是看是否被喚醒
            // 如果有中斷或者被喚醒那麽退出循環
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 運行到此處說明已經被喚醒了,因為結束了循環
            // 喚醒後,首先自旋一下獲取鎖,同時判斷是否中斷
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 清理隊列中狀態不是 Condition 的的任務,包括被喚醒的 SIGNAL 和 被取消的 CANCELLED
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            //被中斷 拋異常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
  1. signal/doSignal/signalAll
    ?? 執行 signal 首先進行鎖的判斷,如果沒有獲取到獨占鎖就直接拋出異常。這也就是為什麽只有擁有鎖的線程才能執行 signal ,然後獲取等待隊列中的第一個節點執行 doSignal。
        public final void signal() {
            // 獲取獨占鎖
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 喚醒等待隊裏中的第一個線程
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

?? doSignal 方法主要就幹了三個事 :

  1. 將被喚醒的節點從等待隊列中移除(while 循環體),如果被喚醒的節點被取消了就繼續喚醒後面的節點(transferForSignal 返回 false)
  2. 否則把這個節點加入到同步隊列 ( enq 方法 )
  3. 當同步隊列中當前節點的前驅被取消或者沒辦法喚醒時則喚醒這個線程 ( unpark ),這時候調用了 unpark 正好和 await 中的 park 相對應使得 await 的線程被喚醒,接著執行循環體判斷自己已經被移入到同步隊列了,接著就可以執行後面的獲取鎖的操作。
 private void doSignal(Node first) {
            do {
                // 頭指針指向喚醒節點的下一個節點,並順便判斷等待隊列是否空
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                // 解除引用
                first.nextWaiter = null;
            } while (!transferForSignal(first) && (first = firstWaiter) != null); //移入同步隊列失敗則繼續喚醒下一個線程,否則喚醒成功
            // 喚醒成功的線程不一定馬上能開始執行,只有在前驅節點被取消或者沒辦法被喚醒時
   }
   
   
   //  將節點從等待隊列移動到同步隊列   成功返回 true 失敗 false
    final boolean transferForSignal(Node node) {
        // 在等待隊列中的節點只有 condition 和 cancelled 兩種狀態,如果狀態更新失敗說明任務被取消
        // 否則更新為初始狀態   直接返回的話上面的 doSignal 就會繼續喚醒後面的線程
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 把當前節點加入同步隊列
        Node p = enq(node);
        // 獲取同步隊列中倒數第二個節點的狀態,當前節點的前驅
        int ws = p.waitStatus;
        // 如果前驅節點被取消或者在設置前驅節點狀態為Node.SIGNAL狀態失敗時,喚醒被通知節點代表的線程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    
    
    // 插入一個節點到同步隊列,如果同步隊列是空的則加入一個空節點做為頭結點
    // 死循環保證肯定能插入    返回插入節點的前驅
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 這一步不需要 cas 是因為並發沒關系,只是指向鏈表結尾,不會多線程更新問題
                node.prev = t;
                // 可能有多個線程搶
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

?? 有一個小問題,就是在某個線程中執行了別人的 signal 不會導致當前線程立即放棄鎖,之所以會這樣正是由於 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 這個判斷,即前驅線程都結束了。比如下面的例子:

package util.AQSTest;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// test signal 執行後不會導致當前線程立即釋放鎖
public class AQSTest {
    static Lock lock = new ReentrantLock();
   static Condition run1Cond = lock.newCondition();
    static Condition run2Cond = lock.newCondition();

    static class Runner1 implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("runner 1 start");
                run1Cond.await(1, TimeUnit.SECONDS);
                run2Cond.signal();
                System.out.println("runner 1 exit");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }

        }
    }

    static class Runner2 implements Runnable {
        @Override
        public void run() {
            lock.lock();
            try {
                System.out.println("runner 2 start");
                run2Cond.await();
                System.out.println("runner 2 exit");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                lock.unlock();
            }

        }
    }
    public static void main(String[] args) {
        new Thread(new Runner1(),"runner1").start();
        new Thread(new Runner2(),"runner2").start();
    }
}

輸出的結果始終是:

runner 1 start
runner 2 start
runner 1 exit
runner 2 exit

?? 我使用了工具對上面的代碼進行了調試,大致說一下流程,順便用來捋一捋等待喚醒機制。

?? 首先 runner1 啟動,獲取到鎖,打印出 “runner1 start” ,然後調用了 await 方法,此時 runner1 線程就執行了 AQS 中的 ConditionObject 中的 await 方法,該方法首先 new 了一個新的節點,把 runner1 封裝到這個節點裏面。掛在了 run1Con 的等待隊列上,然後執行了釋放鎖並判斷中斷。緊接著 runner1 線程執行循環體判斷是否被喚醒也就是是否在同步隊列,顯然這時候不在,就直接調用了 park 方法,執行休眠 1 秒鐘操作, park 方法是 native 方法由操作系統實現。在上面線程釋放鎖的時候執行的操作是 fullyRelease 這個方法調用了 release 方法,而 release 方法中釋放了鎖之後,會檢查同步隊列中是否還有以前因為沒搶到鎖而等待的線程,如果有執行 unparkSuccessor 也就是喚醒同步隊列中的後繼線程。那麽此時 runner2 會被喚醒,喚醒後就去搶鎖,獲取到 lock 鎖後輸出了 “runner2 start” ,然後 runner2 線程又會因為調用 await 處於和 runner1 同樣的境地,也就是被放入 run2Con 的等待隊列。好!此時 runner1 的超時時間到了,就會被 unpark 這個 unpark 是被操作系統調用的,之後繼續執行循環體發現超時時間小於等於 0 ,則調用 transferAfterCancelledWait 裏面調用了 enq 就是加入同步隊列,接著開始競爭鎖,開始執行 run2Con 上的 signal 此時 signal 調用 doSignal 先執行 do while 中的循環體,runner2 從 run2Con 的等待隊列上移除,然後執行 transferForSignal 裏面又調用了 enq 將他加入同步隊列,並返回同步隊列中的前驅,前驅節點狀態不是 Cancelled 或者 可以被置為 SIGNAL 則 signal 方法結束。接著打印了 “runner1 exit” 。接著需要執行 finally 裏面的釋放鎖的操作了,顯然 unlock 肯定調用了 release ,而 release 會喚醒同步隊列中的後繼的線程,那麽位於同步隊列中的 runner2 之前的 park 狀態就會被打斷,從而跳出 while 循環,執行獲取鎖的操作。打印出 “runner2 exit” ,最後釋放鎖整個程序結束。

?? 現在總算是吧 Condition 的等待喚醒機制弄清楚了。也把 AQS 中的兩個內部類的功能都解釋完了。接下來就看 AQS 中的方法。

2. 重要方法

  1. get/setState
  2. release/tryRelease/unparkSuccessor/fullyRelease
  3. acquire/tryAcquire/addWaiter/tryQueued
  4. acquireShared
  5. releaseShared

?? 這些屬於 AQS 中常用的方法,但是裏面的核心方法都是模板方法,也就是說由繼承他的子類來實現,所以只能看個大概的邏輯。一會等到講 ReentrantLock 時再詳細說這裏面的方法。

3. ReentrantLock 內部類 Sync/fairSync/noFairSync

1. Sync

?? 這三個內部類實際上是繼承自 AQS ,也就是說 ReentrantLock 是采用了 AQS 作為自己的核心並發控制組件完成的一系列的鎖操作,及等待喚醒機制。

?? 首先看一下 Sync 他是後面兩個的父類,他直接繼承自 AQS 。AQS 中留了幾個比較重要的模板方法 tryAcquire 、tryRelease 。這個方法直接實現了一些在公平鎖和非公平鎖中的通用操作,也就是釋放鎖的操作 tryRelease 。

?? tryRelease 的實現很簡單,主要就是依賴於 AQS 中的 state 屬性,如果state 值減去要釋放的信號量為 0 則釋放成功,否則失敗。

        // 釋放鎖的公共操作
        protected final boolean tryRelease(int releases) {
            // 釋放鎖首先就是使用 AQS 中的 state 的值減去信號量 判斷是否為0
            // 如果是 0 則表明成功釋放鎖,獨占線程設為 null,否則說明還占用鎖
            int c = getState() - releases;
            // 必須獲取到鎖才能解鎖,否則拋異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

2. fairSync

    公平鎖執行 lock 操作就是執行了 AQS 中的 acquire(1) 也就是請求一個鎖資源。但是註意,在 AQS 中的 acquire 中的 tryAcquire 方法沒有實現,所以必須由當前類實現。

    在 tryAcquire 中做的事情就是看是否有代碼在臨界區。沒有則還要看同步隊列中是否有線程等待,當只有這一個線程在獲取鎖的時候才能正常的獲取鎖,其他情況都失敗。
// 公平鎖
    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }

        // 沒有代碼在臨界區或者是當前線程的重入 則獲取成功,否則失敗
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 如果當前線程在獲取鎖的過程沒有其他線程在臨界區
            if (c == 0) {
                // 如果同步隊列中沒有等待的線程,就設置 state ,並且當前線程設為獨占線程
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 有程序在臨界區,如果是當前線程可重入,加上請求的資源數
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 競爭鎖失敗,因為他是公平的鎖競爭
            return false;
        }
    }

3. noFairSync

同理,這個方法也需要實現 lock 和 tryAcquire 操作。在 lock 中直接判斷是否有代碼在臨界區,沒有則直接獲取到鎖,與公平鎖不同的是:公平鎖還判斷了等待隊列中是否有等待的線程。有在臨界區的情況時執行 acquire 操作。同樣的,首先要執行 tryAcquire 如果失敗,加入同步隊列並自旋獲取鎖。還是 tryAcquire 的實現,這裏又調用了 nonfairTryAcquire。

    // 非公平鎖
    static final class NonfairSync extends Sync {
        final void lock() {
            // 如果沒有代碼在臨界區 直接獲取鎖,獨占
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            // 有代碼在臨界區則執行嘗試獲取鎖
                acquire(1);
        }

        // 和公平鎖中的 tryAcquire 一模一樣只是少了關於同步隊列中是否有等待線程的判斷
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 沒有線程獲取鎖 直接獲取到鎖  和公平鎖中的 tryAcquire 一模一樣只是少了關於同步隊列的判斷
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入鎖
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

?? 好了,現在我們 AQS 中的空的核心方法也被子類實現了,那麽現在 fairSync 和 noFairSync 就算是一個完整的 AQS 了。此時看一下加解鎖的流程。

只說公平鎖,因為非公平鎖就只是少了一個判斷。

  1. 首先 sync 調用 lock 方法,讓後 lock 調用了 AQS 的 acquire(1) 也就是獲取一個鎖資源。

  2. acquire 就先調用 tryAcquire(1) 嘗試獲取鎖,這時候代碼又回調到 sync 中的實現的 tryAcquire 方法,這個方法先判斷鎖是否已經被別的線程使用,然後需要確定沒有更早的線程在同步隊列等待獲取鎖,才把當前線程設置為獨占線程,並設置 state 值獲取鎖。但是如果有代碼在臨界區需要判斷是否為當前線程,因為鎖是可重入的。如果是當前線程則 state 加上請求鎖的個數,返回。

  3. 這時候又回到 AQS 中,如果上面嘗試獲取鎖的過程失敗,就需要調用 addWaiter 將當前線程封裝成一個獨占節點,等待狀態默認為 0,並且返回當前節點。

  4. 加入同步隊列後,再調用 acquireQueued 方法,當此線程是同步隊列中等待的第一個線程則自旋嘗試獲取鎖,畢竟很可能正在執行的線程馬上就會釋放鎖了,再進行休眠不合適。如果自旋獲取鎖失敗則判斷節點狀態是否為 SIGNAL 然後執行等待操作。

  5. 鎖獲取成功則把當前節點設置為頭結點,把 thread = null

  6. 至此,Acquire 方法執行結束。

然後調用 unlock 方法解鎖操作。

  1. 解鎖操作就沒那麽麻煩,首先還是調用到了 AQS 中的 release 方法,這個方法首先嘗試解鎖當前線程,又回調到了 sync 中的 tryRelease 。

  2. tryRelease 邏輯比較簡單,使用 AQS 中的 state 減去釋放的資源數,等於 0 代表完全釋放,否則釋放失敗。

  3. 如果 tryRelease 成功執行就要去喚醒同步隊列中的後繼節點,繼續執行。

  4. 至此,release 方法執行完畢。

4. AQS 中的要方法

1. get/setState

??這兩個方法主要是對 state 變量的 volatile 的讀寫,其實裏面就就是普通的 get/set 方法。但是註意的一點就是 state 是 volatile 的。

    // 對狀態變量的 volatile 讀寫
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }

2. release/tryRelease/unparkSuccessor/fullyRelease

?? 這幾個方法在一起說主要是因為他們之間存在調用鏈,首先來看 release 這個方法我們在上面也分析了,裏面調用了 tryRelease 、unparkSuccessor。 也就是首先調用 tryRelease 來釋放當前線程的鎖,如果釋放成功就調用 unparkSuccessor 來喚醒同步隊列中後繼節點。其中 tryRelease 是由子類來實現,裏面的主要邏輯就是看當前的 state 變量的值在修改過後是否為0 。這裏還有一個 fullRelease 主要是在 ConditionObject 中調用的,當執行 await 的操作的時會執行此方法釋放鎖。

 //  嘗試釋放鎖
    public final boolean release(int arg) {
        // 如果釋放鎖成功 喚醒同步隊列中的後繼節點
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 喚醒同步隊列中的後繼節點
    private void unparkSuccessor(Node node) {
        // node 一般就是當前正在運行的線程
        int ws = node.waitStatus;
        // 當前線程置為初始狀態   可以失敗
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 找到同步隊列中的下一個節點
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {  //沒有下一個節點或者被取消
            s = null;
            // 從後往前找第一個沒有被取消的線程
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 喚醒那個線程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

3. acquire/tryAcquire/addWaiter/acquireQueued

這個和上面的一樣,在執行了 acquire 後,會去調用子類復寫的 tryAcquire 方法,這個方法就是看有否有代碼塊在臨界區,沒有的話直接獲取鎖(非公平鎖),設置 state,有的話要判斷是不是當前線程能否進行重入操作,否則就獲取失敗。失敗後會調用 addWaiter ,new 一個新的節點加入到同步隊列,接著調用了 acquireQueued 如果這個節點是同步隊列中的第一個等待的線程(但不是第一個節點,因為第一個節點是 thread=null 的運行中的線程)就自旋一段時間看能否獲取到鎖。不能則 park 等待。

// 獲取鎖
    public final void acquire(int arg) {
        // 嘗試獲取鎖 失敗則加入同步隊列 如果是同步隊列中的第一個線程就自旋獲取鎖
        // 上面的步驟的自旋獲取鎖階段,返回的是是否需要中斷,所以下面就進行 selfInterrupt
        // tryAcquire 是模板方法,因為對於公平鎖和非公平鎖獲取鎖方式不同
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    
    // 創建一個節點放入到同步對列中   可傳入是否為獨占鎖   返回當前節點
    private Node addWaiter(Node mode) {
        // 默認的 status 是 0
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 把 tail 設置為 node 成功說明沒有競爭
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 失敗則就說明空隊列   創建頭結點
        enq(node);
        return node;
    }
    
     final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋獲取鎖
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                // 如果前驅是空的頭結點,那麽也就是說當前線程就是隊列中的第一個線程 並嘗試獲取鎖  成功的話方法返回中斷情況
                if (p == head && tryAcquire(arg)) {
                    // 把當前節點設置為頭結點  thread=null 也就可以看做當前線程在運行,所以就不在同步隊列
                    setHead(node);
                    // gc
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果獲取鎖失敗,檢測為 SIGNAL 或者設置為 SIGNAL 然後讓此線程等待 等待操作在 parkAndCheckInterrupt 中完成
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 失敗 取消
            if (failed)
                cancelAcquire(node);
        }
    }

5. 總結

?? 其實到這裏 ReentrantLock 已經講完了,因為他底層全部調用的是 Sync 中的方法,也就是全都是調用了 AQS 中的方法。而 AQS 中的大部分重要的方法都已經看過了。

ReentrantLock 與 AQS 源碼分析