【JVM第八篇】:Java併發程式設計:用AQS寫一把可重入鎖
前一篇部落格Java併發程式設計:自己動手寫一把可重入鎖詳述瞭如何用synchronized同步的方式來實現一把可重入鎖,今天我們來效仿ReentrantLock類用AQS來改寫一下這把鎖。要想使用AQS為我們服務,首先得弄懂三個問題:AQS是什麼?AQS已經做了什麼以及我們還需要做些什麼?
AQS簡介
AQS是J.U.C包下AbstractQueuedSynchronizer抽象的佇列式的同步器的簡稱,這是一個抽象類,它定義了一套多執行緒訪問共享資源的同步器框架,J.U.C包下的許多同步類實現都依賴於它,比如ReentrantLock/Semaphore/CountDownLatch,可以說這個抽象類是J.U.C併發包的基礎。
之所以把這一章節叫做AQS簡介而不是叫AQS詳解,是因為已經有大神寫過詳解的文章Java併發之AQS詳解,這篇文章對AQS的原始碼解析很透徹,博主讀了之後受益匪淺,鑑於對原作者的尊重,所以如上附上原文的連結。要想弄懂AQS還得從這一圖說起。
如上圖所述,AQS維護了一個state變數和一個FIFO先進先出佇列,這個state用來幹嘛的可以參考我前一篇部落格中的那個count計數器,就是用來計數執行緒的重入次數的。上一篇部落格還用了一個變數currentThread來記錄已經獲得這把鎖的執行緒。而我們的AQS用的是一個先進先出的等待佇列的完成這件事。當新的執行緒進來的時候,AQS呼叫tryAquice()方法試圖去獲得鎖,如果獲得的話,則呼叫interupt中斷方法;如果沒有獲得鎖,則把當前執行緒放入排隊的佇列,AQS佇列不斷的自旋嘗試去判斷已經佔用的執行緒是否已經放開,如果鎖依然被執行緒繼續佔用,則繼續新增進等待佇列。
原始碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那個addWaiter方法,此方法用於將當前執行緒加入到等待佇列的隊尾,並返回當前執行緒所在的結點。
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
我們以獨佔式的同步幫助器為例來看一下AQS的執行流程。
大致流程如下:
- 呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒成功,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
- acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果執行緒在等待過程中被中斷過,它是不響應的。只是獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。
上述的流程和步驟已經是AQS幫我們實現了的功能,估計我講的也不太清楚,這裡再次推薦讀者閱讀這篇文章Java併發之AQS詳解,下面我們應該來看看如何使用AQS。
用AQS寫一把互斥鎖
互斥鎖是為了保證資料的安全,在任一時刻只能有一個執行緒訪問該物件。由上一個小節我們可知,AQS已經為我們實現所有排隊和阻塞機制,我們只需要呼叫getState()、setState(int) 和 compareAndSetState(int, int) 方發來維護state變數的數值和呼叫setExclusiveOwnerThread/getExclusiveOwnerThread來維護當前佔用的執行緒是誰就行了。翻越JDK提供的API,它建議我們:應該將子類定義為非公共內部幫助器類,可用它們來實現其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現任何同步介面。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當的時候可以通過具體的鎖和相關同步器來呼叫它們,以實現其公共方法。
什麼意思呢?意思就是建議我們:如果你想要使用AQS實現一把互斥鎖Mutex,就必須先用一個類去繼承AbstractQueuedSynchronizer這個抽象類,然而這個實現的子類(暫取名叫Sync)應該是作為Mutex的內部類來用的,提供給Mutex當作幫助器來使用。那麼Lock介面,Mutex互斥鎖,AbstractQueuedSynchronizer抽象類和Sync幫助器這四者存在什麼聯絡呢?為了避免你聽糊塗了,下面我整理他們的UML類圖如下。
由上圖可知:Mutex互斥鎖繼承了Lock鎖的介面,具有鎖的屬性,可以提供上鎖和釋放鎖的方法,他是對外提供服務的服務者,而Mutex類有個Sync型別的私有物件sync,這個私有物件繼承了AbstractQueuedSynchronizer抽象類,是Mutex鎖和AQS的橋樑,是加鎖和釋放鎖真正的服務者。如果你看明白了上面的UML類圖,那麼我們的Mutex互斥鎖的定義應該如下:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
private Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// TODO Auto-generated method stub
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
// TODO Auto-generated method stub
return super.tryRelease(arg);
}
}
@Override
public void lock() {
// TODO Auto-generated method stub
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public void unlock() {
// TODO Auto-generated method stub
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
這裡我們實現的是獨佔式的鎖,Sync幫助器只需要覆蓋父類的tryAcquire(),tryRelease()方法就行了,其他方法可以暫時刪掉,如共享式的tryAcquireShared(),tryReleaseShared(),已經Condition用到的isHeldExclusively()和toString()方法都可以暫時不用實現,因為我們只是想先用AQS來做一把可以保證資料安全的鎖,考慮的問題暫時沒有那麼多。
/**
* 互斥鎖
* @author 張仕宗
* @date 2018.11.9
*/
public class Mutex implements Lock{
//AQS子類的物件,Mutex互斥鎖用它來工作
private Sync sync = new Sync();
//Sync同步器類作為公共內部幫助器,可用它來實現其封閉類的同步屬性
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1; //這裡用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等於1,則直接中斷
if(this.compareAndSetState(0, 1)) { //這裡做一下判斷,如果state的值為等於0,立馬將state設定為1
//返回true,告訴acqure方法,獲取鎖成功
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//釋放鎖,由於這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
//1.直接將state置為0
this.setState(0);
//返回true,告訴aqs的release方法釋放鎖成功
return true;
}
}
/**
* 上鎖的方法
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 釋放鎖的方法
*/
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
上訴程式碼實現了一把最簡單的鎖,我們只實現其lock()和unlock()方法,其他方法請暫時忽略,而lock()方法和unlock()方法是如何實現的呢?lock()方法呼叫了Sync幫助器物件的sync.acquire(1)方法,由於我們的幫助器Sync並沒有實現這個方法,所以實際呼叫的是AQS的acquire()方法,而AQS這時候做了什麼時呢?再來一次該方法的原始碼:
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
次方法乾的第一件事就是去呼叫tryAcquire()方法,這個方法需要Sync來實現,如果自己的Sync沒有實現這個方法的話,父類會直接丟擲UnsupportedOperationException這個異常。
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1; //這裡用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等於1,則直接中斷
if(this.compareAndSetState(0, 1)) { //這裡做一下判斷,如果state的值為等於0,立馬將state設定為1
//返回true,告訴acqure方法,獲取鎖成功
return true;
}
return false;
}
由於這是一把互斥鎖,所以只能有同一時刻只能獲得一次鎖。程式碼中用到了assert斷言,如果預獲得鎖的次數不是1,則中斷。接下來if中判斷state狀態是否為0,如果state狀態為0,則說明鎖還沒有被佔用,那麼我立刻佔用這把鎖,判斷state當前值和設定state為1這兩步用原子性操作的程式碼語句是this.compareAndSetState(0, 1),並立馬放回true,這時候AQS獲得返回值,獲得鎖成功。如果是第二個執行緒進來,if語句判斷得到的值非0,則直接返回false,這時候AQS將新進來的執行緒放進FIFO佇列排隊。
接下來看看Mutex的unlock()方法,該方法呼叫了sync.release(1),看看AQS這時候做了什麼!
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
此方法是獨佔模式下執行緒釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待佇列裡的其他執行緒來獲取資源。同樣的,我們的同步器Sync需要去實現這個tryRelease方法,不然同樣會丟擲UnsupportedOperationException異常。Sync的tryRelease方法比較簡單:
@Override
protected boolean tryRelease(int arg) {
//釋放鎖,由於這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
//1.直接將state置為0
this.setState(0);
//返回true,告訴aqs的release方法釋放鎖成功
return true;
}
只需要設定state為0即可,由於這是一把互斥鎖,state不是0就是1所以直接呼叫this.setSate(0)。
用AQS寫一把重入鎖
上訴的Mutex並非一把可重入鎖,為了實現這把鎖能夠讓同一執行緒多次進來,回憶一下上一篇部落格中怎麼實現的?當時的做法是在鎖的lock()自旋方法中判斷新進來的是不是正在執行的執行緒,如果新進來的執行緒就是正在執行的執行緒,則獲取鎖成功,並讓計數器+1。而在釋放鎖的時候,如果釋放鎖的執行緒等於當前執行緒,讓計數器-1,只有當計數器count歸零的時候才真正的釋放鎖。同樣的,用AQS實現的鎖也是這個思路,那麼我們的tryAcquice方法如下:
@Override
protected boolean tryAcquire(int arg) {
//如果第一個執行緒進來,直接獲得鎖,並設定當前獨佔的執行緒為當前執行緒
int state = this.getState();
if(state == 0) { //state為0,說明當前沒有執行緒佔用該執行緒
if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個執行緒進來,立刻設定state為arg
this.setExclusiveOwnerThread(Thread.currentThread()); //設定當前獨佔執行緒為當前執行緒
return true; //告訴頂級aqs獲取鎖成功
}
} else { //如果是第二個執行緒進來
Thread currentThread = Thread.currentThread();//當前進來的執行緒
Thread ownerThread = this.getExclusiveOwnerThread();//已經儲存進去的獨佔式執行緒
if(currentThread == ownerThread) { //判斷一下進來的執行緒和儲存進去的執行緒是同一執行緒麼?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
this.setState(state+arg); //設定state狀態
return true;
}
}
return false;
}
tryAcquice()方法程式碼含義如註釋所示,與Mutex互斥鎖不同的是當state狀態不為0時我們的邏輯處理,如果第二次進來的執行緒currentThread和正在獨佔的執行緒ownerThread為統一執行緒,第一步設定state增加1,第二步返回true給AQS。
tryRelease()方法程式碼如下:
@Override
protected boolean tryRelease(int arg) {
//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
//如果釋放鎖的不是當前執行緒,則丟擲異常
throw new RuntimeException();
}
int state = this.getState()-arg;
//接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖
if(state == 0) {
//state已經歸零,做掃尾工作
this.setState(0);
this.setExclusiveOwnerThread(null);
return true;
}
this.setState(state);
return false;
}
tryRelease()首先是獲取當前state的值,並對這個值進行欲判:如果當前值state減去sync.release()傳來的引數歸零,則真正的釋放鎖,那麼我們要做的第一步是設定state為0,接著設定當前獨佔的執行緒為null,再然後返回true告訴AQS釋放鎖成功。如果如果當前值state減去sync.release()傳來的引數歸零,如果讓state的值為state-arg相減之後的值。
目前為此,我們以來了AQS框架來改寫的重入鎖程式碼如下:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 用AQS實現的重入鎖
* @author 張仕宗
* @date 2018.11.9
*/
public class MyAqsLock implements Lock{
//AQS子類的物件,用它來輔助MyAqsLock工作
private Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
//如果第一個執行緒進來,直接獲得鎖,並設定當前獨佔的執行緒為當前執行緒
int state = this.getState();
if(state == 0) { //state為0,說明當前沒有執行緒佔用該執行緒
if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個執行緒進來,立刻設定state為arg
this.setExclusiveOwnerThread(Thread.currentThread()); //設定當前獨佔執行緒為當前執行緒
return true; //告訴頂級aqs獲取鎖成功
}
} else { //如果是第二個執行緒進來
Thread currentThread = Thread.currentThread();//當前進來的執行緒
Thread ownerThread = this.getExclusiveOwnerThread();//已經儲存進去的獨佔式執行緒
if(currentThread == ownerThread) { //判斷一下進來的執行緒和儲存進去的執行緒是同一執行緒麼?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
this.setState(state+arg); //設定state狀態
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
//如果釋放鎖的不是當前執行緒,則丟擲異常
throw new RuntimeException();
}
int state = this.getState()-arg;
//接下來判斷state是否已經歸零,只有state歸零的時候才真正的釋放鎖
if(state == 0) {
//state已經歸零,做掃尾工作
this.setState(0);
this.setExclusiveOwnerThread(null);
return true;
}
this.setState(state);
return false;
}
public Condition newCondition() {
return new ConditionObject();
}
}
/**
* 上鎖的方法
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 釋放鎖的方法
*/
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
//呼叫幫助器的tryAcquire方法,測試獲取鎖一次,不會自旋
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//呼叫幫助器的tryRelease方法,測試釋放鎖一次,不會子旋
return sync.tryRelease(1);
}
@Override
public Condition newCondition() {
//呼叫幫助類獲取Condition物件
return sync.newCondition();
}
}