AbstractQueuedSynchronizer(1)原始碼解析(jdk1.8)
AbstractQueuedSynchronizer原始碼解析(簡稱AQS)
1.執行流程簡介
- 第一個執行緒呼叫 reentrantLock.lock(),翻到最前面可以發現,tryAcquire(1) 直接就返回 true了,結束。只是設定了 state=1(連 head 都沒有初始化,更談不上什麼阻塞佇列了)要是執行緒 1 呼叫 unlock()了,才有執行緒 2 來,那世界就太太太平了,完全沒有交集嘛,那我還要 AQS 幹嘛。
如果執行緒 1 沒有呼叫 unlock() 之前,執行緒 2 呼叫了 lock(), 想想會發生什麼?
執行緒 2 會初始化 head【new Node()】,同時執行緒 2 也會插入到阻塞佇列(雙向連結串列
)並掛起 (注意看這裡是一個for 死迴圈自旋入列,而且設定 head 和 tail 的部分是不 return 的,只有入隊成功才會跳出迴圈)
- 首先,是執行緒 2 初始化 head 節點,此時 headtail, waitStatus0
- 然後,執行緒 2 入隊同時我們也要看此時節點的 waitStatus,我們知道 head 節點是執行緒 2 初始化的,此時的 waitStatus 沒有設定, java 預設會設定為 0,但是到 shouldParkAfterFailedAcquire 這個方法的時候,執行緒 2 會把前驅節點,也就是 head 的waitStatus設定為-1。那執行緒 2 節點此時的 waitStatus 是多少呢,由於沒有設定,所以是 0
- 同時我們也要看此時節點的 waitStatus,我們知道 head 節點是執行緒 2 初始化的,此時的 waitStatus 沒有設定, java 預設會設定為 0,但是到 shouldParkAfterFailedAcquire 這個方法的時候,執行緒 2 會把前驅節點,也就是 head 的waitStatus設定為-1。那執行緒 2 節點此時的 waitStatus 是多少呢,由於沒有設定,所以是 0;
2.程式碼分析
AQS的結構圖
//Node類可以看成是執行緒
static final class Node
// 頭結點,你直接把它當做 當前持有鎖的執行緒 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾節點,每個新的節點進來,都插入到最後,也就形成了一個隱視的連結串列
private transient volatile Node tail;
// 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被佔用,大於0代表有執行緒持有當前鎖
// 之所以說大於0,而不是等於1,是因為鎖可以重入嘛,每次重入都加上1
private volatile int state;
// 代表當前持有獨佔鎖的執行緒,舉個最重要的使用例子,因為鎖可以重入
// reentrantLock.lock()可以巢狀呼叫多次,所以每次用這個來判斷當前執行緒是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
//繼承自AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread;
等待佇列中每個執行緒被包裝成一個 node,資料結構是連結串列。
thread + waitStatus + pre + next 四個屬性
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
// 標識節點當前在共享模式下
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
// 標識節點當前在獨佔模式下
static final Node EXCLUSIVE = null;
// ======== 下面的幾個int常量是給waitStatus用的 ===========
/** waitStatus value to indicate thread has cancelled */
// 程式碼此執行緒取消了爭搶這個鎖
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 官方的描述是,其表示當前node的後繼節點對應的執行緒需要被喚醒
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 本文不分析condition,所以略過吧,下一篇文章會介紹這個
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
// 同樣的不分析,略過吧
static final int PROPAGATE = -3;
// =====================================================
// 取值為上面的1、-1、-2、-3,或者0(以後會講到)
// 這麼理解,暫時只需要知道如果這個值 大於0 代表此執行緒取消了等待,
// 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驅節點的引用
volatile Node prev;
// 後繼節點的引用
volatile Node next;
// 這個就是執行緒本尊
volatile Thread thread;
}
分析AQS加鎖和解鎖的程式碼
ReentranLock的lock方法和unlock方法最終呼叫的是
AQS加鎖和解鎖方法
ReentranLock的內部類Sync 有兩個實現,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖),我們看 FairSync 部分。
1.FairSync(公平鎖)
下面是函式呼叫流程圖
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 爭鎖
final void lock() {
//該方法為AQS的方法
acquire(1);
}
acquire方法解析
// 我們看到,這個方法,如果tryAcquire(arg) 返回true, 也就結束了。
// 否則,acquireQueued方法會將執行緒壓到佇列中
public final void acquire(int arg) { // 此時 arg == 1
// 首先呼叫tryAcquire(1)一下,名字上就知道,這個只是試一試
// 因為有可能直接就成功了呢,也就不需要進佇列排隊了,
// 對於公平鎖的語義就是:本來就沒人持有鎖,根本沒必要進佇列等待(又是掛起,又是等待被喚醒的)
if (!tryAcquire(arg) &&
// tryAcquire(arg)沒有成功,這個時候需要把當前執行緒掛起,放到阻塞佇列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖
// 返回true:1.沒有執行緒在等待鎖;2.重入鎖,執行緒本來就持有鎖,也就可以理所當然可以直接獲取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此時此刻沒有執行緒持有鎖
if (c == 0) {
// 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來後到,
// 看看有沒有別人在佇列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果沒有執行緒在等待,那就用CAS嘗試一下,成功了就獲取到鎖了,
// 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個執行緒搶先了 =_=
// 因為剛剛還沒人的,我判斷過了(因為status為0)
compareAndSetState(0, acquires)) {
// 到這裡就是獲取到鎖了,標記一下,告訴大家,現在是我佔用了鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 會進入這個else if分支,說明是重入了,需要操作:state=state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果到這裡,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖
// 回到上面一個外層呼叫方法繼續看:
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}
// 假設tryAcquire(arg) 返回false,那麼程式碼將執行:
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
// 這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE)
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
// 此方法的作用是把執行緒包裝成node,同時進入到佇列中
// 引數mode此時是Node.EXCLUSIVE,代表獨佔模式
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 以下幾行程式碼想把當前node加到連結串列的最後面去,也就是進到阻塞佇列的最後
Node pred = tail;
// tail!=null => 佇列不為空(tail==head的時候,其實佇列是空的,不過不管這個吧)
if (pred != null) {
// 設定自己的前驅 為當前的隊尾節點
node.prev = pred;
// 用CAS把自己設定為隊尾, 如果成功後,tail == node了
if (compareAndSetTail(pred, node)) {
// 進到這裡說明設定成功,當前node==tail, 將自己與之前的隊尾相連,
// 上面已經有 node.prev = pred
// 加上下面這句,也就實現了和之前的尾節點雙向連線了
pred.next = node;
// 執行緒入隊了,可以返回了
return node;
}
}
// 仔細看看上面的程式碼,如果會到這裡,
// 說明 pred==null(佇列是空的) 或者 CAS失敗(有執行緒在競爭入隊)
// 讀者一定要跟上思路,如果沒有跟上,建議先不要往下讀了,往回仔細看,否則會浪費時間的
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
// 採用自旋的方式入隊
// 之前說過,到這個方法只有兩種可能:等待佇列為空,或者有執行緒競爭入隊,
// 自旋在這邊的語義是:CAS設定tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 之前說過,佇列為空也會進來這裡
if (t == null) { // Must initialize
// 初始化head節點
// 細心的讀者會知道原來head和tail初始化的時候都是null,反正我不細心
// 還是一步CAS,你懂的,現在可能是很多執行緒同時進來呢
if (compareAndSetHead(new Node()))
// 給後面用:這個時候head節點的waitStatus==0, 看new Node()構造方法就知道了
// 這個時候有了head,但是tail還是null,設定一下,
// 把tail指向head,放心,馬上就有執行緒要來了,到時候tail就要被搶了
// 注意:這裡只是設定了tail=head,這裡可沒return哦,沒有return,沒有return
// 所以,設定完了以後,繼續for迴圈,下次就到下面的else分支了
//相當於初始化head,並將tail設定為head
tail = head;
} else {
// 下面幾行,和上一個方法 addWaiter 是一樣的,
// 只是這個套在無限迴圈裡,反正就是將當前執行緒排到隊尾,有執行緒競爭的話排不上重複排
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 現在,又回到這段程式碼了
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
// 下面這個方法,引數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞佇列
// 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話,
// 意味著上面這段程式碼將進入selfInterrupt(),所以正常情況下,下面應該返回false
// 這個方法非常重要,應該說真正的執行緒掛起,然後被喚醒後去獲取鎖,都在這個方法裡了
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// p == head 說明當前節點雖然進到了阻塞佇列,但是是阻塞佇列的第一個,因為它的前驅是head
// 注意,阻塞佇列不包含head節點,head一般指的是佔有鎖的執行緒,head後面的才稱為阻塞佇列
// 所以當前節點可以去試搶一下鎖
// 這裡我們說一下,為什麼可以去試試:
// 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node,
// enq(node) 方法裡面有提到,head是延時初始化的,而且new Node()的時候沒有設定任何執行緒
// 也就是說,當前的head不屬於任何一個執行緒,所以作為隊頭,可以去試一試,
// tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下state
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到這裡,說明上面的if分支沒有成功,要麼當前node本來就不是隊頭,
// 要麼就是tryAcquire(arg)沒有搶贏別人,繼續往下看
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
// 剛剛說過,會到這裡就是沒有搶到鎖唄,這個方法說的是:"當前執行緒沒有搶到鎖,是否需要掛起當前執行緒?"
// 第一個引數是前驅節點,第二個引數才是代表當前執行緒的節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前執行緒需要掛起,直接可以返回true(然後將執行緒掛起)
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前驅節點 waitStatus大於0 ,之前說過,大於0 說明前驅節點取消了排隊。這裡需要知道這點:
// 進入阻塞佇列排隊的執行緒會被掛起,而喚醒的操作是由前驅節點完成的。
// 所以下面這塊程式碼說的是將當前節點的prev指向waitStatus<=0的節點,
// 簡單說,就是為了找個好爹,因為你還得依賴它來喚醒呢,如果前驅節點取消了排隊,
// 找前驅節點的前驅節點做爹,往前迴圈總能找到一個好爹的
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 仔細想想,如果進入到這個分支意味著什麼
// 前驅節點的waitStatus不等於-1和1,那也就是隻可能是0,-2,-3
// 在我們前面的原始碼中,都沒有看到有設定waitStatus的,所以每個新的node入隊時,waitStatu都是0() 用CAS將前驅節點的waitStatus設定為Node.SIGNAL(也就是-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
// 這個方法結束根據返回值我們簡單分析下:
// 如果返回true, 說明前驅節點的waitStatus==-1,是正常情況,那麼當前執行緒需要被掛起,等待以後被喚醒
// 我們也說過,以後是被前驅節點喚醒,就等著前驅節點拿到鎖,然後釋放鎖的時候叫你好了
// 如果返回false, 說明當前不需要被掛起,為什麼呢?往後看
// 跳回到前面是這個方法
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// interrupted = true;
// 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
// 那麼需要執行parkAndCheckInterrupt():
// 這個方法很簡單,因為前面返回true,所以需要掛起執行緒,這個方法就是負責掛起執行緒的
// 這裡用了LockSupport.park(this)來掛起執行緒,然後就停在這裡了,等待被喚醒=======
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況
// 仔細看shouldParkAfterFailedAcquire(p, node),我們可以發現,其實第一次進來的時候,一般都不會返回true的,原因很簡單,前驅節點的waitStatus=-1是依賴於後繼節點設定的。也就是說,我都還沒給前驅設定-1呢,怎麼可能是true呢,但是要看到,這個方法是套在迴圈裡的,所以第二次進來的時候狀態就是-1了。
// 解釋下為什麼shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起執行緒:
// => 是為了應對在經過這個方法後,node已經是head的直接後繼節點了。剩下的讀者自己想想吧。
}
(重點方法)多看幾遍 final boolean acquireQueued(final Node node, int arg)
解鎖操作
介紹下喚醒的動作了。我們知道,正常情況下,如果執行緒沒獲取到鎖,執行緒會被 LockSupport.park(this); 掛起停止,等待被喚醒。
// 喚醒的程式碼還是比較簡單的,你如果上面加鎖的都看懂了,下面都不需要看就知道怎麼回事了
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 往後看吧
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 是否完全釋放鎖
boolean free = false;
// 其實就是重入的問題,如果c==0,也就是說沒有巢狀鎖了,可以釋放了,否則還不能釋放掉
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
// 喚醒後繼節點
// 從上面呼叫處知道,引數node是head頭結點
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
// 如果head節點當前waitStatus<0, 將其修改為0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下面的程式碼就是喚醒後繼節點,但是有可能後繼節點取消了等待(waitStatus==1)
// 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從後往前找,仔細看程式碼,不必擔心中間有節點取消(waitStatus==1)的情況
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒執行緒
LockSupport.unpark(s.thread);
}
喚醒執行緒以後,被喚醒的執行緒將從以下程式碼中繼續往前走:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 剛剛執行緒被掛起在這裡了
return Thread.interrupted();
}
// 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了
2.NonfairSync非公平鎖
公平鎖和非公平鎖只有兩處不同:
1.非公平鎖在呼叫 lock 後,首先就會呼叫 CAS 進行一次搶鎖,如果這個時候恰巧鎖沒有被佔用,那麼直接就獲取到鎖返回了。
2.非公平鎖在 CAS 失敗後,和公平鎖一樣都會進入到 tryAcquire 方法,在 tryAcquire 方法中,如果發現鎖這個時候被釋放了(state == 0),非公平鎖會直接 CAS
搶鎖,但是公平鎖會判斷等待佇列是否有執行緒處於等待狀態,如果有則不去搶鎖,乖乖排到後面。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
總結
在併發環境下,加鎖和解鎖需要以下三個部件的協調:
**1.鎖狀態。**我們要知道鎖是不是被別的執行緒佔有了,這個就是 state 的作用,它為 0 的時候代表沒有執行緒佔有鎖,可以去爭搶這個鎖,用 CAS 將 state 設為 1,如果 CAS 成功,說明搶到了鎖,這樣其他執行緒就搶不到了,如果鎖重入的話,state進行+1 就可以,解鎖就是減 1,直到 state 又變為 0,代表釋放鎖,所以 lock() 和 unlock() 必須要配對啊。然後喚醒等待佇列中的第一個執行緒,讓其來佔有鎖。
2.執行緒的阻塞和解除阻塞。AQS 中採用了 LockSupport.park(thread) 來掛起執行緒,用 unpark 來喚醒執行緒。
3.阻塞佇列。因為爭搶鎖的執行緒可能很多,但是隻能有一個執行緒拿到鎖,其他的執行緒都必須等待,這個時候就需要一個 queue 來管理這些執行緒,AQS 用的是一個 FIFO 的佇列,就是一個連結串列,每個 node 都持有後繼節點的引用。AQS 採用了 CLH 鎖的變體來實現,