1. 程式人生 > >ReentrantLock解析及原始碼分析

ReentrantLock解析及原始碼分析

##本文結構 - Tips:說明一部分概念及閱讀原始碼需要的基礎內容 - ReentrantLock簡介 - 公平機制:對於公平機制和非公平機制進行介紹,包含對比 - 實現:`Sync`原始碼解析額,公平和非公平模式的加鎖、解鎖過程及原始碼分析 - 公平鎖和非公平鎖的加鎖流程圖 - `ReentrantLock`提供的一些其他方法 - `Condition`:這裡只是一提,不會有什麼有意義的內容 - `ReentrantLock`全部原始碼理解:個人閱讀`ReentrantLock`原始碼的時候做的註釋,需要結合`AQS`一塊理解 ##Tips: ------- - 同步等待佇列:即普遍的資料中提到的同步佇列,由`AQS`維護。在程式碼的英文註釋中寫道wait queue,因此我在這裡翻譯成同步等待佇列 - 條件等待佇列:即普遍的資料中提到的等待佇列,由`AQS.Condition`維護,在程式碼的英文註釋中也是寫道`wait queue`,因此我在這裡翻譯成條件等待佇列 - 本篇文章會大量提到`AQS`這個類,並且大量的方法都在`AQS`中實現,本文對會對使用到的方法進行解釋,但是對於`AQS`內部的屬性沒有過多解釋,後續篇章寫`AQS`會專門寫,建議可以瞭解一下,有助於理解 - 建議閱讀前先了解`AQS`中的`Node`類,有助於閱讀,本文不會說明 ## ReentrantLock簡介 ---------- 在多執行緒程式設計中,同步和互斥是一個非常重要的問題。 在java中可以通過使用`synchronized`來實現共享資源的獨佔, 除此之外還可以使用`Lock`提供的方法來實現對共享資源的獨佔。 而且`Lock`對比`synchronized`具有更高的靈活性。 `ReentrantLock`是`Lock`介面的一種實現,它提供了公平和非公平兩種鎖的公平機制供開發者選擇, 並且實現了鎖的可重入性(指的是對同一臨界資源重複加鎖,注意:加鎖多少次就一定要解鎖多少次)。 ## 公平機制 ------- ###概念 `ReentrantLock`提供了公平鎖和非公平鎖兩個版本供開發者選擇: - 公平鎖:所有請求獲取鎖的執行緒按照先後順序排隊獲取鎖,下一個獲取鎖的執行緒一定是等候獲取鎖時間最長的執行緒,所得獲取滿足`FIFO`特點。 - 非公平鎖:請求獲取鎖的執行緒不一定需要排隊,只要鎖沒有被獲取,不論是否有執行緒在等待獲取所,他就可以進行加鎖操作。eg:所有在佇列中等待鎖的執行緒都沒有被作業系統排程到並且鎖沒有被獲取,此時正在指定的執行緒需要獲取鎖就可以直接嘗試獲取鎖 ###對比 ||公平鎖|非公平鎖| |:----: |:---:|:-----: | |優點 |所有的執行緒都可以獲取到資源不會餓死在隊列當中|可以減少CPU喚醒執行緒的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有執行緒,會減少喚起執行緒的數量 | |缺點 |吞吐量會下降很多,佇列裡面除了第一個執行緒,其他的執行緒都會阻塞,cpu喚醒阻塞執行緒的開銷會很大|可能導致佇列中間的執行緒一直獲取不到鎖或者長時間獲取不到鎖,導致餓死| ##實現 --------- ###抽象類`Sync` 公平與非公平的實現主要靠鎖的同步器來實現,他們都是內部抽象類`Sync`的子類(姑且稱之為抽象同步器)。 $\color{#FF3030}{Sync及父類AQS提供了整個加鎖和解鎖的過程及排隊等待的過程,}$並暴露出抽象方法`lock()`供實現不同的鎖公平機制。 `ReentrantLock`中`Sync`的子類`NonfairSync`提供非公平鎖同步機制,`FairSync`提供公平的鎖同步機制。 程式碼的解釋請看註釋 ``` /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ //提供了這個鎖的同步器的基礎方法,子類NonfairSync和FairSync提供了公平和非公平兩種同步器的實現 //使用AQS的state來標識鎖的狀態 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ //抽象方法加鎖,加鎖過程交給子類實現以提供不同的公平機制 abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ //預設提供了非公平機制的加鎖過程 //acquires 申請加鎖的次數,一般情況下是一次,但是有多次的情況,在Condition中會看到 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取鎖的狀態,getState在AQS中實現 int c = getState(); //鎖空閒 if (c == 0) { //加鎖,加鎖成功設定鎖的屬於哪個執行緒資訊 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //當前執行緒已經成功獲取了鎖,這塊也是鎖的可重入性的體現 else if (current == getExclusiveOwnerThread()) { //將鎖的持有次數加給定的次數即可 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //設定加鎖次數 setState(nextc); return true; } return false; } //釋放鎖的過程 //releases釋放鎖的次數,一般情況下是一次,但是有多次的情況,在Condition中會看到 protected final boolean tryRelease(int releases) { //getState在AQS中實現 int c = getState() - releases; //獨佔鎖釋放鎖的時候誰獲取的鎖誰用完釋放,期間不許其他執行緒使用 /如果鎖的擁有者不是當前執行緒程式碼結構則出了問題 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); //由於可重入性,只有在釋放releases次後鎖狀態為0才完全釋放鎖,鎖才不被佔有 boolean free = false; //如果釋放鎖後鎖狀態為0,則表示當前執行緒不再持有這個鎖 //則將持有鎖的執行緒exclusiveOwnerThread置null if (c == 0) { free = true; setExclusiveOwnerThread(null); } //設定鎖狀態,,在AQS中實現 setState(c); return free; } //檢查當前執行緒是否持當前鎖 protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } //條件等待佇列 //具體實現在AQS中實現 final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class //獲取當前鎖的持有者 final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } //獲取當前執行緒持有鎖的次數 final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } //檢查鎖是否被持有 final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } ``` ###公平鎖 公平鎖提供了一種絕對等待時間公平的機制,鎖永遠會被同步等待佇列中等待時間最長的獲取到, 這樣可以保證每個在等待的執行緒在程式不退出的情況下都可以獲取到鎖。 但是每一個請求的鎖的執行緒都將進入到同步等待佇列中阻塞休眠,執行緒的休眠和喚醒需要耗費額外的時間,會降低效率,降低吞吐量 (整個在同步等待佇列中阻塞休眠的操作不是絕對的,只有所沒有被佔有,並且同步等待佇列為空時可以直接獲取鎖或者遞迴呼叫同一個執行緒獲取鎖) ####公平鎖的同步器 ``` /** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { //呼叫AQS提供的請求加鎖的方法 //緊接著下一個程式碼片段解釋 acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ //公平版本的tryAcquire, //第一個獲取鎖的執行緒和已經獲取鎖的執行緒遞迴呼叫獲取鎖不需要再排隊等待 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //判斷同步等待佇列中是否有其他等待獲取鎖的執行緒 //沒有並且鎖沒有被其他執行緒持有則可以直接獲取鎖 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果當前執行緒持有鎖,則可以鎖計數器加一讓該執行緒再一次獲取鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //否則當前執行緒在這次嘗試獲取鎖之前就沒有獲取到鎖 //同步等待佇列中有其他執行緒在嘗試獲取鎖 //則獲取鎖失敗,下一步應該會被加入到同步等待佇列 return false; } } ``` 簡單看下`hasQueuedPredecessors()`方法,這裡只解釋滿足公平鎖加鎖條件的情況 ``` /** * Queries whether any threads have been waiting to acquire longer * than the current thread. */ //這個方法是再AQS中提供的用來判斷線同步等待佇列中是否還有等待時間比當前執行緒等待時間更長的執行緒 //tail是佇列的尾節點,head是頭節點,每個節點會代表一個執行緒首尾節點除外 //再tryAcquire中我們希望它返回的時false那麼看下返回false代表那種情況 //返回false要求 h != t && ((s = h.next) == null|| s.thread != Thread.currentThread())為false,那麼要麼h==t就是頭尾節點是同一個,佇列為空 //要麼(s = h.next) == null|| s.thread != Thread.currentThread()為false, //這就要求h.next!=null 並且h.next就是當前執行緒,也就是說佇列中第一個等待獲取鎖的執行緒就是當前執行緒 //那麼就可以直接加鎖; public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ``` ####公平鎖的加鎖 在公平鎖的同步器中提供了`lock()`方法用來進行加鎖操作, 可重入鎖進行加鎖的方法也確實是呼叫同步器的`lock()`來實現加鎖 加鎖入口:`lock()`方法;`lock()`方法定義在`ReentrantLock`類裡面,通過呼叫同步器的加鎖操作完成加鎖過程,公平機制不影響對外暴露的介面 ``` /** * Acquires the lock. * *

