JUC的AQS學習-ReentrantLock原始碼分析
什麼是AQS
AQS是JUC裡併發控制一個很重要的基礎,他提供了一個獨佔和共享訪問控制某個狀態的工具,JUC裡的鎖、訊號量、門閥都是基於AQS實現的,單獨去看AQS的程式碼很難理解,而且很難理解其設計之精妙,筆者計劃依次分析鎖(ReentrantLock)、門閥(CountDownLatch)、訊號量(Semaphor)、讀寫鎖(ReadWriteLock)來解析AQS的設計細節。
如果是我怎麼實現
在看ReentrantLock程式碼之前,我們試想一下如果是我們自己去實現要如何做?忽略可重入、公平等鎖的特性,需求具體來說有以下兩點:
1. 多個執行緒同時請求鎖時只有一個執行緒會取得鎖,其他執行緒進入佇列等待
2. 鎖釋放時會通知其他等待佇列中的執行緒去獲取鎖
所以如果我們自己去實現的話翻譯成技術語言需要有以下幾個基礎設施
1. 決定誰獲得鎖的競爭機制,這個很容易想到用CAS去實現
2. 記錄鎖狀態,這個可以用一個變量表示鎖計數,一個變量表示當前獲取鎖的執行緒(不考慮可重入其實非必要)
3. 鎖釋放時通知佇列中第一個執行緒並使其獲得鎖
看起來也不難對不對?讓我們一起來看看道格李是怎麼優雅地實現這些功能的吧
原始碼分析
lock方法
我們先看lock的程式碼,忽略簡單的巢狀呼叫,lock方法實際呼叫的是AQS的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
再往下看tryAcquire方法,這個方法AQS裡沒有實現,直接丟擲了異常,這麼做是避免子類實現所有介面,我們看java.util.concurrent.locks.ReentrantLock.FairSync這個AQS子類的實現
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// c=0 說明沒有其他執行緒佔有鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 佇列中沒有其他執行緒在等待鎖,而且CAS把state設定成入參的值成功,這裡是1(這裡的CAS就是我
// 們前文提的併發競爭機制),則當前執行緒獲取鎖成功並將owner執行緒設定為當前執行緒
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 可重入設定,當前執行緒重複請求鎖成功,只是增加請求鎖的計數
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再看acquire方法如果tryAcquire成功了就直接返回,不用執行後面的程式碼,如果tryAcquire失敗了就執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),我們先看addWaiter方法,這個方法是把當前請求放到佇列中:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 上面這個官方註釋很直白,其實下面的enq方法裡也執行了這段程式碼,但是這裡先直接試一下看能
// 否插入成功
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS把tail設定成當前節點,如果成功的話就說明插入成功,直接返回node,失敗說明有其他執行緒也
// 在嘗試插入而且其他執行緒成功,如果是這樣就繼續執行enq方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
繼續看enq方法:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 最開始head和tail都是空的,需要通過CAS做初始化,如果CAS失敗,則迴圈重新檢查tail
if (compareAndSetHead(new Node()))
tail = head;
} else {
// head和tail不是空的,說明已經完成初始化,和addWaiter方法的上半段一樣,CAS修改
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
將當前請求鎖失敗的節點插入到佇列中之後還執行了acquireQueued方法,因為我們執行插入佇列之後還沒有阻塞當前執行緒呢:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/*
* 如果前置節點是head,說明當前節點是佇列第一個等待的節點,這時去嘗試獲取鎖,如果成功了則
* 獲取鎖成功。這裡有的同學可能沒看懂,不是剛嘗試失敗並插入隊列了嗎,咋又嘗試獲取鎖? 其實這*
* 裡是個迴圈,其他剛被喚醒的執行緒也會執行到這個程式碼
*/
if (p == head && tryAcquire(arg)) {
// 隊首且獲取鎖成功,把當前節點設定成head,下一個節點成了等待佇列的隊首
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
/* shouldParkAfterFailedAcquire方法判斷如果獲取鎖失敗是否需要阻塞,如果需要的話就執行
* parkAndCheckInterrupt方法,如果不需要就繼續迴圈
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面看一下shouldParkAfterFailedAcquire方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取pred前置節點的等待狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
/* 前置節點狀態是signal,那當前節點可以安全阻塞,因為前置節點承諾執行完之後會通知喚醒當前
* 節點
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 前置節點如果已經被取消了,則一直往前遍歷直到前置節點不是取消狀態,與此同時會修改連結串列關係
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 前置節點是0或者propagate狀態,這裡通過CAS把前置節點狀態改成signal
// 這裡不返回true讓當前節點阻塞,而是返回false,目的是讓呼叫者再check一下當前執行緒是否能
// 成功獲取鎖,失敗的話再阻塞,這裡說實話我也不是特別理解這麼做的原因
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
假設前面一步返回true需要阻塞,則會呼叫parkAndCheckInterrupt進行阻塞
private final boolean parkAndCheckInterrupt() {
// 阻塞當前執行緒,監事是當前sync物件
LockSupport.park(this);
// 阻塞返回後,返回當前執行緒是否被中斷
return Thread.interrupted();
}
park方法
public static void park(Object blocker) {
Thread t = Thread.currentThread();
// 設定當前執行緒的監視器blocker
setBlocker(t, blocker);
// 這裡呼叫了native方法到JVM級別的阻塞機制阻塞當前執行緒
UNSAFE.park(false, 0L);
// 阻塞結束後把blocker置空
setBlocker(t, null);
}
至此,一次lock的呼叫就完成了,總結來說:
- 呼叫tryAcquire方法嘗試獲取鎖,獲取成功的話修改state並直接返回true,獲取失敗的話把當前執行緒加到等待佇列中
- 加到等待佇列之後先檢查前置節點狀態是否是signal,如果是的話直接阻塞當前執行緒等待喚醒,如果不是的話判斷是否是cancel狀態,是cancel狀態就往前遍歷並把cancel狀態的節點從佇列中刪除。如果狀態是0或者propagate的話將其修改成signal
- 阻塞被喚醒之後如果是隊首並且嘗試獲取鎖成功就返回true,否則就繼續執行前一步的程式碼進入阻塞
unlock方法
看完了lock方法再來看unlock方法,同樣unlock方法呼叫的就是AQS的release方法
public final boolean release(int arg) {
/*
嘗試釋放鎖如果失敗,直接返回失敗,如果成功並且head的狀態不等於0就喚醒後面等待的節點
*/
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
我們先看tryRelease方法:
protected final boolean tryRelease(int releases) {
// 釋放後c的狀態值
int c = getState() - releases;
// 如果持有鎖的執行緒不是當前執行緒,直接丟擲異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 如果c==0,說明所有持有鎖都釋放完了,其他執行緒可以請求獲取鎖
free = true;
setExclusiveOwnerThread(null);
}
// 這裡只會有一個執行緒執行到這,不存在競爭,因此不需要CAS
setState(c);
return free;
}
再看看unparkSuccessor方法:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
/*
如果狀態小於0,把狀態改成0,0是空的狀態,因為node這個節點的執行緒釋放了鎖後續不需要做任何
操作,不需要這個標誌位,即便CAS修改失敗了也沒關係,其實這裡如果只是對於鎖來說根本不需要CAS,因為這個方法只會被釋放鎖的執行緒訪問,只不過unparkSuccessor這個方法是AQS裡的方法就必須考慮到多個執行緒同時訪問的情況(可能共享鎖或者訊號量這種)
*/
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
// 這段程式碼的作用是如果下一個節點為空或者下一個節點的狀態>0(目前大於0就是取消狀態)
// 則從tail節點開始遍歷找到離當前節點最近的且waitStatus<=0(即非取消狀態)的節點並喚醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
至此,一次unlock的呼叫完成了,總結來說:
1. 修改狀態位
2. 喚醒排隊的節點
3. 結合lock方法,被喚醒的節點會自動替換當前節點成為head
總結
總的來說,用AQS來實現ReentrantLock還是比較簡單,因為互斥地訪問,不會存在太多併發訪問某個方法的場景,只需要處理好請求鎖競爭和釋放鎖的過程就可以了。後面筆者會繼續分析較為複雜的Semaphore訊號量、CountDownLatch、ReadWriteLock