1. 程式人生 > >ReentrantLock 原始碼分析以及 AQS (一)

ReentrantLock 原始碼分析以及 AQS (一)

## 前言 JDK1.5 之後釋出了JUC(java.util.concurrent),用於解決多執行緒併發問題。AQS 是一個特別重要的同步框架,很多同步類都藉助於 AQS 實現了對執行緒同步狀態的管理。 AQS 中最主要的就是獨佔鎖和共享鎖的獲取和釋放,以及提供了一些可中斷的獲取鎖,超時等待鎖等方法。 ReentranLock 是基於 AQS 獨佔鎖的一個實現。ReentrantReadWriteLock 是基於 AQS 共享鎖的一個讀寫鎖實現。本來打算一篇文章裡面寫完獨佔鎖和共享鎖,但是發現篇幅太長了,也不易於消化。 因此,本篇就先結合 ReentrantLock 原始碼,分析 AQS 的獨佔鎖獲取和釋放。以及 ReentrantLock 的公平鎖和非公平鎖實現。 下一篇再寫 ReentrantReadWriteLock 讀寫鎖原始碼,以及 AQS 共享鎖的獲取和釋放。 在正式講解原始碼之前,牆裂建議讀者做一些準備工作,最好對以下知識有一定的瞭解,這樣閱讀起來原始碼會比較輕鬆(因為,我當初剛開始接觸多執行緒時,直接看 AQS 簡直是一臉懵逼,就像讀天書一樣。。)。 1. 瞭解雙向連結串列的資料結構,以及佇列的入隊出隊等操作。 2. LockSupport 的 park,unpark 方法,以及對執行緒的 interrupt 幾個方法瞭解(可參考:[LockSupport的 park 方法是怎麼響應中斷的?](https://mp.weixin.qq.com/s/-adhc-LYfvd9YXqSxstFiQ))。 3. 對 CAS 和自旋機制有一定的瞭解。 ## AQS 同步佇列 AQS 內部維護了一個 FIFO(先進先出)的雙向佇列。它的內部是用雙向連結串列來實現的,每個資料節點(Node)中都包含了當前節點的執行緒資訊,還有它的前後兩個指標,分別指向前驅節點和後繼節點。下邊看一下 Node 的屬性和方法: ``` static final class Node { //可以認為是一種標記,表明了這個 node 是以共享模式在同步佇列中等待 static final Node SHARED = new Node(); //也是一種標記,表明這個 node 是以獨佔模式在同步佇列中等待 static final Node EXCLUSIVE = null; /** waitStatus 常量值 */ //說明當前節點被取消,原因有可能是超時,或者被中斷。 //節點被取消的狀態是不可逆的,也就是說此節點會一直停留在取消狀態,不會轉變。 static final int CANCELLED = 1; //說明後繼節點的執行緒被 park 阻塞,因此當前執行緒需要在釋放鎖或者被取消時,喚醒後繼節點 static final int SIGNAL = -1; //說明執行緒在 condition 條件佇列等待 static final int CONDITION = -2; //在共享模式中用,表明下一個共享執行緒應該無條件傳播 static final int PROPAGATE = -3; //當前執行緒的等待狀態,除了以上四種值,還有一個值 0 為初始化狀態(條件佇列的節點除外)。 //注意這個值修改時是通過 CAS ,以保證執行緒安全。 volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; //當前節點中的執行緒,通過建構函式初始化,出隊時會置空(這個後續說,重點強調) volatile Thread thread; //有兩種情況。1.在 condition 條件佇列中的後一個節點 //2. 一個特殊值 SHARED 用於表明當前是共享模式(因為條件佇列只存在於獨佔模式) Node nextWaiter; //是否是共享模式,理由同上 final boolean isShared() { return nextWaiter == SHARED; } //返回前驅節點,如果為空丟擲空指標 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } ``` 另外,在 AQS 類中,還會記錄同步佇列的頭結點和尾結點: ``` //同步佇列的頭結點,是懶載入的,即不會立即建立一個同步佇列, //只有當某個執行緒獲取不到鎖,需要排隊的時候,才會初始化頭結點 private transient volatile Node head; //同步佇列的尾結點,同樣是懶載入。 private transient volatile Node tail; ``` ## 獨佔鎖 這部分就結合 ReentrantLock 原始碼分析 AQS 的獨佔鎖是怎樣獲得和釋放鎖的。 ### 非公平鎖 首先,我們從 ReentrantLock 開始分析,它有兩個構造方法,一個構造,可以傳入一個 boolean 型別的引數,表明是用公平鎖還是非公平鎖模式。另一個構造方法,不傳入任何引數,則預設用非公平鎖。 ``` public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ``` NonfairSync 和 FairSync 都繼承自 Sync ,它們都是 ReentranLock 的內部類。 而Sync 類又繼承自 AQS (AbstractQueuedSynchronizer)。 ``` static final class NonfairSync extends Sync { } static final class FairSync extends Sync { } abstract static class Sync extends AbstractQueuedSynchronizer { } ``` 知道了它們之間的繼承關係,我們就從非公平鎖的加鎖方法作為入口,跟蹤原始碼。因為非公平鎖的流程講明白之後,公平鎖大致流程都一樣,只是多了一個條件判斷(這個,一會兒後邊細講,會做對比)。 **NonfairSync.lock** 我們看下公平鎖的獲取鎖的方法: ``` final void lock() { //通過 CAS 操作把 state 設定為 1 if (compareAndSetState(0, 1)) //如果設值成功,說明加鎖成功,儲存當前獲得鎖的執行緒 setExclusiveOwnerThread(Thread.currentThread()); else //如果加鎖失敗,則執行 AQS 的acquire 方法 acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ``` **acquire** 這個方法的邏輯是: 1. 通過 tryAcquire 方法,嘗試獲取鎖,如果成功,則返回 true,失敗返回 false 。 2. tryAcquire 失敗之後,會先呼叫 addWaiter 方法,把當前執行緒封裝成 node 節點,加入同步佇列(獨佔模式)。 3. acquireQueued 方法會把剛加入佇列的 node 作為引數,通過自旋去獲得鎖。 **tryAcquire** 這是一個模板方法,具體的實現需要看它的子類,這裡對應的就是 ReentrantLock.NonfairSync.tryAcquire 方法。我們看一下: ``` protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { //當前執行緒 final Thread current = Thread.currentThread(); //獲取當前的同步狀態,若為 0 ,表示無鎖狀態。若大於 0,表示已經有執行緒搶到了鎖。 int c = getState(); if (c == 0) { //然後通過 CAS 操作把 state 的值改為 1。 if (compareAndSetState(0, acquires)) { // CAS 成功之後,儲存當前獲得鎖的執行緒 setExclusiveOwnerThread(current); return true; } } // 如果 state 大於0,則判斷當前執行緒是否是獲得鎖的執行緒,是的話,可重入。 else if (current == getExclusiveOwnerThread()) { //由於 ReentrantLock 是可重入的,所以每重入一次 state 就加 1 。 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` **addWaiter** 如果獲取鎖失敗之後,就會呼叫 addWaiter 方法,把當前執行緒加入同步佇列。 ``` private Node addWaiter(Node mode) { //把當前執行緒封裝成 Node ,並且是獨佔模式 Node node = new Node(Thread.currentThread(), mode); //嘗試快速入隊,如果失敗,則會呼叫 enq 入隊方法。enq 會初始化佇列。 Node pred = tail; //如果 tail 不為空,說明當前佇列中已經有節點 if (pred != null) { //把當前 node 的 prev 指標指向 tail node.prev = pred; //通過 CAS 把 node 設定為 tail,即新增到隊尾 if (compareAndSetTail(pred, node)) { //把舊的 tail 節點的 next 指標指向當前 node pred.next = node; return node; } } //當 tail 為空時,把 node 新增到佇列,如果需要的話,先進行佇列初始化 enq(node); //入隊成功之後,返回當前 node return node; } ``` **enq** 通過自旋,把當前節點加入到佇列中 ``` private Node enq(final Node node) { for (;;) { Node t = tail; //如果 tail為空,說明佇列未初始化 if (t == null) { //建立一個空節點,通過 CAS把它設定為頭結點 if (compareAndSetHead(new Node())) //此時只有一個 head頭節點,因此把 tail也指向它 tail = head; } else { //第二次自旋時,tail不為空,於是把當前節點的 prev指向 tail節點 node.prev = t; //通過 CAS把 tail節點設定為當前 node節點 if (compareAndSetTail(t, node)) { //把舊的 tail節點的 next指向當前 node t.next = node; return t; } } } } ``` **acquireQueued** 入隊成功之後,就會呼叫 acquireQueued 方法自旋搶鎖。 ``` final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取當前節點的前驅節點 final Node p = node.predecessor(); //如果前驅節點就是 head 節點,就呼叫 tryAcquire 方法搶鎖 if (p == head && tryAcquire(arg)) { //如果搶鎖成功,就把當前 node 設定為頭結點 setHead(node); p.next = null; // help GC failed = false; //搶鎖成功後,會把執行緒中斷標誌返回出去,終止for迴圈 return interrupted; } //如果搶鎖失敗,就根據前驅節點的 waitStatus 狀態判斷是否需要把當前執行緒掛起 if (shouldParkAfterFailedAcquire(p, node) && //執行緒被掛起時,判斷是否被中斷過 parkAndCheckInterrupt()) //注意此處,如果被執行緒被中斷過,需要把中斷標誌重新設定一下 interrupted = true; } } finally { if (failed) //如果丟擲異常,則取消鎖的獲取,進行出隊操作 cancelAcquire(node); } } ``` **setHead** 通過程式碼,我們可以看到,當前的同步佇列中,只有第二個節點才有資格搶鎖。如果搶鎖成功,則會把它設定為頭結點。 ``` private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ``` 需要注意的是,這個方法,會把頭結點的執行緒設定為 null 。想一下,為什麼? 因為,此時頭結點的執行緒已經搶鎖成功,需要出隊了。自然的,佇列中也就不應該存在這個執行緒了。 PS:由 enq 方法,還有 setHead 方法,我們可以發現,頭結點的執行緒總是為 null。這是因為,頭結點要麼是剛初始化的空節點,要麼是搶到鎖的執行緒出隊了。因此,我們也常常把頭結點叫做虛擬節點(不儲存任何執行緒)。 **shouldParkAfterFailedAcquire** 以上是搶鎖成功的情況,那麼搶鎖失敗了呢?這時,我們需要判斷是否應該把當前執行緒掛起。 ``` private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取當前節點的前驅節點的 waitStatus int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果 ws = -1 ,說明當前執行緒可以被前驅節點正常喚醒,於是就可以安全的 park了 return true; if (ws > 0) { //如果 ws > 0,說明前驅節點被取消,則會從當前節點依次向前查詢, //直到找到第一個沒有被取消的節點,把那個節點的 next 指向當前 node //這一步,是為了找到一個可以把當前執行緒喚起的前驅節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果 ws 為 0,或者 -3(共享鎖狀態),則把它設定為 -1 //返回 false,下次自旋時,就會判斷等於 -1,返回 true了 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ``` **parkAndCheckInterrupt** 如果 shouldParkAfterFailedAcquire 返回 true,說明當前執行緒需要被掛起。因此,就執行此方法,同時檢查執行緒是否被中斷。 ``` private final boolean parkAndCheckInterrupt() { //把當前執行緒掛起,則 acquireQueued 方法的自旋就會暫停,等待前驅節點 unpark LockSupport.park(this); //返回當前節點是否被中斷的標誌,注意此方法會把執行緒的中斷標誌清除。 //因此,返回上一層方法時,需要設定 interrupted = true 把中斷標誌重新設定,以便上層程式碼可以處理中斷 return Thread.interrupted(); } ``` 想一下,為什麼搶鎖失敗後,需要判斷是否把執行緒掛起? 因為,如果搶不到鎖,並且還不把執行緒掛起,acquireQueued 方法就會一直自旋下去,這樣你的CPU能受得了嗎。 **cancelAcquire** 當不停的自旋搶鎖時,若發生了異常,就會呼叫此方法,取消正在嘗試獲取鎖的執行緒。node 的位置分為三種情況,見下面註釋, ``` private void cancelAcquire(Node node) { if (node == null) return; // node 不再指向任何執行緒 node.thread = null; Node pred = node.prev; //從當前節點不斷的向前查詢,直到找到一個有效的前驅節點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //把 node 的 ws 設定為 -1 node.waitStatus = Node.CANCELLED; // 1.如果 node 是 tail,則把 tail 更新為 node,並把 pred.next 指向 null if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; //2.如果 node 既不是 tail,也不是 head 的後繼節點,就把 node的前驅節點的 ws 設定為 -1 //最後把 node 的前驅節點的 next 指向 node 的後繼節點 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //3.如果 node是 head 的後繼節點,則直接喚醒 node 的後繼節點。 //這個也很好理解,因為 node 是佇列中唯一有資格嘗試獲取鎖的節點, //它放棄了資格,當然有義務把後繼節點喚醒,以讓後繼節點嘗試搶鎖。 unparkSuccessor(node); } node.next = node; // help GC } } ``` **unparkSuccessor** 這個喚醒方法就比較簡單了, ``` private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //從尾結點向前依次遍歷,直到找到距離當前 node 最近的一個有效節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //把這個有效節點的執行緒喚醒, //喚醒之後,當前執行緒就可以繼續自旋搶鎖了,(回到 park 的地方) LockSupport.unpark(s.thread); } ``` 下面畫一個流程圖更直觀的檢視整個獲取鎖的過程。 ![](https://img2020.cnblogs.com/other/1714084/202003/1714084-20200313214616066-1438351358.jpg) ### 公平鎖 公平鎖和非公平鎖的整體流程大致相同,只是在搶鎖之前先判斷一下是否已經有人排在前面,如果有的話,就不執行搶鎖。我們通過原始碼追蹤到 FairSync.tryAcquire 方法。會發現,多了一個 hasQueuedPredecessors 方法。 **hasQueuedPredecessors** 這個方法判斷邏輯稍微有點複雜,有多種情況。 ``` public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` 1. 如果 h == t,說明 h 和 t 都為空(此時佇列還未初始化)或者它們是同一個節點(說明佇列已經初始化,並且只有一個節點,此時為 enq 方法第一次自旋成功後)。此時,返回false。 2. 如果 h != t,則判斷 head.next == null 是否成立,如果成立,則返回 true。這種情況發生在有其他執行緒第一次入隊時。在 AQS 的 enq 入隊方法,設定頭結點成功之後 compareAndSetHead(new Node()) ,還未執行 tail = head 時(仔細想一想為什麼?)。此時 tail = null , head = new Node(),head.next = null。 3. 如果 h != t,並且 head.next != null,說明此時佇列中至少已經有兩個節點,則判斷 head.next 是否是當前執行緒。如果是,返回 false(注意是 false哦,因為用了 !),否則返回 true 。 總結:以上幾種情況,只有最終返回 false 時,才會繼續往下執行。因為 false,說明沒有執行緒排在當前執行緒前面,於是通過 CAS 嘗試把 state 值設定為 1。若成功,則方法返回。若失敗,同樣需要去排隊。 ### 公平鎖和非公平鎖區別 舉個例子來對比公平鎖和非公平鎖。比如,現在到飯點了,大家都到食堂打飯。把佇列中的節點比作排隊打飯的人,每個打飯視窗都有一個管理員,只有排隊的人從管理員手中搶到鎖,才有資格打飯。打飯的過程就是執行緒執行的過程。 ![](https://img2020.cnblogs.com/other/1714084/202003/1714084-20200313214616329-1001586117.jpg) 如果,你發現前面沒有人在排隊,那麼就可以直接從管理員手中拿到鎖,然後打飯。對於公平鎖來說,如果你前面有人在打飯,那麼你就要排隊到他後面(圖中B),等他打完之後,把鎖還給管理員。那麼,你就可以從管理員手中拿到鎖,然後打飯了。後面的人依次排隊。這就是FIFO先進先出的佇列模型。 對於非公平鎖來說,如果你是圖中的 B,當 A 把鎖還給管理員後,有可能有另外一個 D 插隊過來直接把鎖搶走。那麼,他就可以打飯,你只能繼續等待了。 所以,可以看出來。公平鎖是嚴格按照排隊的順序來的,先來後到嘛,你來的早,就可以早點獲取鎖。優點是,這樣不會造成某個執行緒等待時間過長,因為大家都是中規中矩的在排隊。而缺點呢,就是會頻繁的喚起執行緒,增加 CPU的開銷。 非公平鎖的優點是吞吐量大,因為有可能正好鎖可用,然後執行緒來了,直接搶到鎖了,不用排隊了,這樣也減少了 CPU 喚醒排隊執行緒的開銷。 但是,缺點也很明顯,你說我排隊排了好長時間了,終於輪到我打飯了,憑什麼其他人剛過來就插到我前面,比我還先打到飯,也太不公平了吧,後邊一大堆排隊的人更是怨聲載道。這要是每個人來了都插到我前面去,我豈不是要餓死了。 ### 獨佔鎖的釋放 我們從 ReentrantLock 的 unlock 方法看起: ``` public void unlock() { //呼叫 AQS 的 release 方法 sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; //如果頭結點不為空,並且 ws 不為 0,則喚起後繼節點 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } ``` 這段邏輯比較簡單,當執行緒釋放鎖之後,就會喚醒後繼節點。 unparkSuccessor 已講,不再贅述。然後看下 **tryRelease** 方法,公平鎖和非公平鎖走的是同一個方法。 ``` protected final boolean tryRelease(int releases) { //每釋放一次鎖,state 值就會減 1,因為之前可能有鎖的重入 int c = getState() - releases; //如果當前執行緒不是搶到鎖的執行緒,則丟擲異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //只有 state 的值減到 0 的時候,才會全部釋放鎖 free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` 因為,ReentrantLock 支援鎖的重入,所以每次重入 state 值都會加 1,相應的每次釋放鎖, state 的值也會減 1 。所以,這也是為什麼每個 lock 方法最後都要有一個 unlock 方法釋放鎖,它們的個數需要保證相同。 當 state 值為 0 的時候,說明鎖完全釋放。其他執行緒才可以有機會搶到鎖。 ## 結語 以上已經講解了獨佔鎖主要的獲取方法 acquire ,另外還有一些其他相關方法,不再贅述,因為主要邏輯都是一樣的,只有部分稍有不同,只要理解了 acquire ,這些都是相通的。如 acquireInterruptibly 方法,它可以在獲取鎖的時候響應中斷。還有超時獲取鎖的方法 doAcquireNanos 可以設定獲取鎖的超時時間,超時之後就返回失敗。 下篇預告:分析 ReentrantReadWriteLock 讀寫鎖原始碼,以及 AQS 共享鎖的獲取和釋放,敬請期待。 如果本文對你有用,歡迎點贊,評論,轉發。 學習是枯燥的,也是有趣的。我是「煙雨星空」,歡迎關注,可第一時間接收文章