Acquires the lock if it is not held by another thread and returns * immediately, setting the lock hold count to one. * *

If the current thread already holds the lock then the hold * count is incremented by one and the method returns immediately. * *

If the lock is held by another thread then the * current thread becomes disabled for thread scheduling * purposes and lies dormant until the lock has been acquired, * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); } ``` 官方的註釋說明了`lock`操作會做那些事情: 1. 獲取鎖,如果所沒有被其他執行緒獲取,那麼將獲取這個鎖並返回,並設定這個鎖的狀態為1標識被加鎖一次(鎖被獲取一次) 2. 如果當前執行緒已經持有鎖,那麼鎖計數器自增1 3. 如果鎖被其他執行緒持有,那麼當前執行緒會被阻塞進入睡眠狀態並進入同步的等待佇列,直到鎖被獲取,然後將鎖的計數器設定為1 $\color{#00FF00}{這裡面好像沒說明公平的方式哈,只要鎖沒被獲取就立刻獲取,感覺公平鎖是綠的}$ 加鎖的過程直接呼叫了同步器的`lock()`方法在上述的同步器中`lock()`呼叫`acquire()`方法 `acquire()`方法在`AQS`中實現 具體解釋見註釋 ``` /** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ //以獨佔的方法加鎖,並且忽略中斷 //那是不是還有響應中斷的加鎖呢?? public final void acquire(int arg) { //先嚐試呼叫同步器的tryAcquire()方法加鎖 if (!tryAcquire(arg) && //加鎖失敗的情況下將當前執行緒放入同步等待佇列中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //acquireQueued返回值是執行緒在等待期間是否被中斷,如果有則還原中斷現場 selfInterrupt(); } ``` `addWaiter()`方法,使用當前執行緒構造一個同步等待佇列的節點,並且放在隊尾,在`AQS`中實現 ``` /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //為當前的執行緒構造一個同步等待佇列中的節點,在可重入鎖中式排他的模式(mode==Node.EXCLUSIVE) Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //如果隊尾不為空 if (pred != null) { node.prev = pred;//將當前節點放在隊尾 //使用CAS操作原子的設定隊尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //如果對尾節點為空或者CAS設定隊尾節點失敗呼叫enq方法將當前執行緒節點新增進佇列 enq(node); return node; } ``` `enq()`方法:如果佇列不存在或者使用CAS操作使節點入隊失敗,則進入此方法構造佇列並將節點入隊,在`AQS`中實現 ``` /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { //用一個死迴圈將當前節點新增進佇列,如果佇列不存在則建立佇列 for (;;) { Node t = tail; //尾節點為空則堆類不存在進行初始化佇列,建立頭節點 if (t == null) { // Must initialize //使用CAS操作建立頭節點, //為什麼式CAS操作,試想剛判斷出佇列不存在需要建立頭節點, //此時執行緒發生執行緒的排程當前執行緒阻塞,另一個執行緒做同樣的操作並建立了佇列 //當前執行緒再次被喚醒後繼續建立佇列,會有執行緒安全問題 if (compareAndSetHead(new Node())) //尾節點指向頭節點 tail = head; //佇列存在死迴圈的將當前執行緒對應的節點放入到隊列當中 } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } ``` `acquireQueued()`方法,對於排隊等待的執行緒應當使其進行阻塞,減少排程以及CPU空轉的時間,除非下一個就到了這個執行緒獲取鎖,在`AQS`中實現 這個方法的設計上如果自身是頭節點的後繼節點,那麼有可能頭節點會很快處理完成任務釋放鎖,自己就可以獲取到鎖,避免進行執行緒阻塞、喚醒操作,減少資源消耗 ``` /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //當前執行緒中斷的標誌位 boolean interrupted = false; for (;;) { //獲取當前節點的前驅節點 final Node p = node.predecessor(); //前驅節點是頭節點則嘗試呼叫同步器的tryAcquire獲取鎖 //第一次進入者方法時嘗試一次獲取鎖,獲取失敗則會進行判斷是否需要進行阻塞 //如果需要阻塞則阻塞,如果不需要阻塞則會將前驅節點的waitStatus設定為SIGNAL, //再次迴圈獲取鎖失敗,再次進入判斷是否需要阻塞時一定會被阻塞 //獲取失敗即便先驅節點是頭節點也會被阻塞 if (p == head && tryAcquire(arg)) { //成功獲取鎖則將當前節點設定為頭節點 setHead(node); //原先區節點的下一個節點置空, p.next = null; // help GC //怎麼help個人理解當當前節點稱為頭節點再被釋放的時候,那麼當前節點可以做到不可達,從而gc failed = false; //返回執行緒在佇列中等待期間是否被中斷 return interrupted; } //在先驅節點不是頭結點的情況下阻塞當前執行緒並使其睡眠 //直到被其他執行緒喚醒,這樣可以減少CPU的空轉,提高效率 //從阻塞喚醒後繼續for迴圈直到獲取到鎖, if (shouldParkAfterFailedAcquire(p, node) && //阻塞當前執行緒直到有其他執行緒喚醒,並返回中斷資訊 parkAndCheckInterrupt()) //如果線上程阻塞休眠期間執行緒被中斷則設定終端標記位, interrupted = true; } } finally { //如果沒有獲取到鎖(獲取鎖的過程出了意外),或者取消了獲取鎖,則取消當前執行緒獲取鎖的操作 if (failed) cancelAcquire(node); } } ``` `shouldParkAfterFailedAcquire()`方法,在`AQS`中實現 ``` /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //獲取先驅節點的等待狀態 int ws = pred.waitStatus; //SIGNAL前驅節點準備好喚醒後繼節點,後繼節點可以安全的阻塞 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; //大於0表示先驅節點已經被取消獲取鎖及排隊 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //迴圈向前找到一個沒有取消的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //更新先驅節點 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //// 更新pred結點waitStatus為SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } ``` `parkAndCheckInterrupt()`方法,呼叫`LockSupport.park()`阻塞指定執行緒,在`AQS`中實現 ```$xslt /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //阻塞當前執行緒,直到其他執行緒呼叫LockSupport.unpark(當前執行緒), //使得呼叫unpark的執行緒釋放鎖後當前執行緒被喚醒並返回在阻塞期間執行緒是否被中斷 LockSupport.park(this); return Thread.interrupted(); } ``` `cancelAcquire()`方法,取消獲取鎖,在`AQS`中實現 ``` /** * Cancels an ongoing attempt to acquire. * * @param node the node */ private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; //節點的執行緒置null node.thread = null; // Skip cancelled predecessors Node pred = node.prev; //如果前驅節點已經取消,那麼迴圈向前找到一個沒有取消的節點並設定當前節點的前驅節點 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. //設定當前節點狀態為已經取消獲取鎖 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. //如果節點自身時隊尾,則移除節點並使用CAS操作設定隊尾 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; //前驅節點不是頭節點,則當前節點就需要阻塞 //前驅節點執行緒不為null,則保證存在前驅節點 //前驅節點沒有被取消,waitStatus可以被設定為SIGNAL保證可以喚醒後繼節點 if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; //寫一個節點存在並且沒有被取消,則CAS的將前驅節點的後繼節點設定為當前節點的後繼節點 if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { //否則喚醒後繼執行緒 unparkSuccessor(node); } node.next = node; // help GC } } ``` 至此,公平鎖的加鎖操作全部結束(或者已經取消獲取鎖),在同步等待佇列中等待的執行緒,如果對應的節點的先驅節點不是頭節點,則執行緒會被阻塞減少排程。 公平鎖的公平的保證時靠加鎖時判斷當前鎖對應的同步等待佇列中存在等待的佇列,如果有則當前執行緒入隊,排隊來保證的; 如果當前鎖沒有被獲取,並且佇列不存在或者佇列中沒有等待的執行緒則可以直接加鎖。回到`acquire()`方法,如果線上程等等待的過程中發生了中斷, 那麼獲取到所之後會還原中斷。 #### 公平鎖的解鎖 通過加鎖的過程可以發現,鎖被獲取的次數通過給`state`欄位增加和設定鎖所屬的執行緒`exclusiveOwnerThread`來完成加鎖操作, 那麼當執行緒需要解鎖的時候應該也是對這兩個欄位的操作,且解鎖一定在加鎖之後,因此不存在進入同步等待佇列等待的過程。 解鎖入口:`unlock()`方法;`unlock()`方法定義在`ReentrantLock`類裡面,通過呼叫同步器的解鎖操作完成解鎖過程,公平機制不影響對外暴露的介面 程式碼具體解釋見註釋 ``` /** * Attempts to release this lock. * *

