JUC之ReentrantLock原始碼分析
ReentrantLock:實現了Lock介面,是一個可重入鎖,並且支援執行緒公平競爭和非公平競爭兩種模式,預設情況下是非公平模式。ReentrantLock算是synchronized的補充和替代方案。
公平競爭:遵從先來後到的規則,先到先得
非公平競爭:正常情況下是先到先得,但是允許見縫插針。即持有鎖的執行緒剛釋放鎖,等待佇列中的執行緒剛準備獲取鎖時,突然半路殺出個程咬金,搶到了鎖,等待佇列中等待獲取鎖的執行緒只能乾瞪眼,接著等搶到鎖的執行緒釋放鎖
ReentrantLock與synchronized比較:
1、ReentrantLock底層是通過將阻塞的執行緒儲存在一個FIFO佇列中,synchronized底層是阻塞的執行緒儲存在鎖物件的阻塞池中
2、ReentrantLock是通過程式碼機制進行加鎖,所以需要手動進行釋放鎖,synchronized是JAVA關鍵字,加鎖和釋放鎖有JVM進行實現
3、ReentrantLock的加鎖和釋放鎖必須在方法體內執行,但是可以不用同一個方法體,synchronized可以在方法體內作為方法塊,也可以在方法宣告上
4、synchronized進行加鎖時,如果獲取不到鎖就會直接進行執行緒阻塞,等待獲取到鎖後再往下執行。ReentrantLock既可以阻塞執行緒等待獲取鎖,也可以設定等待獲取鎖的時間,超過等待獲取時間就放棄獲取鎖,不再阻塞執行緒
ReentrantLock原始碼分析:
/** Synchronizer providing all implementation mechanics */ private final Sync sync; /** * Base of synchronization control for this lock. Subclassed * into fair and nonfair versions below. Uses AQS state to * represent the number of holds on the lock. */ abstract static class Sync extends AbstractQueuedSynchronizer { .... } /** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { .... } /** * Sync object for fair locks */ static final class FairSync extends Sync { .... }
從省略細節的原始碼中我們可以很清晰的看到,ReentrantLock內部定義了三個內部類,全部直接或者間接的繼承AbstractQueuedSynchronizer,ReentrantLock有一個屬性sync,預設情況下為NonfairSync型別,即為非公平鎖。實際上ReentrantLock的所有操作都是有sync這個屬性進行的,ReentrantLock只是一層外皮。
ReentrantLock既然實現類Lock介面,我們就先以加鎖、解鎖進行分析:
1、加鎖(非公平):
public void lock() { sync.lock(); }
就像上面說的一樣,ReentrantLock的功能都是靠底層sync進行實現,ReentrantLock的加鎖很簡單,就一句話使用sync的lock()方法,我們直接看原始碼
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }
sync的lock()方法也是比較簡單的,先通過CAS試圖將AQS的state由0變為1。如果成功,表示該執行緒獲取鎖成功(設值時沒有執行緒在持有鎖),就將當前執行緒設定為鎖的擁有者(這就是前面所說得見縫插針,後面還有);如果失敗,只表示設定值沒有成功,不表示該執行緒獲取鎖失敗(因為有可能是重入加鎖),開始呼叫AQS的acquire(int)方法進行獲取鎖,我們直接看acquire(int)方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這是AQS定義的模板方法,由AQS的子類去重寫tryAcquire(int) 方法,由AQS去呼叫,進行嘗試獲取鎖。如果獲取鎖成功,方法直接結束;如果獲取鎖失敗,就將當前執行緒進行阻塞並加入到FIFO的CLH佇列中。我們先看ReentrantLock內部類是如何重寫tryAcquire(int)方法的
protected final boolean tryAcquire(int acquires) {
// 直接呼叫父類nonfairTryAcquire(int)方法 return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
我們直接看sync宣告的nonfairTryAcquire(int)方法,先獲取儲存的state,如果值為0,表示當前沒有執行緒持有鎖,處理和前面的一致,直接見縫插針,通過CAS嘗試直接設定值,設值失敗就表示獲取鎖失敗了,直接返回失敗結果;如果state不是0,表示鎖被執行緒持有,就比較下持有鎖的執行緒是否是當前執行緒,如果是,表示執行緒重入持有鎖,進行state值的累加。如果不是,直接返回持有鎖失敗結果。tryAcquire(int)方法獲取鎖失敗後,會去執行AQS宣告的acquireQueued(Node, int)方法將當前執行緒封裝到CLH佇列的節點Node中,並進行阻塞。我們先看看AQS是將當前執行緒封裝到Node中都做了什麼操作
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 將隊尾的node改為新建的node if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
將當前執行緒裝進Node例項中,並設定改Node為獨享模式。然後獲取隊尾的Node例項,因為走到這一步的時候有兩種獲取鎖失敗的場景:1、競爭鎖時,沒有執行緒持有鎖;2、競爭鎖時,已有別的執行緒持有鎖。所以會先判斷下隊尾Node是否為null,如果為空,表示是第一種場景獲取鎖失敗,如果不為空,這是第二種場景獲取鎖失敗。先看第2種場景的處理流程,直接將隊尾的Node設定為新建Node例項的prev(表示在佇列中的前一個Node節點),然後通過CAS嘗試將新建的Node節點設定為隊尾(這個時候用CAS是因為有可能存在多執行緒競爭),如果設定隊尾成功,就將前任隊尾的next節點設定為新建的node節點;如果設定隊尾失敗(多執行緒競爭才會出現),和場景1進行相同的處理,先看原始碼
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
處理很簡單,先獲取隊尾,如果獲取的隊尾為null,表示上一步場景1過來的,通過CAS將隊首設定為一個空的Node節點(該節點表示正在持有鎖的執行緒封裝的Node,僅僅是代表,沒有實值),並將隊尾和隊首指向同一個節點;如果獲取的隊尾不為null,將隊尾設定為引數Node的上一個節點,並通過CAS嘗試將引數Nodo設定為隊尾,如果設定成功,將新隊尾設定為前任隊尾的next節點,並直接返回;如果設定失敗,往復迴圈,直到成功為止。
通過前面對addWaiter(Node)原始碼的分析,我們可以清楚的瞭解到addWaiter方法將當前執行緒封裝到CLH佇列的獨享模式的Node節點中,並通過CAS將當前執行緒的Node節點設定為隊尾。下面我們接著看acquireQueued(final Node node, int arg)方法會做什麼處理
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 獲取前任節點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
先獲取引數Node節點的前任節點,如果前任節點是隊首時,會去再次呼叫tryAcquire(int)方法去嘗試持有鎖,如果成功持有鎖,就將引數Node直接設定為隊首,同時將前任隊首的next節點設定為null(去除引用,利於GC),然後直接返回當前執行緒是否要進行中斷操作。如果前任節點不是隊首或者再次嘗試持有鎖失敗,會先呼叫shouldParkAfterFailedAcquire(Node pre, Node node)進行判斷是否需要進行執行緒阻塞,如果需要執行緒阻塞再呼叫parkAndCheckInterrupt()進行執行緒阻塞(該方法返回值表示該執行緒是否是中斷狀態)。執行緒阻塞後會等待前任節點釋放鎖時喚醒結束阻塞,執行緒結束阻塞後會迴圈再次去獲取鎖。但是如果結束阻塞後去獲取鎖時,有新的執行緒見縫插針直接獲取到鎖了,那就只能再次在佇列中進行阻塞了。其實shouldParkAfterFailedAcquire和parkAndCheckInterrupt看方法名稱就能猜個大概了,我們還是直接先看shouldParkAfterFailedAcquire(Node pre, Node node)方法的原始碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 這個節點已經設定了狀態,請求釋放訊號通知它,這樣它就可以安全地進行阻塞了。 */ return true; if (ws > 0) { /* * 此時前任被取消了,跳過前任並重試。 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 等待狀態必須為0或傳播。將節點狀態設定為SIGNAL,告訴前任節點後面節點需要釋放訊號通知,但先不進行阻塞。呼叫者將需要重試,以確保它不能在阻塞前獲得鎖。 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
先獲取前任節點的waitStatus(等待狀態),1.如果前任節點的等待狀態為Node.SIGNAL(表示後面的節點需要前任節點釋放鎖時進行通知,結束後面節點的阻塞),直接返回true,執行後面的阻塞流程;2.1如果前任節點的等待轉態值大於0,表示前任節點被取消了(爭奪鎖的執行緒因過了設定時間,獲取鎖失敗,從佇列中刪除節點的時候,Node節點會被設為取消狀態),跳過前任節點,往前找,直到找到不是取消狀態的節點,直接將找到的有效前任節點的next節點設定為當前節點;2.2如果前任節點不是取消狀態(可能是初始值0、等待狀態或者傳播狀態),通過CAS嘗試將前任節點的waitStatus設定為Node.SIGNAL(不管設定成功與否,都直接返回false,後面會再次迴圈執行,用來確保該節點的執行緒在阻塞前一定不會獲取到鎖,因為存在見縫插針去爭取持有鎖的執行緒)。shouldParkAfterFailedAcquire方法返回true,會進行執行緒阻塞,返回false,呼叫層會進行迴圈讓當前執行緒再次獲取一次鎖,失敗後再次被呼叫進行清洗已經取消的節點或者進行前任節點的等待狀態設定為Node.SIGNAL。
下面我們再來看看parkAndCheckInterrupt()方法的原始碼
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt()方法的內容很簡單,使用LockSupport的park(Object blocker)方法進行執行緒阻塞,有興趣的同學可以自行去深入瞭解,等待前任節點呼叫呼叫LockSupport的unpark(Thread thread)喚醒該執行緒。然後在該執行緒結束阻塞後返回該執行緒是否是中斷的狀態。
至此,非公平鎖獲取鎖的流程就分析完了,總結下非公平鎖加鎖流程:
1.不管執行緒有沒有被持有,先嚐試獲取鎖
2.鎖未被持有,直接獲取鎖。2.1獲取鎖成功,結束;2.2獲取鎖失敗,封裝成CLH佇列的Node節點,並進行執行緒阻塞
3.鎖被持有。3.1執行緒重入加鎖,進行state累加,等待釋放鎖時進行減除;3.2封裝成CLH佇列的Node節點,並進行執行緒阻塞
2、定長時間加鎖(非公平)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
沒什麼可說的,依然是使用sync進行操作,我們直接看sync的實現原始碼
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
這裡直接先判斷當前執行緒是否是中斷狀態(可能是因為ReentrantReadWriteLock中的WriteLock也會使用才進行判斷),如果是中斷狀態,直接拋異常。我們這邊肯定不會是中斷狀態的啦,接著往下走,呼叫tryAcquire(int)方法嘗試獲取鎖,忘了過程的同學請往前翻。如果獲取鎖成功,那就直接結束。獲取鎖失敗時,執行doAcquireNanos(int arg, long nanosTimeout)方法以獨佔定時模式獲取鎖。我們直接看原始碼
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
同樣是先建立一個Node節點並放置到CLH佇列的隊尾,然後同樣的是開始進行迴圈處理,不同的地方是,定長獲取鎖使用了LockSupport.pargNanos(Object blocker, long nanos)進行阻塞一段時間,如果線上程阻塞未自動結束時,前一個節點釋放了鎖,該節點一樣會被解除阻塞去爭奪鎖,如果不幸被別的執行緒見縫插針搶去了鎖,那就接著去阻塞定長時間(這個定長時間是根據最初設定的時間和當前時間的差值),等待鎖的釋放。如果超過了定長時間還是沒有獲取到鎖,就會呼叫cacelAcquire(Node node)去刪除該節點。
3、公平競爭加鎖
公平鎖物件FairSync的lock()方法直接呼叫了acquire(int)方法,前面我們分析了,acquire(int)方法會先呼叫tryAcquire(int)方法去嘗試獲取鎖,根據返回結果去判斷是否需要加入到佇列中。下面我們直接FairSync的原始碼
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
原始碼的邏輯比較清晰簡單,先判斷當前鎖是否是空閒狀態(state是否是0),如果是空閒的,就去嘗試獲取鎖,返回爭奪結果;如果不是空閒的,判斷是否是執行緒重入,如果是就累加state,返回成功,如果不是重入,直接返回失敗。如果tryAcquire方法返回了false,那麼就會將該執行緒封裝到CLH佇列的Node中並進行執行緒阻塞,後面的流程和非公平鎖時一致的。
總結下公平鎖加鎖的流程:
1.鎖未被持有,嘗試直接獲取鎖。2.1獲取鎖成功,結束;2.2獲取鎖失敗,封裝成CLH佇列的Node節點,並進行執行緒阻塞
2.鎖被持有。2.1執行緒重入加鎖,進行state累加,等待釋放鎖時進行減除;2.2封裝成CLH佇列的Node節點,並進行執行緒阻塞
4、釋放鎖:
public void unlock() { sync.release(1); }
ReentrantLock的釋放鎖直接呼叫的sync屬性的release(int)方法,實際是直接呼叫的AbstractQueuedSynchronizer的release(int)方法,我們直接看原始碼
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
依然是同樣的配方,呼叫子類重寫的tryRelease(int)方法去真正釋放鎖,如果釋放成功,而且佇列中有節點在等待隊首釋放鎖後進行通知,就會呼叫unparkSuccessor(Node node)去解除下一個節點的執行緒阻塞狀態,讓下一個執行緒去獲取鎖。
我們先看看子類重寫的tryRelease(int)方法
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
原始碼比較簡單,先比較當前執行緒是否是持有鎖的執行緒,如果不是,直接拋異常,說明呼叫者使用不規範,沒有先去獲取鎖。然後進行state的減除,先判斷此時state是否為0,為0表示執行緒完全釋放了鎖。如果為0,就將鎖的持有者變為null。不管最後有沒有完全釋放鎖都會將state設定成新值。
我們再看看unparkSuccessor(Node node)都做了什麼
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
此時引數node代表的head,如果head的waitStatus小於0,表示後面節點需要在等待通知鎖被釋放的訊號,先將head的waitStatus改為0,然後去看看head的next節點是存在並且next節點的waitStatus小於0,。如果next節點為null或者waitStatus大於0,就從隊尾tail節點依次往前找,找到head節點後第一個waitStatus不大於0的節點,然後結束該節點的執行緒阻塞狀態;如果head的next節點存在並且waitStatus不大於0,直接解除head的next節點執行緒阻塞狀態。
總結下鎖釋放過程:
1.先判斷當前執行緒是否可以進行鎖釋放
2.state減除,如果減除後的state為0,就將鎖的持有者設為空,並解除下一個等待節點的執行緒阻塞狀態
為了加深印象,我專門還花了三個流程圖,一起看看
到這裡基本上ReentrantLock的主要功能點都說完了,還有一個Condition功能沒說,接著搞起來。
ReentrantLock的Condition:
ReentrantLock方法: public Condition newCondition() { return sync.newCondition(); } Sync內部類方法: final ConditionObject newCondition() { return new ConditionObject(); }
還是一如既往的配方,ReentrantLock的newCondition依然用的是sync屬性去實現功能,Sync也簡單,直接就是建立一個AbstractQueuedSynchronizer的內部列ConditionObject的例項。我們直接去看ConditionObject的原始碼去分析
Condition主要是wait()、signal()兩個方法,我們先來看wait()方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
分析之前先說明下,ConditionObject中同樣儲存的有CLH佇列,和外部類AbstractQueuedSynchronizer相似。
我們通過await()方法的原始碼簡單分析下:
1、首先呼叫了addConditionWaiter()方法將當前執行緒封裝到Node.CONDITION模式的Node中,放入到ConditionObject的CLH佇列中,順便去除了一些佇列中waitStatus不是Node.CONDITION的節點。
2、呼叫fullyRelease(Node node)去完全釋放掉鎖,並去解除AbstractQueuedSynchronizer中佇列的head節點,方法返回節點完全釋放前的state值
3、迴圈:對當前節點進行執行緒阻塞,直到被其他執行緒使用signal()或者signalAll()方法解除執行緒阻塞狀態
4、將節點加入到AbstractQueuedSynchronizer的CLH佇列中,等待爭奪鎖
整體上Condition的wait()方法的內容就這麼多,原始碼也比較簡單,有興趣的可以自己深入看看。
現在看下signal()的原始碼:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
signal()方法的主要工作還是放在了doSignal(Node first)方法和transferForSignal(Node node)兩個方法上。在doSignal(Node first)方法中,獲取到ConditionObject的CLH佇列的隊首,然後呼叫transferForSignal(Node node)方法先將節點的waitStatus改為0,然後將節點放入到AbstractQueuedSynchronizer的CLH佇列隊尾,如果前任隊尾的waitStatus大於0或者將前任隊尾的waitStatus改為Node.SIGNAL失敗時,直接解除節點的執行緒阻塞狀態,結束wait()方法中的迴圈,呼叫acquireQueued(Node node, int savedState)去嘗試搶奪鎖,因為此時當前執行緒仍然持有鎖,所以節點最後還是會被執行緒阻塞。因為此時節點node已經從ConditionObject的CLH佇列遷移到了AQS的CLH佇列隊尾,即使if條件不滿足,不能解除node節點的執行緒阻塞狀態,等到前任隊尾節點釋放鎖時還是會解除node節點的執行緒阻塞狀態。
Condition還有await(long time, TimeUnit unit)、signalAll()等等其它方法,原理差不多,這裡就不一一贅述了。至此,ReentrantLock的知識點基本上也說的差不多