1. 程式人生 > >Java讀原始碼之ReentrantLock

Java讀原始碼之ReentrantLock

## 前言 ReentrantLock 可重入鎖,應該是除了 synchronized 關鍵字外用的最多的執行緒同步手段了,雖然JVM維護者瘋狂優化 synchronized 使其已經擁有了很好的效能。但 ReentrantLock 仍有其存在價值,例如可以感知執行緒中斷,公平鎖模式,可以指定超時時間的搶鎖等更細粒度的控制都是目前的 synchronized 做不到的。 如果不是很瞭解 Java 中執行緒的一些基本概念,可以看之前這篇: [Java讀原始碼之Thread](https://www.cnblogs.com/freshchen/p/11674575.html) ## 案例 用一個最簡單的案例引出我們的主角 ```java public class ReentrantLockDemo { // 預設是非公平鎖和 synchronized 一樣 private static ReentrantLock reentrantLock = new ReentrantLock(); public void printThreadInfo(int num) { reentrantLock.lock(); try { System.out.println(num + " : " + Thread.currentThread().getName()); System.out.println(num + " : " + Thread.currentThread().toString()); } finally { reentrantLock.unlock(); } } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); IntStream.rangeClosed(0, 3) .forEach(num -> executorService .execute(() -> new ReentrantLockDemo().printThreadInfo(num)) ); } /** * 輸出: * 0 : pool-1-thread-1 * 0 : Thread[pool-1-thread-1,5,main] * 3 : pool-1-thread-4 * 3 : Thread[pool-1-thread-4,5,main] * 1 : pool-1-thread-2 * 1 : Thread[pool-1-thread-2,5,main] * 2 : pool-1-thread-3 * 2 : Thread[pool-1-thread-3,5,main] */ ``` 可以看到使用起來也很簡單,而且達到了同步的效果。廢話不多說一起來瞅一瞅 lock() 和 unlock() 兩個同步方法是怎麼實現的。 ## 原始碼分析 ### 公平鎖與非公平鎖 公平鎖顧名思義。就是每個執行緒排隊搶佔鎖資源。而非公平鎖執行緒什麼時候能執行更多的看緣分,例如一個執行緒需要執行臨界區程式碼,不管之前有多少執行緒在等,直接去搶鎖,說白了就是插隊。對於 ReentrantLock 的實現,從構造器看出,當我們傳入 true 代表選擇了公平鎖模式 ```java public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ``` 為什麼先看公平鎖實現,而不是預設的非公平鎖,因為 synchronized 就是非公平鎖,1.7開始 synchronized 的實現改變了,並且基本借鑑了 ReentrantLock 的實現,加入了自旋,偏向鎖減少系統呼叫,所以如果需要非公平鎖且不需要特別精細的控制,完全沒有必要因為效能選擇 ReentrantLock 了。 ### AQS 結構 從案例中的 lock 方法進入 - **ReentrantLock.FairSync#lock** ```java final void lock() { // 要一把鎖,向誰要鎖? acquire(1); } ``` 在繼續深入之前讓我們先熟悉一下 AbstractQueuedSynchronizer(AKA :AQS) 的結構 - 首先繼承了 AbstractOwnableSynchronizer ,主要屬性: ```java // 儲存當前持有鎖的執行緒 private transient Thread exclusiveOwnerThread; ``` - AQS 自身主要屬性: ```java // 阻塞佇列的頭 private transient volatile Node head; // 阻塞佇列的尾 private transient volatile Node tail; // 同步器的狀態 private volatile int state; ``` 從 head 和 tail 可以猜想到,AQS 應該是用一個連結串列作為等待佇列,給等待的執行緒排隊, status 欄位預設是0,一旦鎖被某個執行緒佔有就 +1,那為啥要用int呢? 如果當前持有鎖的這個執行緒(exclusiveOwnerThread)還要再來把鎖,那狀態還可以繼續 +1,也就實現了可重入。 - 上面的 Node 節點長啥樣呢,不要被註釋中 CLH 鎖高大上的名稱嚇到,其實就是雙向連結串列,主要屬性: ```java // 標識次節點是共享模式 static final Node SHARED = new Node(); // 標識次節點是獨佔模式 static final Node EXCLUSIVE = null; // 節點裡裝著排隊的執行緒 volatile Thread thread; // 節點裡裝的執行緒放棄了,不搶鎖了,可能超時了,可能中斷了 static final int CANCELLED = 1; // 下一個節點裡的執行緒等待被通知出隊 static final int SIGNAL = -1; // 節點裡裝的執行緒在等待執行條件,結合 Condition 使用 static final int CONDITION = -2; // 節點狀態需要被傳播到下一個節點,主要用在共享模式 static final int PROPAGATE = -3; // 標識節點的等待狀態,初始0,取值是上面的 -3 ~ 1 volatile int waitStatus; // 前一個節點 volatile Node prev; // 後一個節點 volatile Node next; // 指向下一個等待條件 Condition Node nextWaiter; ``` 去掉一些普通情況不會涉及的屬性,如果有四個執行緒競爭,結構如下圖所示: ![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-1.png) 可以看到就是一個標準的頭節點為空的雙鏈表,為什麼頭節點是空? ### 公平鎖加鎖 - **AbstractQueuedSynchronizer#acquire** ```java public final void acquire(int arg) { // 如果嘗試拿鎖沒成功,那就進等待佇列 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 檢測到執行緒被中斷了,因為重置了中斷訊號但沒做處理,再設定下中斷位,讓使用者去處理,中斷標準操作 selfInterrupt(); } static void selfInterrupt() { Thread.currentThread().interrupt(); } ``` - **ReentrantLock.FairSync#tryAcquire** ```java protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 取AQS的 state 值 int c = getState(); // 當前沒有執行緒持有鎖 if (c == 0) { // 如果沒有其他執行緒在排隊(公平) if (!hasQueuedPredecessors() && // 這裡可能存在競爭 CAS 試著去搶一次鎖 compareAndSetState(0, acquires)) { // 搶到鎖了,把鎖持有者改成自己,其他執行緒往後稍稍 setExclusiveOwnerThread(current); return true; } } // 鎖已經被持有了,但如果鎖主人就是自己,那歡迎光臨(可重入) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); // 因為其他執行緒進不來,這裡不存在競爭,直接改鎖狀態 setState(nextc); return true; } return false; } ``` - **AbstractQueuedSynchronizer#hasQueuedPredecessors** ```java // 返回 false 代表不需要排隊,true 代表要排隊 public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; // h == t 頭等於尾只可能是剛初始的狀態或者已經沒有節點等待了 // h.next == null ? 下面介紹進隊的過程中,如果其他執行緒與此同時 tryAcquire 成功了,會把之前的head.next置為空,說明被捷足先登了,差一點可惜 // 如果到最後一個判斷了,也就是佇列中至少有一個等待節點,直接看第一個等待節點是不是自己,如果不是自己就乖乖排隊去 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` - **AbstractQueuedSynchronizer#addWaiter** tryAcquire如果沒有拿到鎖,就需要進等待隊列了,變成一個 Node 例項 ```java // 這裡 mode 為獨佔模式 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; // 如果尾節點不為空,說明等待佇列已經初始化過 if (pred != null) { node.prev = pred; // 嘗試把自己放到隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 進來這裡說明,等待佇列沒有被初始化過,或者嘗試失敗了 enq(node); return node; } ``` - **AbstractQueuedSynchronizer#enq** ```java private Node enq(final Node node) { for (;;) { Node t = tail; // 如果尾節點是空,說明佇列沒有初始化 if (t == null) { // 初始化一個空節點(延遲載入),head ,tail都指向它 if (compareAndSetHead(new Node())) tail = head; } // 一直嘗試把自己塞到隊尾(自旋) else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` - **AbstractQueuedSynchronizer#acquireQueued** addWaiter方法已經把等待執行緒包裝成節點放到等待隊列了,為啥要返回中斷標識呢?主要是為了給一些需要處理中斷的方式複用,例如 **ReentrantLock#lockInterruptibly**,以及帶超時的鎖 ```java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 這邊邏輯開始繞起來了 for (;;) { // 拿前一個節點 final Node p = node.predecessor(); // 前一個節點是head,說明自己排在第一個 if (p == head && // 在讓出cpu前再試一次,此時可能鎖持有者已經讓位了 tryAcquire(arg)) { // 搶到鎖了 setHead(node); // 把之前沒用的頭節點釋放 p.next = null; // help GC failed = false; return interrupted; } // 兩次嘗試都失敗了,只能準備被掛起,讓出cpu了(調了核心,重量級) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 普通的鎖不處理中斷異常,不會進這個方法 if (failed) cancelAcquire(node); } } private void setHead(Node node) { // 把頭節點設為自己 head = node; // 因為已經搶到鎖了,不需要記錄這個執行緒在等待了,保持了頭節點中執行緒永遠為 null node.thread = null; node.prev = null; } ``` - **AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire** ```java private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 已經告訴前一個節點自己需要被通知了 return true; if (ws >
0) { // 只有 CANCELLED 這個狀態大於0,如果前面的節點不排隊了,就一直找到一個沒 CANCELLED 的 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } // 進到這裡,只剩下PROPAGATE(共享鎖時候才有) CONDITION(本文不涉及) 和 未賦值狀態也就是0, else { // 這裡把 預設狀態0 置為 -1,也就代表著後面有執行緒在等著被喚醒了 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回false,就暫時不會讓執行緒掛起,繼續自旋,直到返回true return false; } ``` - **AbstractQueuedSynchronizer#parkAndCheckInterrupt** ```java private final boolean parkAndCheckInterrupt() { // 掛起,標準用法this充當blocker LockSupport.park(this); // 一旦恢復,返回執行緒在掛起階段是否被中斷,此方法會重置中斷位 return Thread.interrupted(); } ``` 到這裡加鎖流程就介紹差不多了,用一個最簡單流程的圖來總結一下: ![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-2.png) ### 公平鎖解鎖 - **AbstractQueuedSynchronizer#release** ```java public final boolean release(int arg) { // 嘗試釋放鎖 if (tryRelease(arg)) { Node h = head; // 如果等待佇列已經被初始化過,並且後面有節點等待操作 if (h != null && h.waitStatus != 0) // 恢復掛起的執行緒 unparkSuccessor(h); return true; } return false; } ``` - **ReentrantLock.FairSync#tryRelease** ```java protected final boolean tryRelease(int releases) { int c = getState() - releases; // 能執行釋放鎖的肯定是鎖的持有者,除非虛擬機器魔怔了 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 考慮可重入 if (c == 0) { free = true; // 鎖現在沒有持有者了 setExclusiveOwnerThread(null); } setState(c); return free; } ``` - **ReentrantLock.FairSync#unparkSuccessor** ```java // node 是頭節點 private void unparkSuccessor(Node node) { int ws = node.waitStatus; // 如果狀態不是 CANCELED,就把狀態置為初始狀態 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // s == null 這個條件成立主要是在共享模式下自旋釋放。 if (s == null || s.waitStatus >
0) { // 把 CANCELED 狀態的節點置為空 s = null; // 因為 head 這條路已經斷了,從尾巴開始找到第一個排隊的節點,然後把佇列接上 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 把第一個排隊的節點中的執行緒喚醒, LockSupport.unpark(s.thread); } ``` 執行緒從加鎖程式碼裡介紹的 **AbstractQueuedSynchronizer#parkAndCheckInterrupt** 方法中醒來,繼續自旋拿鎖。如果此時後面還有人排隊就一定能拿到鎖了。如圖所示: ![](https://cdn.jsdelivr.net/gh/freshchen/resource/img/draw/aqs-1-3.png) ### 非公平鎖加鎖 - **ReentrantLock.NonfairSync#lock** ```java final void lock() { // 不管三七二十一,直接搶鎖,如果運氣好,鎖正好被釋放了,就不排隊了 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 和上面介紹的公平鎖一樣,只是 tryAcquire 實現不一樣 acquire(1); } ``` - **ReentrantLock.Sync#nonfairTryAcquire** 上面公平鎖我們已經知道,執行緒真正掛起前會嘗試兩次,由於不考慮別人有沒有入隊,實現非常簡單 ```java final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 如果沒有執行緒持有鎖,直接搶鎖 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果是重入,狀態累加 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` ### 非公平鎖解鎖 因為都是獨佔鎖模式,解鎖和公平鎖邏輯一樣。 ## 總結 至此,總算看完了 ReentrantLock 常規的加鎖解鎖原始碼,好好體會下 AQS 的結構,還是能看懂的,且很有收穫,總之 Doug Lea 大神牛B。 本文還是挖了很多坑的: - 帶超時的鎖是如何實現的? - 檢測中斷的鎖是如何實現的? - 各種 Condition 是如何實現的? - 鎖的共享模式是如何實現的? 以後有時間再一探究