If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ //嘗試釋放獲取的鎖,呼叫到release方法,這個方法在AQS中實現 public void unlock() { sync.release(1); } ``` `release()`方法,在`AQS`中實現 ``` /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ //釋放持有的排他鎖 public final boolean release(int arg) { //呼叫tryRelease()來釋放鎖,由同步器實現 //tryRelease方法的解釋在同步器章節 //如果有執行緒在同步等待佇列中等待獲取鎖, //那麼還應該喚醒等待的執行緒 if (tryRelease(arg)) { //如果存在同步等待佇列,那麼當前節點解鎖成功後回將自身節點設定為頭節點 //因此這裡的頭節點就是自身當前執行緒的節點 //但是在加鎖成功的時候會將節點的thread欄位設定為null,因此無法比對判斷 Node h = head; //後繼執行緒在阻塞前以前會將前驅結點的waitStatus設定為SIGNAL=-1,因此不為0即需要喚醒後繼節點 //為什麼是不為0,而不是等於-1??因為node有1,-1,-2,-3和0五種情況 //0代表預設狀態,node節點剛被建立, //1代表當前節點已經取消獲取鎖, //-1代表有後繼節點需要喚醒 //-2代表節點在條件等待佇列中等待,也就是不會出現在同步等待佇列 //-3代表共享模式, //如果在獲取到鎖並且已經存在後繼節點的時候取消獲取鎖,那麼節點就會使1, //直接點執行緒被喚醒完成加鎖操作後釋放鎖,他的waitStatus使1而不是-1,因此使用的是waitStatus != 0 if (h != null && h.waitStatus != 0) //喚醒後繼節點 unparkSuccessor(h); return true; } return false; } ``` `unparkSuccessor()`方法喚醒後繼節點,在`AQS`中實現 ``` /** * Wakes up node's successor, if one exists. * * @param node the node */ //喚醒頭結點的後繼節點 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; //頭節點的狀態<0,則在發出喚醒訊號之前嘗試清除這個狀態,即將頭節點的狀態設定為0, //允許失敗或者被等待的執行緒改變 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; //下一個節點不存在或者取消了獲取鎖,則沿著佇列從後往前找到第一個沒有取消的節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒沒有取消獲取鎖的第一個節點 if (s != null) LockSupport.unpark(s.thread); } ``` 至此可重入鎖的公平模式的加鎖和解鎖全部結束 問題:在解鎖的最後一步呼叫了`LockSupport.unpark()`來解鎖, 而一個執行緒`B`進入同步等待佇列阻塞的時候根據先去接點的`waitState==-1`來判斷是否需要阻塞, 那麼在他判斷完前驅節點執行緒`A` `waitState==-1`成立然後發生系統排程,執行其他執行緒, 而這時候執行緒`A`獲取鎖並解鎖呼叫了`LockSupport.unpark()`,然後執行執行緒`B`, 執行緒`B`會執行阻塞的過程,執行緒`B`會被阻塞掉,然後後面的節點都不能獲取鎖麼? ### 非公平鎖 除了公平的加鎖方式,可重入鎖還提供了非公平模式(預設)的加鎖。在非公平模式下只要鎖還沒有被其他執行緒獲取,就有機會成功獲取鎖, 當然已加入到佇列中的執行緒還是要按照順序排隊獲取。這樣做會減少需要阻塞、喚醒的執行緒,降低由於阻塞、喚醒帶來的額外開銷, 但是在佇列中等待的執行緒可能會被活活餓死(很慘的那種,出了問題排查的時候) #### 非公平鎖同步器 ``` /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //和公平鎖最大的區別 //如果當前鎖沒有被其他執行緒獲取則直接嘗試加鎖 //沒有被其他執行緒獲取體現在引數值是0 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //呼叫AQS的acquire方法請求加鎖,和公平鎖一致 acquire(1); } protected final boolean tryAcquire(int acquires) { //呼叫父類Sync的nonfairTryAcquire請求加鎖 return nonfairTryAcquire(acquires); } } ``` ####非公平鎖加鎖 加鎖入口同公平模式: ``` public void lock() { //都是呼叫到同步器的lock方法 sync.lock(); } ``` `lock()`直接呼叫同步器實現的`lock()` ``` /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //在這裡不管是否有執行緒在排隊,直接嘗試加鎖 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //加鎖失敗則呼叫AQS的acquire方法 acquire(1); } ``` `acquire()`方法在`AQS`中實現,和公平鎖的一致,方便閱讀就再寫一次 具體解釋見註釋 ``` //以獨佔的方法加鎖,並且忽略中斷 //那是不是還有響應中斷的加鎖呢?? public final void acquire(int arg) { //先嚐試呼叫同步器的tryAcquire()方法加鎖 if (!tryAcquire(arg) && //加鎖失敗的情況下將當前執行緒放入同步等待佇列中 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //acquireQueued返回值是執行緒在等待期間是否被中斷,如果有則還原中斷現場 selfInterrupt(); } ``` 同步器的`tryAcquire()`方法見非公平模式的同步器,會呼叫到`Sync`的`nonfairTryAcquire()`方法, 如果加鎖失敗則會依次構建同步等待佇列->嘗試加鎖->失敗則判斷是否需要進行阻塞->是則阻塞等待前驅節點喚醒->嘗試加鎖這樣的流程 這個流程同公平鎖 ``` //預設提供了非公平機制的加鎖過程 //acquires 申請加鎖的次數,一般情況下是一次,但是有多次的情況,在Condition中會看到 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取鎖的狀態,getState在AQS中實現 int c = getState(); //鎖空閒 if (c == 0) { //加鎖,加鎖成功設定鎖的屬於哪個執行緒資訊 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //當前執行緒已經成功獲取了鎖,這塊也是鎖的可重入性的體現 else if (current == getExclusiveOwnerThread()) { //將鎖的持有次數加給定的次數即可 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //設定加鎖次數 setState(nextc); return true; } return false; } ``` #### 非公平鎖的解鎖 非公平鎖的解鎖過程同公平鎖,釋放過程不存在公平於非公平,具體邏輯全部由`Sync`和`AQS`實現; ## 公平鎖和非公平鎖的加鎖流程圖 --------- ![](https://img2020.cnblogs.com/blog/1669869/202005/1669869-20200529172156435-770504418.png) ##ReentrantLock提供的一些其他方法 ------ |方法名稱|返回值型別|方法描述| | :---: | :---: | :---: | |`lockInterruptibly()`|`void`|以響應中斷的方式獲取鎖,如果在鎖獲取之前發生中斷直接丟擲中斷異常,在從同步等待佇列中被喚醒後檢查在等待期間是否有中斷髮生,如果有則丟擲中斷異常| |`tryLock()`|`boolean`|嘗試一次以非公平模式獲取鎖,當且僅當鎖沒有被獲取時有可能獲取成功,即便鎖私用的是公平模式,直接呼叫 `Sync`的非公平模式獲取一次鎖,返回獲取結果| |`tryLock(long timeout, TimeUnit unit)`|`boolean`|含有超時等待功能的獲取鎖,如果執行緒進入同步等待佇列阻塞,則只會阻塞指定的時間,這個功能由`LockSupport`類提供| |`newCondition()`|`Condition`|獲取一個`Condition`物件,可以提供類似`Object Moitor`一樣的功能| |`getHoldCount()`|`int`|獲取持有鎖的次數,一般用於測試鎖| |`isHeldByCurrentThread`|`boolean`|檢查當前執行緒是否持有這個鎖| |`isLocked`|`boolean`|檢查鎖是否被任何一個執行緒獲取| |`isFair()`|`boolean`|檢查當前鎖是否是公平鎖| |`getOwner()`|`Thread`|獲取持有鎖的執行緒| |`hasQueuedThreads()`|`boolean`|同步等待佇列是否有執行緒等待獲取鎖| |`hasQueuedThread(Thread thread)`|`boolean`|判斷指定執行緒是否在同步等待佇列中等待| |`getQueueLength()`|`int`|獲取同步等待對列的長度,佇列中的執行緒不一定都是等待獲取鎖的執行緒,還有可能已經取消| |`hasWaiters(Condition condition)`|`boolean`|判斷給定的`condition`中是否由執行緒等待| |`getWaitQueueLength(Condition condition)`|`int`|獲取給定`condition`中等待佇列的長度| |`getWaitingThreads(Condition condition)`|`Collection

