Java中的顯示鎖ReentrantLock使用與AbstractQueuedSynchronizer原理剖析
考慮一個場景,輪流列印0-100以內的技術和偶數。通過使用 synchronize 的 wait,notify機制就可以實現,核心思路如下:
使用兩個執行緒,一個列印奇數,一個列印偶數。這兩個執行緒會共享一個數據,資料每次自增,當列印奇數的執行緒發現當前要列印的數字不是奇數時,執行等待,否則列印奇數,並將數字自增1,對於列印偶數的執行緒也是如此
//列印奇數的執行緒
private static class OldRunner implements Runnable{
private MyNumber n;
public OldRunner(MyNumber n) {
this.n = n;
}
public void run () {
while (true){
n.waitToOld(); //等待資料變成奇數
System.out.println("old:" + n.getVal());
n.increase();
if (n.getVal()>98){
break;
}
}
}
}
//列印偶數的執行緒
private static class EvenRunner implements Runnable{
private MyNumber n;
public EvenRunner(MyNumber n) {
this.n = n;
}
public void run () {
while (true){
n.waitToEven(); //等待資料變成偶數
System.out.println("even:"+n.getVal());
n.increase();
if (n.getVal()>99){
break;
}
}
}
}
複製程式碼
共享的資料如下
private static class MyNumber{
private int val;
public MyNumber(int val) {
this.val = val;
}
public int getVal () {
return val;
}
public synchronized void increase(){
val++;
notify(); //資料變了,喚醒另外的執行緒
}
public synchronized void waitToOld(){
while ((val % 2)==0){
try {
System.out.println("i am "+Thread.currentThread().getName()+" ,but now is even:"+val+",so wait");
wait(); //只要是偶數,一直等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void waitToEven(){
while ((val % 2)!=0){
try {
System.out.println("i am "+Thread.currentThread().getName()+" ,but now old:"+val+",so wait");
wait(); //只要是奇數,一直等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
複製程式碼
執行程式碼如下
MyNumber n = new MyNumber(0);
Thread old=new Thread(new OldRunner(n),"old-thread");
Thread even = new Thread(new EvenRunner(n),"even-thread");
old.start();
even.start();
複製程式碼
執行結果如下
i am old-thread ,but now is even:0,so wait
even:0
i am even-thread ,but now old:1,so wait
old:1
i am old-thread ,but now is even:2,so wait
even:2
i am even-thread ,but now old:3,so wait
old:3
i am old-thread ,but now is even:4,so wait
even:4
i am even-thread ,but now old:5,so wait
old:5
i am old-thread ,but now is even:6,so wait
even:6
i am even-thread ,but now old:7,so wait
old:7
i am old-thread ,but now is even:8,so wait
even:8
複製程式碼
上述方法使用的是 synchronize的 wait notify機制,同樣可以使用顯示鎖來實現,兩個列印的執行緒還是同一個執行緒,只是使用的是顯示鎖來控制等待事件
private static class MyNumber{
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private int val;
public MyNumber(int val) {
this.val = val;
}
public int getVal() {
return val;
}
public void increase(){
lock.lock();
try {
val++;
condition.signalAll(); //通知執行緒
}finally {
lock.unlock();
}
}
public void waitToOld(){
lock.lock();
try{
while ((val % 2)==0){
try {
System.out.println("i am should print old ,but now is even:"+val+",so wait");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
}
public void waitToEven(){
lock.lock(); //顯示的鎖定
try{
while ((val % 2)!=0){
try {
System.out.println("i am should print even ,but now old:"+val+",so wait");
condition.await();//執行等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
lock.unlock(); //顯示的釋放
}
}
}
複製程式碼
同樣可以得到上述的效果
顯示鎖的功能
顯示鎖在java中通過介面Lock提供如下功能
- lock: 執行緒無法獲取鎖會進入休眠狀態,直到獲取成功
- lockInterruptibly: 如果獲取成功,立即返回,否則一直休眠到執行緒被中斷或者是獲取成功
- tryLock:不會造成執行緒休眠,方法執行會立即返回,獲取到了鎖,返回true,否則返回false
- tryLock(long time, TimeUnit unit) throws InterruptedException : 在等待時間內沒有發生過中斷,並且沒有獲取鎖,就一直等待,當獲取到了,或者是執行緒中斷了,或者是超時時間到了這三者發生一個就返回,並記錄是否有獲取到鎖
- unlock:釋放鎖
- newCondition:每次呼叫建立一個鎖的等待條件,也就是說一個鎖可以擁有多個條件
Condition的功能
介面Condition把Object的監視器方法wait和notify分離出來,使得一個物件可以有多個等待的條件來執行等待,配合Lock的newCondition來實現。
- await:使當前執行緒休眠,不可排程。這四種情況下會恢復 1:其它執行緒呼叫了signal,當前執行緒恰好被選中了恢復執行;2: 其它執行緒呼叫了signalAll;3:其它執行緒中斷了當前執行緒 4:spurious wakeup (假醒)。無論什麼情況,在await方法返回之前,當前執行緒必須重新獲取鎖
- awaitUninterruptibly:使當前執行緒休眠,不可排程。這三種情況下會恢復 1:其它執行緒呼叫了signal,當前執行緒恰好被選中了恢復執行;2: 其它執行緒呼叫了signalAll;3:spurious wakeup (假醒)。
- awaitNanos:使當前執行緒休眠,不可排程。這四種情況下會恢復 1:其它執行緒呼叫了signal,當前執行緒恰好被選中了恢復執行;2: 其它執行緒呼叫了signalAll;3:其它執行緒中斷了當前執行緒 4:spurious wakeup (假醒)。5:超時了
- await(long time, TimeUnit unit) :與awaitNanos類似,只是換了個時間單位
- awaitUntil(Date deadline):與awaitNanos相似,只是指定日期之後返回,而不是指定的一段時間
- signal:喚醒一個等待的執行緒
- signalAll:喚醒所有等待的執行緒
ReentrantLock
從原始碼中可以看到,ReentrantLock的所有實現全都依賴於內部類Sync和ConditionObject。
Sync本身是個抽象類,負責手動lock和unlock,ConditionObject則實現在父類AbstractOwnableSynchronizer中,負責await與signal Sync的繼承結構如下
公平的鎖會把許可權給等待時間最長的執行緒來執行,非公平則獲取執行許可權的執行緒與執行緒本身的等待時間無關
預設初始化ReentrantLock使用的是非公平鎖,當然可以通過指定引數來使用公平鎖
public ReentrantLock() { sync = new NonfairSync(); } 複製程式碼
當執行獲取鎖時,實際就是去執行 Sync 的lock操作:
public void lock() {
sync.lock();
}
複製程式碼
對應在不同的鎖機制中有不同的實現
- 公平鎖實現
final void lock() { acquire(1); } 複製程式碼
- 非公平鎖實現
final void lock() { if (compareAndSetState(0, 1)) //先看當前鎖是不是已經被佔有了,如果沒有,就直接將當前執行緒設定為佔有的執行緒 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); //鎖已經被佔有的情況下,嘗試獲取 } 複製程式碼
二者都呼叫父類AbstractQueuedSynchronizer的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //一旦搶失敗,就會進入佇列,進入佇列後則是依據FIFO的原則來執行喚醒
selfInterrupt();
}
複製程式碼
當執行unlock時,對應方法在父類AbstractQueuedSynchronizer中
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製程式碼
公平鎖和非公平鎖則分別對獲取鎖的方式tryAcquire
做了實現,而tryRelease的實現機制則都是一樣的
公平鎖實現tryAcquire
原始碼如下
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //獲取當前的同步狀態
if (c == 0) {
//等於0 表示沒有被其它執行緒獲取過鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//hasQueuedPredecessors 判斷在當前執行緒的前面是不是還有其它的執行緒,如果有,也就是鎖sync上有一個等待的執行緒,那麼它不能獲取鎖,這意味著,只有等待時間最長的執行緒能夠獲取鎖,這就是是公平性的體現
//compareAndSetState 看當前在記憶體中儲存的值是不是真的是0,如果是0就設定成accquires的取值。對於JAVA,這種需要直接操作記憶體的操作是通過unsafe來完成,具體的實現機制則依賴於作業系統。
//儲存獲取當前鎖的執行緒
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
//判斷是不是當前執行緒獲取的鎖
int nextc = c + acquires;
if (nextc < 0)//一個執行緒能夠獲取同一個鎖的次數是有限制的,就是int的最大值
throw new Error("Maximum lock count exceeded");
setState(nextc); //在當前的基礎上再增加一次鎖被持有的次數
return true;
}
//鎖被其它執行緒持有,獲取失敗
return false;
}
複製程式碼
非公平鎖實現tryAcquire
獲取的關鍵實現為nonfairTryAcquire
,原始碼如下
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//鎖沒有被持有
//可以看到這裡會無視sync queue中是否有其它執行緒,只要執行到了當前執行緒,就會去獲取鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current); //在判斷一次是不是鎖沒有被佔有,沒有就去標記當前執行緒擁有這個鎖了
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//如果當前執行緒已經佔有過,增加佔有的次數
return true;
}
return false;
}
複製程式碼
釋放鎖的機制
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) //只能是執行緒擁有這釋放
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//當佔有次數為0的時候,就認為所有的鎖都釋放完畢了
free = true;
setExclusiveOwnerThread(null);
}
setState(c); //更新鎖的狀態
return free;
}
複製程式碼
從原始碼的實現可以看到
- ReentrantLock獲取鎖時,在鎖已經被佔有的情況下,如果佔有鎖的執行緒是當前執行緒,那麼允許重入,即再次佔有,如果由其它執行緒佔有,則獲取失敗,由此可見,
ReetrantLock本身對鎖的持有是可重入的,同時是執行緒獨佔的
。 - 公平與非公平就體現在,當執行的執行緒去獲取鎖的時候,公平的會去看是否有等待時間比它更長的,而非公平的就優先直接去佔有鎖
ReentrantLock的tryLock()與tryLock(long timeout, TimeUnit unit):
public boolean tryLock() { //本質上就是執行一次非公平的搶鎖 return sync.nonfairTryAcquire(1); } 複製程式碼
有時限的tryLock核心程式碼是
sync.tryAcquireNanos(1, unit.toNanos(timeout));
,由於有超時時間,它會直接放到等待佇列中,他與後面要講的AQS的lock原理中acquireQueued的區別在於park的時間是有限的,詳見原始碼AbstractQueuedSynchronizer.doAcquireNanos
AbstractQueuedSynchronizer
無論是公平鎖還是非公平鎖,它們的實現都依賴於AbstractQueuedSynchronizer,它提供了一個基於先進先出等待佇列 實現block locks和synchronizers的框架。特性如下
- 僅通過一個 int 型別來代表狀態。對於ReentrantLock而言,他就是執行緒持有鎖的次數,當次數為0時,代表鎖沒有被持有,正數代表被持有的次數,負數則是超出了鎖的持有範圍,有可能存在死迴圈
- 支援獨佔模式(預設的)和共享模式。在獨佔模式中,去獲取一個已經被其它執行緒擁有的鎖只會失敗,共享模式中,則多個執行緒是可以成功的。
lock()原理
當ReentrantLock獲取鎖失敗時,會執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //建立一個節點,儲存當前的執行緒,以及鎖持有的模式,對於 ReentrantLock來說就是 獨佔 型
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS操作,如果當前的尾部節點沒有被其它執行緒更改,那麼把新的節點設定成佇列的尾部
pred.next = node;
return node;
}
}
enq(node);//首次入隊
return node;
}
複製程式碼
獲取失敗進行入隊操作,首先就是往佇列中新增一個正在等待的節點Node
從Node本身的結構可以看到,AQS(AbstractQueuedSynchronizer)本身就維護了一個雙向連結串列,用來存放等待中的執行緒。連結串列的每個節點,代表那個執行緒,是獨佔還是共享鎖。建立好節點之後,便執行入隊操作,對於首次建立佇列
private Node enq(final Node node) {
for (;;) {
//藉助CAS機制實現無鎖操作,所以需要一直執行直到CAS成功
Node t = tail;
if (t == null) { // 初始化發生在第一次建立佇列,這樣的好處是,當競爭不激烈的時候,實際上也就不會發生這些操作,效能也會好些
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製程式碼
可以看到,入隊也就是從隊尾插入新的等待執行緒,入隊完畢,也就開始去進行不斷的嘗試,直到獲取鎖成功,可以看到,對於lock來說,其實已經是阻塞了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //優先執行當前節點的前一個節點
if (p == head && tryAcquire(arg)) { //僅噹噹前節點的前一個節點是head,才去獲取執行緒,這裡可以看出其實先等待的執行緒是會優先處理,也就是FIFO原則
setHead(node);
p.next = null; // help GC ,釋放掉當前執行緒在佇列中的引用,也可以看做’出隊'了
failed = false;
//執行到這裡說明獲取鎖成功
return interrupted;
}
//執行到這裡說明存在競爭,有多個執行緒都在等待一個鎖
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //裡面會對當前執行緒執行中斷,當被喚醒時,繼續迴圈
//如果執行緒被中斷,設定中斷標記,區別於 doAcquireInterruptibly,doAcquireInterruptibly是直接丟擲異常,這也就是 lockInterruptibly能夠丟擲中斷的原因
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製程式碼
從這裡可以看到,無論鎖是公平鎖還是非公平鎖,只要被放入了等待佇列,此時的執行依然是誰先等待就先執行誰 ,非公平鎖體現在新來的執行緒會無視已經等了的執行緒,可以優先去搶鎖,所以公平體現在第一次參與搶鎖的執行緒會去等待已經在等待佇列中的執行緒,非公平並不是說從已經在等待的執行緒佇列裡面隨便選一個
shouldParkAfterFailedAcquire的原始碼如下
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //檢視前一個節點的等待狀態
if (ws == Node.SIGNAL)
//已經嘗試過獲取鎖,可以執行park了
return true;
if (ws > 0) {
do {
//去掉佇列中所有已經取消的執行緒
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//此時當前執行緒的前一個節點的等待狀態必定是0或者PROGATE,這表明當前執行緒在park之前可以再嘗試一次去獲取鎖,也就是說前一個節點可能剛獲取到SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製程式碼
waitStatus:等待的狀態,共有5種
- SIGNAL:,表明它的前一個節點需要執行 unparking;
- CANCELLED:當前節點儲存的執行緒由於超時或者中斷被取消了;
- CONDITION:接檔正處於條件佇列中,執行了await;
- PROPAGATE:一個共享的鎖需要傳遞釋放訊號到其它節點
- 0:非上述4中狀態,有可能是剛獲取signal,此時它的值是0,也有可能是新建的head節點
parkAndCheckInterrupt主要是park當前執行緒
private final boolean parkAndCheckInterrupt() {
//當獲取不到許可時,阻塞執行緒,解除阻塞狀態的情況如下:
//1 某個執行緒對這個執行緒呼叫了unpark方法
//2 某個執行緒中斷了這個執行緒
//3 這個方法毫無理由的返回了 [park比較奇特的地方],基於這樣,呼叫的時候必須去判斷park的條件,以及當它返回的時候,去設定中斷的狀態
LockSupport.park(this);
//返回執行緒的中斷狀態
return Thread.interrupted();
}
複製程式碼
至此lock()執行結束
unlock()原理
當執行unlock時,ReentrentLock執行對應的Release
public final boolean release(int arg) {
if (tryRelease(arg)) {
//執行這裡表示所已經被釋放,可以讓它的下一個節點來搶鎖
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //h.waitStatus == 0 表示還沒有執行park,自然不需要unpark
return true;
}
return false;
}
複製程式碼
如果release成功,即當前執行緒持有的所有鎖都已經釋放,那麼就可以執行 unparkSuccessor
,從原始碼可以看到,unpark是從頭部開始進行的,結合lock的原理,可知AQS本身就是一個先進先出的佇列 unparkSuccessor原始碼如下
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; //有可能執行緒被取消了,所以從尾部往前遍歷,到最近一個沒有被取消的執行緒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; //確保執行緒沒有取消
}
if (s != null)
LockSupport.unpark(s.thread); //恢復執行緒
}
複製程式碼
至此unlock()完畢
await的原理
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();//當前執行緒已經中斷了,丟擲中斷異常
//新增一個新的waiter到condition queue中,這個新的Node的waitStatus會被標記為CONDITION
Node node = addConditionWaiter();
//釋放當前執行緒擁有的鎖,即從sync queue中去掉當前執行緒
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//如果當前執行緒不在持有鎖的佇列裡頭,對他進行休眠,當其它執行緒執行 unlock的時候,釋放鎖,就會執行unpark操作,此時它會被喚醒,喚醒後,如果它在syn佇列裡頭,開始繼續往下執行。(這個插入操作則是由signal完成)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;//等待的過程中執行緒中斷了,退出
}
//重新競爭鎖,相當於執行了lock操作
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//再次去獲取鎖,如果當前的執行緒在park的時候是被中斷了,並且ConditionObject並不是由於中斷返回,這裡再次標記為中斷
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
//清除非Condition模式的執行緒,而在signal中有先關操作將conditon的執行緒設定成非condition
unlinkCancelledWaiters();
if (interruptMode != 0)
//上報等待的過程中發生了中斷,如果是要丟擲中斷,就丟擲,否則再次執行中斷
reportInterruptAfterWait(interruptMode);
}
複製程式碼
isOnSyncQueue原始碼如下
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
//node本身是呼叫了 await 方法,或者沒有在獲取鎖的佇列裡頭,[如果在裡頭必定有一個前置的節點]
return false;
if (node.next != null)
//當前節點存在下一個節點,那麼它肯定是執行過 enq ,即獲取過鎖
return true;
// CAS失敗的時候,有可能 node.rev是沒有的,因此需要從頭到尾遍歷一次
return findNodeFromTail(node);
}
複製程式碼
checkInterruptWhileWaiting原始碼如下
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//執行緒中斷重新獲取鎖,並且設定waitStatus為0,以便後續執行緒從condition queue清除
enq(node);
return true;
}
while (!isOnSyncQueue(node))
//如果CAS失敗,只要當前節點沒有在Sync queue中,那麼一直自旋,每次都會交出執行許可權
Thread.yield();
return false;
}
複製程式碼
可以看到,await其實就是釋放執行緒原有的鎖,並把它放入conditon佇列中,然後執行阻塞。等喚醒的時候,重新獲取鎖,並清掉condition queue中的執行緒。 至此await執行結束
singnal的原理
public final void signal() {
if (!isHeldExclusively()) //只有當前執行緒持有了鎖,才能釋放
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);//優先釋放佇列頭的,也就是等待時間最長的condition node
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//將節點從condition queue轉移到sync queue
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; //設定為非等待失敗,則不繼續轉移
//CAS設定等待狀態為0成功
Node p = enq(node); //新節點放入sync queue,並返回原來的尾部節點,也就是新節點的前一個節點
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //參考shouldParkAfterFailedAcquire
LockSupport.unpark(node.thread);//如果當前節點的前一個節點執行緒已經取消,或者將當前節點的前一個節點執行緒的waitStatus設定成SIGNAL失敗,則直接喚醒當前執行緒
return true;
}
複製程式碼
可以看到signal最關鍵的資訊就是去掉等待佇列中的CONDITION狀態,並將執行緒加入sync佇列,至此signal結束
為什麼需要顯示鎖
內建鎖功能上有一定的侷限性,它無法響應中斷,不能設定等待的時間