圖解AQS原理之ReentrantLock詳解-非公平鎖
概述
併發程式設計中,ReentrantLock
的使用是比較多的,包括之前講的LinkedBlockingQueue
和ArrayBlockQueue
的內部都是使用的ReentrantLock
,談到它又不能的不說AQS,AQS的全稱是AbstractQueuedSynchronizer
,這個類也是在java.util.concurrent.locks
下面,提供了一個FIFO的佇列,可以用於構建鎖的基礎框架,內部通過原子變數state
來表示鎖的狀態,當state
大於0的時候表示鎖被佔用,如果state等於0時表示沒有佔用鎖,ReentrantLock
是一個重入鎖,表現在state
上,如果持有鎖的執行緒重複獲取鎖時,它會將state
本文分析內容主要是針對jdk1.8版本
約束:文中圖片的ref-xxx代表引用地址
圖片中的內容prve更正為prev,由於文章不是一天寫的所以有些圖片更正了有些沒有。
AQS主要欄位
/** * 頭節點指標,通過setHead進行修改 */ private transient volatile Node head; /** * 佇列的尾指標 */ private transient volatile Node tail; /** * 同步器狀態 */ private volatile int state;
AQS需要子類實現的方法
AQS是提供了併發的框架,它內部提供一種機制,它是基於模板方法的實現,整個類中沒有任何一個abstract的抽象方法,取而代之的是,需要子類去實現的那些方法通過一個方法體丟擲UnsupportedOperationException異常來讓子類知道,告知如果沒有實現模板的方法,則直接丟擲異常。
方法名 | 方法描述 |
---|---|
tryAcquire | 以獨佔模式嘗試獲取鎖,獨佔模式下呼叫acquire,嘗試去設定state的值,如果設定成功則返回,如果設定失敗則將當前執行緒加入到等待佇列,直到其他執行緒喚醒 |
tryRelease | 嘗試獨佔模式下釋放狀態 |
tryAcquireShared | 嘗試在共享模式獲得鎖,共享模式下呼叫acquire,嘗試去設定state的值,如果設定成功則返回,如果設定失敗則將當前執行緒加入到等待佇列,直到其他執行緒喚醒 |
tryReleaseShared | 嘗試共享模式下釋放狀態 |
isHeldExclusively | 是否是獨佔模式,表示是否被當前執行緒佔用 |
AQS是基於FIFO佇列實現的,那麼佇列的Node節點又是存放的什麼呢?
Node欄位資訊
欄位名 | 型別 | 預設值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 一個標識,指示節點使用共享模式等待 |
EXCLUSIVE | Nodel | Null | 一個標識,指示節點使用獨佔模式等待 |
CANCELLED |
int | 1 | 節點因超時或被中斷而取消時設定狀態為取消狀態 |
SIGNAL |
int | -1 | 當前節點的後節點被park,當前節點釋放時,必須呼叫unpark通知後面節點,當後面節點競爭時,會將前面節點更新為SIGNAL |
CONDITION |
int | -2 | 標識當前節點已經處於等待中,通過條件進行等待的狀態 |
PROPAGATE |
int | -3 | 共享模式下釋放節點時設定的狀態,被標記為當前狀態是表示無限傳播下去 |
0 |
int | 不屬於上面的任何一種狀態 | |
waitStatus | int | 0 | 等待狀態,預設初始化為0,表示正常同步等待, |
pre | Node | Null | 佇列中上一個節點 |
next | Node | Null | 佇列中下一個節點 |
thread | Thread | Null | 當前Node操作的執行緒 |
nextWaiter | Node | Null | 指向下一個處於阻塞的節點 |
通過上面的內容我們可以看到waitStatus其實是有5個狀態的,雖然這裡面0並不是什麼欄位,但是他是waitStatus狀態的一種,表示不是任何一種型別的欄位,上面也講解了關於AQS中子類實現的方法,AQS提供了獨佔模式和共享模式兩種,但是ReentrantLock
實現的是獨佔模式的方式,下面來通過原始碼的方式解析ReentrantLock
。
ReentrantLock原始碼分析
首先在原始碼分析之前我們先來看一下ReentrantLock的類的繼承關係,如下圖所示:
可以看到ReentrantLock
繼承自Lock
介面,它提供了一些獲取鎖和釋放鎖的方法,以及條件判斷的獲取的方法,通過實現它來進行鎖的控制,它是顯示鎖,需要顯示指定起始位置和終止位置,Lock
介面的方法介紹:
方法名稱 | 方法描述 |
---|---|
lock | 用來獲取鎖,如果鎖已被其他執行緒獲取,則進行等待。 |
tryLock | 表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他執行緒獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待 |
tryLock(long time, TimeUnit unit) | 和tryLock()類似,區別在於它在拿不到鎖時會等待一定的時間,在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true |
lockInterruptibly | 獲取鎖,如果獲取鎖失敗則進行等到,如果等待的執行緒被中斷會相應中斷資訊。 |
unlock | 釋放鎖的操作 |
newCondition | 獲取Condition物件,該元件和當前的鎖繫結,當前執行緒只有獲得了鎖,才能呼叫該元件wait()方法,而呼叫後,當前執行緒釋放鎖。 |
ReentrantLock也實現了上面介面的內容,前面講解了很多理論行的內容,接下來我們以一個簡單的例子來進行探討
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(1000);
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
- 首先宣告內部類AddDemo,AddDemo的主要作用是將原子變數count進行遞增的操作
- AddDemo內部聲明瞭ReentrantLock物件進行同步操作
- AddDemo的add方法,進行遞增操作,細心地同學發現,使用了lock方法獲取鎖,但是沒有釋放鎖,這裡面沒有釋放鎖可以更讓我們清晰的分析內部結構的變化。
- 主執行緒開啟了兩個執行緒進行同步進行遞增的操作,最後讓執行緒休眠一會輸出累加的最後結果。
ReentrantLock
內部提供了兩種AQS的實現,一種公平模式,一種是非公平模式,如果沒有特別指定在構造器中,預設是非公平的模式,我們可以看一下無參的建構函式。
public ReentrantLock() {
sync = new NonfairSync();
}
當呼叫有參建構函式時,指定使用哪種模式來進行操作,引數為布林型別,如果指定為false的話代表非公平模式,如果指定為true的話代表的是公平模式,如下所示:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我們使用的是非公平模式,後面再來進行分析公平模式,上面也講到了分為兩種模式,這兩種模式為FairSync
和NonfairSync
兩個內部靜態類不可變類,不能被繼承和例項化,這兩個類是我們今天分析的重點,為什麼說是重點呢,這裡講的內容是有關於AQS的,而FairSync
和NonfairSync
實現了抽象內部類Sync
,Sync
實現了AbstractQueuedSynchronizer
這個類,這個類就是我們說的AQS也是主要同步操作的類,下面我們來看一下公平模式和非公平模式下類的繼承關係,如下圖所示:
非公平模式:
公平模式:
通過上面兩個繼承關係UML來看其實無差別,差別在於內部實現的原理不一樣,回到上面例子中使用的是非公平模式,那先以非公平模式來進行分析,
假設第一個執行緒啟動呼叫AddDemo的add方法時,首先執行的事reentrantLock.lock()
方法,這個lock方法呼叫了sync.lock()
,sync就是我們上面提到的兩種模式的物件,來看一下原始碼內容:
public void lock() {
sync.lock();
}
內部呼叫了sync.lock()
,其實是呼叫了NonfairSync
物件的lock
方法,也就是下面的方法內容。
/**
* 非公平模式鎖
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 執行鎖動作,先進行修改狀態,如果鎖被佔用則進行請求申請鎖,申請鎖失敗則將執行緒放到佇列中
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 繼承自AQS的tryAcquire方法,嘗試獲取鎖操作,這個方法會被AQS的acquire呼叫
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
我們看到lock
方法首先先對state
狀態進行修改操作,如果鎖沒有被佔用則獲取鎖,並設定當前執行緒獨佔鎖資源,如果嘗試獲取鎖失敗了,則進行acqurie
方法的呼叫,例子中第一個執行緒當嘗試獲取鎖是內部state
狀態為0
,進行修改操作的時候,發現鎖並沒有被佔用,則獲得鎖,此時我們來看一下內部變化的情況,如下圖所示:
此時只是將state
的狀態更新為1
,表示鎖已經被佔用了,獨佔鎖資源的執行緒是Thread0
,也就是exclusiveOwnerThread
的內容,頭節點和尾節點都沒有被初始化,當第二個執行緒嘗試去獲取鎖的時候,發現鎖已經被佔用了,因為上一個執行緒並沒有釋放鎖,所以第二執行緒直接獲取鎖時獲取失敗則進入到acquire
方法中,這個方法是AbstractQueuedSynchronizer
中的方法acquire
,先來看一下具體的實現原始碼如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我個人理解acquire
方法不間斷的嘗試獲取鎖,如果鎖沒有獲取到則現將節點加入到佇列中,並將當前執行緒設定為獨佔鎖資源,也就是獨佔了鎖的意思,別的執行緒不能擁有鎖,然後如果當前節點的前節點是頭節點話,再去嘗試爭搶鎖,則設定當前節點為頭節點,並將原頭節點的下一個節點設定為null,幫助GC回收它,如果不是頭節點或爭搶鎖不成功,則會現將前面節點的狀態設定直到設定為SIGNAL
為止,代表下面有節點被等待了等待上一個執行緒發來的訊號,然後就掛起當前執行緒。
我們接下來慢慢一步一步的分析,我們先來看一下NonfairSync
中的tryAcquire
,如下所示:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
它呼叫的是他的父類方法,也就是ReentrantLock
下Sync
中的nonfairTryAcquire
方法,這個方法主要就是去申請鎖的操作,來看一下具體原始碼:
final boolean nonfairTryAcquire(int acquires) { //首先是一個被final修飾的方法
final Thread current = Thread.currentThread(); //獲取當前執行緒
int c = getState(); //獲取state的狀態值
if (c == 0) { //如果狀態等於0代表執行緒沒有被佔用
if (compareAndSetState(0, acquires)) { //cas修改state值
setExclusiveOwnerThread(current); //設定當前執行緒為獨佔模式
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果state狀態不等於0則先判斷是否是當前執行緒佔用鎖,如果是則進行下面的流程。
int nextc = c + acquires; //這個地方就說明重入鎖的原理,如果擁有鎖的是當前執行緒,則每次獲取鎖state值都會跟隨遞增
if (nextc < 0) // overflow //溢位了
throw new Error("Maximum lock count exceeded");
setState(nextc); //直接設定state值就可以不需要CAS
return true;
}
return false; //都不是就返回false
}
通過原始碼我們可以看到其實他是有三種操作邏輯:
- 如果
state
為0,則代表鎖沒有被佔用,嘗試去修改state狀態,並且將當前執行緒設定為獨佔鎖資源,表示獲得鎖成功 - 如果
state
大於0並且擁有鎖的執行緒和當前申請鎖的執行緒一致,則代表重入了鎖,state
值會進行遞增,表示獲得鎖成功 - 如果
state
大於0並且擁有鎖的執行緒和當前申請鎖的執行緒不一致則直接返回false,代表申請鎖失敗
當第二個執行緒去爭搶鎖的時候,state值已經設定為1了也就是已經被第一個執行緒佔用了鎖,所以這裡它會返回false,而通過acquire
方法內容可以看到if語句中是!tryAcquire(arg)
,也就是!false=ture
,它會進行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法,這個方法裡面又有一個addWaiter
方法,從方法語義上能看到是新增等待佇列的操作,方法的引數代表的是模式,Node.EXCLUSIVE
表示的是在獨佔模式下等待,我們先來看一下addWaiter
裡面是如何進行操作,如下所示:
private Node addWaiter(Node mode) {
//首先生成當前執行緒擁有的節點
Node node = new Node(Thread.currentThread(), mode);
// 下面的內容是嘗試快速進行插入末尾的操作,在沒有其他執行緒同時操作的情況
Node pred = tail; //獲取尾節點
if (pred != null) { //尾節點不為空,代表隊列不為空
node.prev = pred; //尾節點設定為當前節點的前節點
if (compareAndSetTail(pred, node)) {//修改尾節點為當前節點
pred.next = node; //原尾節點的下一個節點設定為當前節點
return node; //返回node節點
}
}
enq(node); //如果前面入隊失敗,這裡進行迴圈入隊操作,直到入隊成功
return node;
}
前面程式碼中可以看到,它有一個快速入隊的操作,如果快速入隊失敗則進行死迴圈進行入隊操作,當然我們上面例子中發現佇列其實是為空的,也就是pred==null,不能進行快速入隊操作,則進入到enq
進行入隊操作,下面看一下enq
方法實現,如下所示:
private Node enq(final Node node) {
for (;;) { //死迴圈進行入隊操作,直到入隊成功
Node t = tail; //獲取尾節點
if (t == null) { // Must initialize //判斷尾節點為空,則必須先進行初始化
if (compareAndSetHead(new Node()))//生成一個Node,並將當前Node作為頭節點
tail = head; //head和tail同時指向上面Node節點
} else {
node.prev = t; //設定入隊的當前節點的前節點設定為尾節點
if (compareAndSetTail(t, node)) { //將當前節點設定為尾節點
t.next = node; //修改原有尾節點的下一個節點為當前節點
return t; //返回最新的節點
}
}
}
}
通過上面入隊操作,可以清晰的瞭解入隊操作其實就是Node節點的prev節點和next節點之前的引用,執行到這裡我們應該能看到入隊的狀態了,如下圖所示:
如上圖可以清晰的看到,此時擁有鎖的執行緒是Thread0,而當前執行緒是Threa1,頭節點為初始化的節點,Ref-707
引用地址所在的Node節點操作當前操作的節點資訊,入隊操作後並沒有完成,而是繼續往下進行,此時則進行acquireQueued
這個方法,這個方法是不間斷的去獲取已經入隊佇列中的前節點的狀態,如果前節點的狀態為大於0,則代表當前節點被取消了,會一直往前面的節點進行查詢,如果節點狀態小於0並且不等於SIGNAL
則將其設定為SIGNAL
狀態,設定成功後將當前執行緒掛起,掛起執行緒後也有可能會反覆喚醒掛起操作,原因後面會講到。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消節點標誌位
try {
boolean interrupted = false; //中斷標誌位
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //這裡的邏輯是如果前節點為頭結點並且獲取到鎖則進行頭結點變換
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //設定waitStatus狀態
parkAndCheckInterrupt()) //掛起執行緒
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
前面的原始碼可以看到它在acquireQueued
中對已經入隊的節點進行嘗試鎖的獲取,如果鎖獲得就修改頭節點的指標,如果不是頭節點或者爭搶鎖失敗時,此時會進入到shouldParkAfterFailedAcquire
方法,這個方法是獲取不到鎖時需要停止繼續無限期等待鎖,其實就是內部的操作邏輯也很簡單,就是如果前節點狀態為0
時,需要將前節點修改為SIGNAL
,如果前節點大於0
則代表前節點已經被取消了,應該移除佇列,並將前前節點作為當前節點的前節點,一直迴圈直到前節點狀態修改為SIGNAL
或者前節點被釋放鎖,當前節點獲取到鎖停止迴圈。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 此節點已經設定了狀態,要求對當前節點進行掛起操作
*/
return true;
if (ws > 0) {
/*
* 如果前節點被取消,則將取消節點移除佇列操作
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus=0或者PROPAGATE時,表示當前節點還沒有被掛起停止,需要等待訊號來通知節點停止操作。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面的方法其實很容易理解就是等待掛起訊號,如果前節點的狀態為0或PROPAGATE則將前節點修改為SIGNAL
,則代表後面前節點釋放鎖後會通知下一個節點,也就是說喚醒下一個可以喚醒的節點繼續爭搶所資源,如果前節點被取消了那就繼續往前尋找不是被取消的節點,這裡不會找到前節點為null的情況,因為它預設會有一個空的頭結點,也就是上圖內容,此時的佇列狀態是如何的我們看一下,這裡它會進來兩次,以為我們上圖可以看到當前節點前節點是Ref-724
此時waitStatus=0
,他需要先將狀態更改為SIGNAL
也就是執行最有一個else語句,此時又會回到外面的for迴圈中,由於方法返回的是false則不會執行parkAndCheckInterrupt
方法,而是又迴圈了一次,此時發現當前節點爭搶鎖又失敗了,然後此時佇列的狀態如下圖所示:
再次進入到方法之後發現前驅節點的waitStatus=-1,表示當前節點需要進行掛起等到,此時返回的結果是true,則會執行parkAndCheckInterrupt
方法,這個方法很簡單就是將當前執行緒進行掛起操作,如下所示:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //掛起執行緒
return Thread.interrupted(); //判斷是否被中斷,獲取中斷標識
}
park
掛起執行緒並且響應中斷資訊,其實我們從這裡就能發現一個問題,Thread.interrupted方法是用來獲取是否被中斷的標誌,如果被中斷則返回true,如果沒有被中斷則返回false,噹噹前節點被中斷後,其實就會返回true,返回true這裡並沒有結束,而是跳到呼叫地方,也就是acquireQueued
方法內部:
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
以一個案例來進行分析:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lock();
count.getAndIncrement();
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
通過上面的例子可以發現,thread1呼叫中斷方法interrupt(),當呼叫第一次方法的時候,它會進入到parkAndCheckInterrupt
方法,然後執行緒響應中斷,最後返回true,最後返回到acquireQueued
方法內部,整個if語句為true,則開始設定interrupted=true,僅僅是設定了等於true,但是這離還會進入下一輪的迴圈,假如說上次的執行緒沒有完成任務,則沒有獲取到鎖,還是會進入到shouldParkAfterFailedAcquire
由於已經修改了上一個節點的waitStatus=-1,直接返回true,然後再進入到parkAndCheckInterrupt
又被掛起執行緒,但是如果上步驟操作他正搶到鎖,則會返回ture,外面也會清除中斷標誌位,從這裡可以清楚地看到acquire
方法是一個不間斷獲得鎖的操作,可能重複阻塞和解除阻塞操作。
上面阻塞佇列的內容已經講完了,接下來我們看一下unlock都為我們做了什麼工作:
public void unlock() {
sync.release(1);
}
我們可以看到他直接呼叫了獨佔模式的release
方法,看一下具體原始碼:
public final boolean release(int arg) {
if (tryRelease(arg)) { //呼叫ReentrantLock中的Sync裡面的tryRelease方法
Node h = head; //獲取頭節點
if (h != null && h.waitStatus != 0) //頭節點不為空且狀態不為0時進行unpark方法
unparkSuccessor(h); //喚醒下一個未被取消的節點
return true;
}
return false;
}
release方法,首先先進行嘗試去釋放鎖,如果釋放鎖仍然被佔用則直接返回false,如果嘗試釋放鎖時,發現鎖已經釋放,當前執行緒不在佔用鎖資源時,則會進入的下面進行一些列操作後返回true,接下來我們先來看一下ReentrantLock
的Sync
下的tryRelease
方法,如下所示:
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //獲取state狀態,標誌資訊減少1
if (Thread.currentThread() != getExclusiveOwnerThread()) //執行緒不一致丟擲異常
throw new IllegalMonitorStateException();
boolean free = false; //是否已經釋放鎖
if (c == 0) { //state=0時表示鎖已經釋放
free = true; //將標誌free設定為true
setExclusiveOwnerThread(null); //取消獨佔鎖資訊
}
setState(c); //設定鎖標誌資訊
return free;
}
看上面的原始碼,表示首先先獲取state
狀態,如果state
狀態減少1之後和0不相等則代表有重入鎖,則表示當前執行緒還在佔用所資源,直到執行緒釋放鎖返回ture標識,還是以上例子為主(此時AddDemo
中的unlock
不在被註釋),分析其現在的佇列中的狀態
釋放鎖後,進入到if語句中,判斷當前頭節點不為空且waitStatus!=0
,通過上圖也可以發現頭節點為-1,則進入到unparkSuccessor
方法內:
private void unparkSuccessor(Node node) {
/*
* 獲取節點的waitStatus狀態
*/
int ws = node.waitStatus;
// 如果小於0則設定為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 喚醒下一個節點,喚醒下一個節點之前需要判斷節點是否存在或已經被取消了節點,如果沒有節點則不需喚醒操作,如果下一個節點被取消了則一直一個沒有被取消的節點。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
可以看到它是現將頭節點的狀態更新為0,然後再喚醒下一個節點,如果下一個節點為空則直接返回不喚醒任何節點,如果下一個節點被取消了,那麼它會從尾節點往前進行遍歷,遍歷與頭節點最近的沒有被取消的節點進行喚醒操作,在喚醒前看一下佇列狀態:
然後喚醒節點後他會進入到parkAndCheckInterrupt
方法裡面,再次去執行下面的方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //取消節點標誌位
try {
boolean interrupted = false; //中斷標誌位
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //這裡的邏輯是如果前節點為頭結點並且獲取到鎖則進行頭結點變換
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //設定waitStatus狀態
parkAndCheckInterrupt()) //掛起執行緒
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); //取消操作
}
}
此時獲取p==head成立,並且可以正搶到所資源,所以它會進入到迴圈體內,進行設定頭結點為當前節點,前節點的下一個節點設定為null,返回中斷標誌,看一下此時佇列情況,如下圖所示:
AbstractQueuedSynchronizer
的獨佔模式其實提供了三種不同的形式進行獲取鎖操作,看一下下表所示:
方法名稱 | 方法描述 | 對應呼叫的內部方法 |
---|---|---|
acquire | 以獨佔模式進行不間斷的獲取鎖 | tryAcquire,acquireQueued |
acquireInterruptibly | 以獨佔模式相應中斷的方式獲取鎖,發生中斷丟擲異常 | tryAcquire,doAcquireInterruptibly |
tryAcquireNanos | 以獨佔模式相應中斷的方式並且在指定時間內獲取鎖,會阻塞一段時間,如果還未獲得鎖直接返回,發生中斷丟擲異常 | tryAcquire,doAcquireNanos |
通過上面圖可以發現,他都會呼叫圖表一中需要使用者實現的方法,ReentrantLock
實現了獨佔模式則內部實現的是tryAcquire
和tryRelease
方法,用來嘗試獲取鎖和嘗試釋放鎖的操作,其實上面內容我們用的是ReentrantLock
中的lock
方法作為同步器,細心的朋友會發現,這個lock
,方法是ReentrantLock實現的,它內部呼叫了acquire
方法,實現了不間斷的獲取鎖機制,ReentrantLock
中還有一個lockInterruptibly
方法,它內部直接呼叫的是AbstractQueuedSynchronizer
的acquireInterruptibly
方法,兩個之間的區別在於,兩者都會相應中斷資訊,前者不會做任何處理還會進入等待狀態,而後者則丟擲異常終止操作,
這裡為了詳細看清楚它內部關係我這裡用張圖來進行闡述,如下所示:
- 左側代表的事ReentrantLock,右側代表的AQS
- 左側內部黃色區域代表
NonfairSync
- 圖中1和2代表AQS呼叫其他方法的過程
接下來我們來看一下原始碼資訊:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
發現他呼叫的Sync
類中的acquireInterruptibly
方法,但其實這個方法是AQS中的方法,原始碼如下所示:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //判斷執行緒是否被中斷
throw new InterruptedException(); //中斷則丟擲異常
if (!tryAcquire(arg)) //嘗試獲取鎖
doAcquireInterruptibly(arg); //進行新增佇列,並且修改前置節點狀態,且響應中斷丟擲異常
}
通過上面的原始碼,它也呼叫了子類實現的tryAcquire
方法,這個方法和我們上文提到的tryAcquire
是一樣,ReentrantLock
下的NonfairSync
下的tryAcquire
方法,這裡這個方法就不多說了詳細請看上文內容,這裡主要講一下doAcquireInterruptibly
這個方法:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE); //將節點新增到佇列尾部
boolean failed = true; //失敗匹配機制
try {
for (;;) {
final Node p = node.predecessor(); //獲取前節點
if (p == head && tryAcquire(arg)) { //如果前節點為頭節點並且獲得了鎖
setHead(node); //設定當前節點為頭節點
p.next = null; // help GC //頭節點的下一個節點設定為null
failed = false; //匹配失敗變為false
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //將前節點設定為-1,如果前節點為取消節點則往前一直尋找直到修改為-1為止。
parkAndCheckInterrupt()) //掛起執行緒返回是否中斷
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
其實這個方法和acquireQueued
區別在於以下幾點:
acquireQueued
是在方法內部新增節點到佇列尾部,而doAcquireInterruptibly
是在方法內部進行新增節點到尾部,這個區別點並不是很重要- 重點是
acquireQueued
響應中斷,但是他不會丟擲異常,而後者會丟擲異常throw new InterruptedException()
分析到這裡我們來用前面的例子來進行模擬一下中中斷的操作,詳細程式碼如下所示:
public class ReentrantLockDemo {
public static void main(String[] args) throws Exception {
AddDemo runnalbeDemo = new AddDemo();
Thread thread = new Thread(runnalbeDemo::add);
thread.start();
Thread.sleep(500);
Thread thread1 = new Thread(runnalbeDemo::add);
thread1.start();
Thread.sleep(500);
Thread thread2 = new Thread(runnalbeDemo::add);
thread2.start();
Thread.sleep(500);
Thread thread3 = new Thread(runnalbeDemo::add);
thread3.start();
Thread.sleep(10000);
thread1.interrupt();
System.out.println(runnalbeDemo.getCount());
}
private static class AddDemo {
private final AtomicInteger count = new AtomicInteger();
private final ReentrantLock reentrantLock = new ReentrantLock();
private final Condition condition = reentrantLock.newCondition();
private void add() {
try {
reentrantLock.lockInterruptibly();
count.getAndIncrement();
} catch (Exception ex) {
System.out.println("執行緒被中斷了");
} finally {
// reentrantLock.unlock();
}
}
int getCount() {
return count.get();
}
}
}
上面的例子其實和前面提到的例子沒有什麼太大的差別主要的差別是將lock
替換為lockInterruptibly
,其次就是在三個執行緒後面講執行緒1進行中斷操作,這裡入隊的操作不在多說,因為操作內容和上面大致相同,下面是四個個執行緒操作完成的狀態資訊:
如果執行緒等待的過程中丟擲異常,則當前執行緒進入到finally中的時候failed為true,因為修改該欄位只有獲取到鎖的時候才會修改為false,進來之後它會執行cancelAcquire
來進行取消當前節點,下面我們先來分析下原始碼內容:
private void cancelAcquire(Node node) {
// 如果節點為空直接返回,節點不存在直接返回
if (node == null)
return;
// 設定節點所在的執行緒為空,清除執行緒操作
node.thread = null;
// 獲取當前節點的前節點
Node pred = node.prev;
// 如果前節點是取消節點則跳過前節點,一直尋找一個不是取消節點為止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取頭節點下一個節點
Node predNext = pred.next;
// 這裡直接設定為取消節點狀態,沒有使用CAS原因是因為直接設定只有其他執行緒可以跳過取消的節點
node.waitStatus = Node.CANCELLED;
// 如果當前節點為尾節點,並且設定尾節點為找到的合適的前節點時,修改前節點的下一個節點為null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果不是尾節點,則說明是中間節點,則需要通知後續節點,嘿,夥計你被喚醒了。
int ws;
if (pred != head && //前節點不是頭結點
((ws = pred.waitStatus) == Node.SIGNAL || // 前節點的狀態為SIGNAL
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) //或者前節點狀態小於0而且修改前節點狀態為SIGNAL成功
&& pred.thread != null) { //前節點執行緒不為空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//喚醒下一個不是取消的節點
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- 首先找到當前節點的前節點,如果前節點為取消節點則一直往前尋找一個節點。
- 取消的是尾節點,則直接將前節點的下一個節點設定為null
- 如果取消的是頭節點的下一個節點,且不是尾節點的情況時,它是喚醒下一個節點,喚醒之前並沒有將其移除佇列,而是在喚醒下一個節點的時候,
shouldParkAfterFailedAcquire
裡面將取消的節點移除佇列,喚醒之後,當前節點的下一個節點也設定成自己,幫助GC回收它。 - 如果取消節點是中間的節點,則直接將其前節點的下一個節點設定為取消節點的下下個節點即可。
第一種情況如果我們取消的節點是前節點是頭節點,此時執行緒1的節點應該是被中斷操作,此時進入到cancelAcquire
之後會進入else語句中,然後進去到unparkSuccessor
方法,當進入到這個方法之前我們看一下狀態變化:
我們發現執行緒1的Node節點的waitStatus變為1也就是Node.CANCELLED
節點,然後執行unparkSuccessor
方法,該方法上面就已經講述了其中的原始碼,這裡就不在貼原始碼了,就是要喚醒下一個沒有被取消的節點,這裡是Ref-695
這個執行緒,當Ref-695
被喚醒之後它會繼續執行下面的內容:
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //再一次迴圈發現還是沒有爭搶到鎖
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && //再一次迴圈之後有執行到這裡了
parkAndCheckInterrupt()) //這裡被喚醒了,又要進行迴圈操作了
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
發現再一次迴圈操作後,還是沒有正搶到鎖,這時候還是會執行shouldParkAfterFailedAcquire
方法,這個方法內部發現前節點的狀態是Node.CANCELLED
這時候它會在內部先將節點給幹掉,也就是這個程式碼:
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
最後還是會被掛起狀態,因為沒有釋放鎖操作,最後移除的節點如下所示:
如果取消的事尾節點,也就是執行緒3被中斷操作,這個是比較簡單的直接將尾節點刪除即可,其中會走如下程式碼:
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
}
如果取消的節點是中間的節點,通過上例子中則是取消執行緒2,其實它內部只是將執行緒取消執行緒的前節點的下一個節點指向了取消節點的下節點,如下圖所示:
結束語
這章節分析的主要是ReentrantLock
的內部原理,本來公平模式和非公平模式想放在一起來寫,無奈發現篇幅有點長了,所以就分開進行寫,這樣讀取來不會那麼費勁,內部還有條件內容等待下章節分析,如果有分析不到位的請大家指正