從ReentrantLock原始碼入手看鎖的實現
寫這篇確實挺傷腦筋的,是按部就班一行一行讀,但是我想這麼寫估計很多沒有接觸過的可能就勸退了,很容易出現的一種現象就是看了後面忘了前面,而且很容易看了一行程式碼就一層層往下鑽,這樣不僅容易打擊看原始碼的積極性,而且效率賊低。doug lea大神的程式碼設計的那麼精妙,浪費時間在這上面太可惜了。
在講doug lea大神的設計之前,我們考慮一下,如果是我們設計一個鎖該怎麼實現,要滿足同一時間只有一個執行緒可以進入臨界區,我想如果是我的話,我設定一個狀態標識當前是否有執行緒在使用,在使用cas原子操作改變這個值,保證在多執行緒情況下不會出現有併發修改狀態的情況,類似這樣的虛擬碼:
if(cas success){
執行臨界區程式碼
cas out
}
return;
僅此肯定是不夠的,因為我還要保證我的鎖釋放了,其他執行緒可以立刻過來搶鎖,所以我可以在現有基礎上加上一段邏輯,就是如果我競爭失敗了,我不斷輪詢嘗試改變狀態,類似:
while(!cas success){
空轉
}
執行臨界區程式碼
cas out
return;
但是有沒有發現這種方式呢,太耗資源了,如果很多執行緒競爭,執行緒空轉肯定會導致我們的服務cpu負荷變大,那麼怎麼解決這個問題呢?
既然是cpu空轉導致的,那就讓沒拿到鎖的執行緒不要轉了,先回去歇歇吧,等到我鎖釋放了你再來。可是這就出現了一個問題,我沒拿到鎖,我回去歇著可以,但是你都不知道我是誰咋叫我呢。既然這樣我們就得解決拿到鎖的執行緒不認識沒拿到鎖的執行緒,我們可以把等待的執行緒放在一個集合或者佇列裡面,然後釋放鎖就去把所有的執行緒一個個叫醒,或者就一次叫醒一個,叫醒的移出佇列,相比之下,肯定是一次叫醒一個性能高,我想一個一個叫醒,很明顯佇列的優勢比較大,效能也高。
我們自己設計差不多也就這樣了,帶著這些思考,我們再去看看doug lea大神的reentrantLock是如何設計的:
可以看到reentrantLock包含了三個內部類:
abstract static class Sync extends AbstractQueuedSynchronizer(我叫它大娃)
static final class FairSync extends Sync(二娃)
static final class NonfairSync extends Sync(三娃)
很明顯二娃三娃是大娃衍生出來的,畢竟長兄如父嘛,二娃比較公正,也叫公平鎖,三娃比較機靈,他才不管公平不公平,也叫非公平鎖,我們再看看他們到底有啥區別:
三娃想要拿鎖會先去嘗試一遍,拿到直接結束,拿不到在走常規流程,二娃就沒有前面一步,不管能不能搶到,都按規矩去排隊,再看acquire方法,兩者都是一樣的:
看到上圖的時候,就看到tryAcquire方法不一樣,我們分析一下兩者的區別:
首先看二娃的實現:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //鎖是否被佔用 if (!hasQueuedPredecessors() && //此行用於判斷當前佇列是否被初始化,沒有被初始化就不用排隊了直接搶鎖 compareAndSetState(0, acquires)) { //這個很簡單,cas修改鎖狀態 setExclusiveOwnerThread(current); //設定當前佔用執行緒為自己 return true; } } else if (current == getExclusiveOwnerThread()) { //這裡判斷執行緒是否是自己,用於重入鎖 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; //搶鎖失敗,乖乖排隊去 }
上面cas操作可以點進去看可以發現其實就是修改的state:
下面重入鎖拿到鎖setState方法也就是對state進行操作:
這裡其實就可以看得出來,和我們一開始想的是一樣的嘛,用一個狀態控制,只不過他這裡更加牛逼的是可以重入,這是我們沒想到的。
看完二娃的,調皮的三娃也來秀一波:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //拿到當前執行緒 int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { //拿鎖 setExclusiveOwnerThread(current); //拿到鎖設定為自己 return true; } } else if (current == getExclusiveOwnerThread()) { //重入校驗 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
你以為三娃之前搶過一次就算了,這裡如果不確定已經被拿到鎖了,還是會去先搶一次,搶到就是賺到,搶不到再去排隊唄,後面其他的基本都是一樣的。
看到這裡是不是覺得咱們之前的設計和doug lea大神的也差不多嘛,咱也是大牛啊,哈哈。沾沾自喜的同時咱們接著往下看,拿到鎖是結束了,可是要是拿不到鎖,咱還要排隊呢,再看下是否和我們想的一樣,佇列睡眠,逐個喚醒:
接著看acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ,發現裡面還巢狀著一個方法addWaiter,那我們就先看這個:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //1.新建一個節點 // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 2.拿到佇列最後一個節點 if (pred != null) { node.prev = pred; //3.將新建節點前置節點設為當前隊尾節點 if (compareAndSetTail(pred, node)) { //4.cas設定最後一個節點為新建節點 pred.next = node; // 5.設定之前拿到的最後節點的下一個為自己,到這就已經完成了在佇列尾部加上自己了 return node; } } enq(node); // 如果上面沒成功,那就要進到這裡面無限輪詢,直到設定成功 return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter的前5步如果都執行成功了,那麼也就在隊尾加成功了,流程如下圖:
這一步其實還是蠻簡單的,但是如果沒成功呢,再看enq方法,方法首先判斷隊尾是否為空,啥時候隊尾為空呢,很明顯是這個佇列裡面還沒有其他執行緒在等,那麼就會傳一個新的節點給自己(這裡也是沒有加鎖的,雖然cas能保證原子性,但不能保證cas前後的操作也是,所以前面二娃在判斷佇列是否初始化時,前置條件就是頭尾節點不相等,而且頭結點的下一個節點不為null),也就是說,佇列只有在有執行緒競爭鎖的時候,才會初始化,而設定完頭尾節點還沒結束,會接著輪詢,直到能夠入隊。雖然能看懂,但是捫心自問,確實想不出來還能這樣處理。
上面已經完成了入隊,但是就按我們之前分析的光入隊沒用啊,還得去競爭鎖啊,那我們接著看acquireQueued方法,
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //獲取前置節點 if (p == head && tryAcquire(arg)) { //前置節點為頭節點時,嘗試獲取鎖 setHead(node); //獲取成功設定當前節點為頭節點 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //判斷當前節點是否需要阻塞 parkAndCheckInterrupt()) // 阻塞當前執行緒 interrupted = true; } } finally { if (failed) //這個欄位是當取消鎖競爭時會觸發的 cancelAcquire(node); } }
我們可以看到這個方法核心的兩件事:1.嘗試拿鎖,2.阻塞當前執行緒,而拿鎖的前提便是前一個節點是頭節點,拿鎖成功便設定自己為頭節點,我們再看setHead方法做了什麼:
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
就是將自己設定為頭節點,然後將執行緒屬性和前置屬性清空,其實我覺得這個時候前置節點清空是肯定要做的,但執行緒不需要,不過這也是大神們的厲害之處,細節處理得非常好。
拿到鎖這裡就結束了,沒拿到鎖那就要阻塞了,首先看shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
乍一看,看不懂,這啥玩意,node節點waitStatus又和Node.SIGNAL對比,又是和0相比,點開Node:
一共有四種不同狀態:
SINGAL:表示當前節點很快被阻塞,因此節點再釋放或取消後,需要喚醒後續節點,喚醒後競爭失敗,則需要繼續阻塞 狀態值:-1
CANCELLED:表示當前執行緒超時或者中斷可,這個狀態的節點不會再次阻塞,也不會切換其他狀態 狀態值:1
CONDITION: 這個表示一個等待佇列,後面會講到 狀態值:-2
PROPAGATE:傳播屬性,表示這個節點是共享鎖,這個標識通常後面可以繼續喚醒 狀態值-3
介紹完這幾種狀態,我們在回到上面程式碼,那就很簡單了,首先判斷當前節點是不是SINGAL,是表示可以被阻塞,直接返回,然後判斷大於0,也就是取消狀態,一直往佇列前找,直到節點不為取消,返回false,表示不能被阻塞,進入下一次輪詢,都不是這兩種則嘗試將前一個節點的狀態設定為SINGAL(這裡有可能會發生其他執行緒已經將狀態改了,那就要進入下一次嘗試),如果修改成功則進入輪詢。總結一下就是後一個節點第一次過來,前面兩個條件正常情況都不滿足(先不考慮取消),那麼就會嘗試將前一個結點狀態修改為可阻塞。這塊設計的確實很精妙,雖然同樣是if else,doug lea大神幾行程式碼完成了我們幾十行都不一定能寫的很好的功能。
上面如果返回true則會執行parkAndCheckInterrupt方法,這個就很簡單:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
阻塞當前執行緒,返回中斷標誌位狀態(這個是給Condition佇列用的)。
獨佔鎖的枷鎖過程到這裡也就結束了,除了在一些細節方面大神做的確實牛逼,目前為止和我們一開始的設計思路還是差不多的,寫到這剛好看到響應中斷的加鎖有啥區別:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
這裡多了一箇中斷響應,再往下,tryAcquire是一樣的,加鎖失敗的流程:
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這裡也就標紅的地方不一樣吧,普通的就是把interrupted欄位設為true,而這個會丟擲異常,而為了防止已經被中斷執行冤枉程式碼,所以在一開始進來的地方也加了箇中斷響應,厲害了我的道哥。
看完上面這些,我們先總結一下,java當中鎖可不止ReentrantLock一個,Lock的實現有很多,:
基本都是使用或者參考aqs來實現的,寫了這麼長,才寫完了加鎖過程,然而ReentrantLock裡面其實沒有多少程式碼,這些程式碼基本上都是aqs裡面的,比如上面的acquire方法,基本上加鎖所有的邏輯都是在這裡完成的,再比如acquireInterruptibly方法,共享鎖的acquireShared方法,控制加鎖的狀態state,Node等都是aqs中的,可以說aqs設計了一整套,而具體到需求的實現,由具體的類來完成。
小插曲說完,本來是想只寫獨佔鎖的,既然聊到了傳播屬性,還是聊一聊共享鎖的加鎖吧:
入口:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
這裡tryAcquireShared方法是aqs的一個鉤子,必須要子類來實現,比如CountDownLatch實現就很簡單,而類似讀寫鎖這些設計就複雜得多,這裡就不做過多贅述,我們主要講沒拿到鎖,aqs裡面做了什麼:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); //新增佇列至隊尾 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //拿到前一個節點 if (p == head) { //判斷前一個是否是頭節點 int r = tryAcquireShared(arg); //嘗試拿鎖 if (r >= 0) { setHeadAndPropagate(node, r); //設定頭節點和傳播屬性,很重要 p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
其他操作基本和獨佔鎖一樣的,主要看setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); //設定當前節點為頭節點 /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 //表示拿鎖成功 || h == null //表示頭節點為空 || h.waitStatus < 0 //表示不為取消狀態 || (h = head) == null //這一步和下一步看起來是一樣的,但是其實這裡head已經變成新加的節點 || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //判斷是否是null或者共享節點 doReleaseShared(); //嘗試釋放鎖 } }
這裡主要的變化就是加了一個如果頭結點的下個節點為為空SHARED屬性則嘗試釋放鎖:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
這裡首先判斷佇列是否為空,不為空則判斷頭結點的wait屬性是否為SINGAL,並嘗試將狀態改為0並喚醒(喚醒後會繼續搶鎖,會再次來到這裡,則會走下一步,而此時這個喚醒的節點前一個節點又是頭結點,又會嘗試拿鎖,直到拿不到鎖,park),然後嘗試將waitStatus改為PROPAGATE,這才結束。設計可謂是一環套一環,我們接下來再看解鎖過程:
首先看獨佔鎖:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
首先嚐試解鎖,這裡也是供不同需求實現的,如ReentrantLock便是修改state至0表示解鎖成功,解鎖成功後判斷頭結點不為空(防止其他執行緒先來),unparkSuccessor:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
這邊操作也不算複雜,cas設定waitstatus為0,校驗下一個節點是否取消並排除掉,喚醒下一個節點執行緒。
接下來看共享鎖解鎖:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
一樣的看doReleaseShared方法:
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
有沒有發現就是剛才講的釋放鎖過程,共享鎖也就比獨佔鎖多一個傳播機制,但卻實現了兩種不同的鎖。
講到這裡aqs關於鎖方面的知識也就結束了,接下來就要聊到等待佇列Condition了,Condition的使用很簡單,可以參考我上一篇文章:https://www.cnblogs.com/gmt-hao/p/14110722.html,最核心的方法也就是await , singal,至於可控時間或者中斷處理,就不細聊了。
首先介紹一下等待佇列吧,我們前面已經看了用雙端佇列實現鎖,等待佇列和之前的是一樣的,前面waitStatus就聊到過一個CONDITION屬性,也就是對應著我們這個CONDITION,有了前面的基礎,我們也不多扯了,直接看程式碼:
首先看await方法:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //新增節點 int savedState = fullyRelease(node); //釋放當前執行緒鎖 int interruptMode = 0; while (!isOnSyncQueue(node)) { //判斷當前是否CONDITION節點或者第一個節點 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //響應中斷 break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //嘗試拿鎖,拿不到park interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //去除掉取消的 unlinkCancelledWaiters(); if (interruptMode != 0) // reportInterruptAfterWait(interruptMode); }
首先新增一個CONDITION節點,然後釋放當前執行緒鎖,判斷是否CONDITION節點,按道理這裡肯定是CONDITION節點,還要判斷嗎,具體原因我們下面再聊,後面會嘗試拿鎖,和之前邏輯一樣,拿不到park阻塞,我們再看一下singal方法:
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
主要看transferForSignal方法,首先將CONDITION狀態改為0,這也是為啥上面要判斷是不是CONDITION,改成功入隊(可以搶鎖了),如果已經被取消或者修改失敗,則重新喚醒同步。總結一下就是一個等待佇列,被喚醒後會加入到搶鎖佇列隊尾,然後等機會拿鎖。
大致就是這樣的一個模型,一個搶鎖佇列,多個等待佇列。
總結:
最重要的是理解上面的搶鎖是如何設計與實現的,首先我們碰到問題是想想自己是否有思路能做到這樣,然後看看大牛的實現是否和自己有想通之初,最後再想想大牛的做法比自己牛在什麼地方。看懂了原始碼只是第一步,doug lea的aqs 中基本上沒有多餘的程式碼,比如共享釋放鎖,入隊,搶鎖等功能雖然多個地方用到了,但是其實現始終只有一個,後面我也要考慮考慮在系統設計及程式碼實現的時候如何做到功能原子化,而在設計的時候更多的去考慮我們做的東西是為了解決什麼樣的一型別的問題。
java實現的鎖算是講完了,雖然像讀寫鎖,CountDownLatch之類的其實都是用了aqs,其核心原理都是一樣的,後面有時間我將寫一篇synchronized的原始碼分析,並總結一下鎖在我心裡是啥樣的,其實很多東西我雖然看懂了但是有些地方還是掌握不好,比如像doug lea大神在很多地方都用到了cas,而且大部分搶鎖,入隊都沒有做執行緒安全處理,只在一些核心的地方加cas便可以做到併發安全的控制,這是我現階段還無法做到的,希望有一天也可以做到這麼牛逼吧。