Serialization of this class behaves in the same way as built-in * locks: a deserialized lock is in the unlocked state, regardless of * its state when serialized. * *

這個鎖最多支援衝入2147483647 次(遞迴呼叫) * */ public class ReentrantLock implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; /** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } } /** * 可重入鎖非公平同步器 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * 加鎖. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //使用cas操作加鎖,成功的話將鎖的擁有執行緒置為自己 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //排他模式獲取,忽略中斷,通過再次呼叫tryAcquire獲取鎖, //獲取失敗將進入等待佇列,可能會重複的阻塞和取消阻塞, //直到呼叫tryAcquire獲取鎖成功 acquire(1); } //實現AQS的tryAcquire()方法(該方法進行一次嘗試獲取鎖) protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * 可重入鎖的公平同步器 */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; //公平模式與非公平模式的區別,公平模式下所有的所獲取都會檢查當前的等待佇列 //非公平鎖可以進行搶佔,即便有等待佇列中有等待的執行緒,只要鎖物件沒有加鎖成功,既可以被加鎖 final void lock() { acquire(1); } /** * 實現AQS的tryAcquire方法,加鎖時會檢查當前鎖的等待佇列的實際元素 * 當等待佇列為空(頭節點==尾節點)或者頭結點的下一個節點對應的執行緒試當前的執行緒可以加鎖成功 * 如果當前執行緒已經持有 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } /** * 建立一個可重入鎖.非公平模式 * 與使用 {@code ReentrantLock(false)}.建立效果一樣 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 建立給定模式(公平或者非公平)的可重入鎖 * @param fair {@codetrue} 使用公平模式 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } /** * 獲取鎖. * *

如果這個鎖沒有被其他執行緒持有,則獲取鎖並立刻返回, * 將鎖的計數器記為1 * * 如果當前執行緒已經持有鎖,則將鎖的計數器自增1, * 並且立刻返回 * *

如果鎖被其他執行緒持有 * 那麼當前執行緒將會因為執行緒排程原因設定禁用並且為休眠狀態, * 並且在鎖被獲取之前處於休眠狀態 * at which time the lock hold count is set to one. */ public void lock() { sync.lock(); } /** * 獲取鎖直到當前執行緒被阻塞 * *

如果鎖沒被其他的執行緒獲取則獲取鎖並返回 * * 如果鎖已經被當前執行緒獲取,則將鎖的計數器自增 * * 如果鎖被其他執行緒持有 * 處於執行緒排程的目的會將當前執行緒設定為禁用並且為休眠狀態 * 直到以下兩種情況中的一種發生: * *

    * *
  • 當前執行緒獲取到鎖; or * *
  • 其他的執行緒中斷了當前的執行緒 * *
* *

如果當前執行緒獲取到了鎖,那麼將鎖的計數器設定為1 * *

如果當前執行緒 * * 在進入這個方法的時候中斷狀態已經被設定; * 或者在獲取鎖的過程中被中斷 * * InterruptedException異常將會被丟擲並且清除中斷標記位 * * @throws InterruptedException if the current thread is interrupted */ public void lockInterruptibly() throws InterruptedException { //該方法會先檢查當前執行緒是否被中斷,然後呼叫tryAcquire獲取鎖 //獲取失敗則呼叫doAcquireInterruptibly //這個方法和忽略中斷的加鎖acquireQueue基本相同,不同點是 //當執行緒在呼叫LockSupport.park(this)被喚醒後,Lock方法會忽略是否該執行緒被中斷, //若被中斷該方法會丟擲執行緒中斷異常 sync.acquireInterruptibly(1); } /** * 當且僅當鎖沒被其他執行緒獲取時才獲取鎖 * *

當且僅當鎖沒被其他執行緒獲取時獲取鎖並且返回ture,設定鎖的計數器為1 * 即使這個鎖被設定為公平鎖,只要沒有其他執行緒獲取到鎖,使用tryLock也能立刻獲取到鎖 * 無論是否有其他執行緒在排隊 * 即使這種行為破壞了公平,但在某些情況下是有用的 * 如果想要這個鎖的公平設定可以使用 * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) } * 這兩者幾乎是的等價的 (依然會檢測中斷). * * 如果當前執行緒已經持有鎖,那麼將鎖計數器自增加一 * * 如果當前鎖被其他執行緒持有,那麼返回false * * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} otherwise */ public boolean tryLock() { //與非公平鎖tryAcquire方法內容相同 return sync.nonfairTryAcquire(1); } /** * 在超時等待的時間內獲取到鎖並且執行緒沒有被中斷 * * 如果在超時等待的時間內獲取到鎖並且沒有被中斷,鎖沒有被其他執行緒持有 * 將返回true,同時將鎖計數器設定為1 * 如果當前鎖使用的是公平鎖的模式 * 有其他的執行緒在等待這個鎖,都不能獲取到鎖這個功能由(tryAcquire實現) * 如果想要在超時等待的時間內破壞鎖的公平性獲取鎖可以和TryLock聯合使用 * *

 {@code
     * if (lock.tryLock() ||
     *     lock.tryLock(timeout, unit)) {
     *   ...
     * }}
* *

如果當前的執行緒已經擁有鎖那麼將鎖計數器自增加一(tryAcquire實現) * *

如果鎖被其他執行緒持有那麼處於執行緒排程的目的會將當前執行緒置為不可用 * 並且睡眠直到下面一種事情發生: * *

    * *
  • 鎖被當前執行緒獲取,或者 * *
  • 其他執行緒中斷了當前執行緒 * *
  • 時間超過了設定的等待時間 * *
* *

如果獲取到了鎖將返回ture,並且將鎖計數器設定為1 * *

如果當前執行緒 * *

    * *
  • 在進入這個方法之前,中斷標記位被設定 * * 或者獲取鎖的時候被中斷 * *
* 會丟擲 {@link InterruptedException} 異常,並且清楚中斷標記位 * *

如果超過了設定的等待時間將會返回false(其實還會嘗試獲取一次鎖) * 如果設定的超時等待時間少於等於0,這個方法不會一直等待 * *

這個方法會先響應中斷,而不是鎖的獲取和等待 * * @param timeout the time to wait for the lock * @param unit the time unit of the timeout argument * @return {@code true} if the lock was free and was acquired by the * current thread, or the lock was already held by the current * thread; and {@code false} if the waiting time elapsed before * the lock could be acquired * @throws InterruptedException if the current thread is interrupted * @throws NullPointerException if the time unit is null */ public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } /** * 試圖釋放鎖 * *

如果當前執行緒是鎖的持有者,則鎖的計數器自減1, * 如果鎖的計數器為0,則釋放鎖 * 如果當前執行緒不是鎖的持有者,將丟擲異常 * 釋放鎖成功後如果等待佇列中還有等待的執行緒,那麼呼叫lockSupport.unspark喚醒等待的執行緒 * {@link IllegalMonitorStateException}. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); } /** * Returns a {@link Condition} instance for use with this * {@link Lock} instance. * *

The returned {@link Condition} instance supports the same * usages as do the {@link Object} monitor methods ({@link * Object#wait() wait}, {@link Object#notify notify}, and {@link * Object#notifyAll notifyAll}) when used with the built-in * monitor lock. * *

    * *
  • If this lock is not held when any of the {@link Condition} * {@linkplain Condition#await() waiting} or {@linkplain * Condition#signal signalling} methods are called, then an {@link * IllegalMonitorStateException} is thrown. * *
  • When the condition {@linkplain Condition#await() waiting} * methods are called the lock is released and, before they * return, the lock is reacquired and the lock hold count restored * to what it was when the method was called. * *
  • If a thread is {@linkplain Thread#interrupt interrupted} * while waiting then the wait will terminate, an {@link * InterruptedException} will be thrown, and the thread's * interrupted status will be cleared. * *
  • Waiting threads are signalled in FIFO order. * *
  • The ordering of lock reacquisition for threads returning * from waiting methods is the same as for threads initially * acquiring the lock, which is in the default case not specified, * but for fair locks favors those threads that have been * waiting the longest. * *
* * @return the Condition object */ public Condition newCondition() { return sync.newCondition(); } /** * 查詢鎖被當前執行緒加鎖的次數 * *

執行緒對於每個加鎖的操作都有一個對應的解鎖操作 * *

這個操作通常只被用來測試和除錯 * For example, if a certain section of code should * not be entered with the lock already held then we can assert that * fact: * *

 {@code
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     *   public void m() {
     *     assert lock.getHoldCount() == 0;
     *     lock.lock();
     *     try {
     *       // ... method body
     *     } finally {
     *       lock.unlock();
     *     }
     *   }
     * }}
* * @return the number of holds on this lock by the current thread, * or zero if this lock is not held by the current thread */ public int getHoldCount() { return sync.getHoldCount(); } /** * 檢查當前執行緒是否持有這個鎖 * *

Analogous to the {@link Thread#holdsLock(Object)} method for * built-in monitor locks, this method is typically used for * debugging and testing. For example, a method that should only be * called while a lock is held can assert that this is the case: * *

 {@code
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     *
     *   public void m() {
     *       assert lock.isHeldByCurrentThread();
     *       // ... method body
     *   }
     * }}
* *

It can also be used to ensure that a reentrant lock is used * in a non-reentrant manner, for example: * *

 {@code
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     *
     *   public void m() {
     *       assert !lock.isHeldByCurrentThread();
     *       lock.lock();
     *       try {
     *           // ... method body
     *       } finally {
     *           lock.unlock();
     *       }
     *   }
     * }}
* * @return {@code true} if current thread holds this lock and * {@code false} otherwise */ public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } /** * 檢查這個鎖是否被任意一個執行緒持有(檢查鎖的計數器state是否是0) * 這個方法用來監控系統狀態,而不是用來同步, * * @return {@code true} if any thread holds this lock and * {@code false} otherwise */ public boolean isLocked() { return sync.isLocked(); } /** * 測試當前鎖是公平鎖還是非公平鎖 * * @return {@code true} if this lock has fairness set true */ public final boolean isFair() { return sync instanceof FairSync; } /** * 獲取當前擁有鎖的執行緒,如果沒有執行緒持有,則返回null * 此方法的目的是為了方便構造提供更廣泛的鎖監視設施的子類。 * * @return the owner, or {@code null} if not owned */ protected Thread getOwner() { return sync.getOwner(); } /** * 檢查是否有現成正在等待獲取此鎖(head!=tail) Note that * 因為執行緒取消獲取鎖可能發生在任何時間,所以返回值為true不能保證 * 一定有其他執行緒在獲取此鎖,(比如等待佇列中的執行緒已經取消獲取此鎖) * 這個方法的設計被用來監控系統 * * @return {@code true} if there may be other threads waiting to * acquire the lock */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** * 判斷給定的執行緒是否在等待這個鎖 * 執行緒取消獲取鎖可能發生在任何時間, * true的返回值不代表這個執行緒還在等待獲取這個鎖 * 這個方法的設計被用來監控系統 * * @param thread the thread * @return {@code true} if the given thread is queued waiting for this lock * @throws NullPointerException if the thread is null */ public final boolean hasQueuedThread(Thread thread) { return sync.isQueued(thread); } /** * 返回等待佇列中等待獲取鎖的數量的估計值 * 這個方法返回的是一個估計值因為佇列中的執行緒數量可能在變化 * 這個方法的設計被用來監控系統 * * @return the estimated number of threads waiting for this lock */ public final int getQueueLength() { return sync.getQueueLength(); } /** * 返回一個執行緒集合,集合中的執行緒可能在等待獲取鎖 * 因為執行緒獲取鎖可能被取消所以獲取到的集合不是準確的 * 此方法的目的是為了方便構造提供更廣泛的鎖監視設施的子類 * * @return the collection of threads */ protected Co