java併發:AQS獨佔鎖原始碼詳解
說明:
AQS是併發包的基石。它有兩種模式:獨佔模式和共享模式。本篇只說獨佔模式。
什麼是獨佔模式?就相當於lock的鎖只有一把,一條執行緒佔用,其他執行緒就得處於BLOCK或者WAIT狀態。
在AQS裡,獲取的方法就是:
Acquire()
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
這個方法比較噁心,拆解成下面的可視性比較強的。
boolean acquire;//是否獲取成功 boolean putQueue;//放入佇列等行為是否成功 acquire = tryAcquire(arg);//1.嘗試去獲取 if(!acquire ){//如果沒有獲取到 Node node = addWaiter(Node.EXCLUSIVE);//2.新建個節點。 putQueue= acquireQueued(node,arg);//3.把節點放入佇列,並且阻塞佇列。 if (putQueue){ selfInterrupt();//4.中斷執行緒。 } }
依次檢視方法:
tryAcquire()
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
這個方法什麼都沒有幹,就拋了個異常。按照其他博主的說明是,AQS是一個抽象類,嘗試去獲取的方法應該由子類自己去實現,按理說本方法應該寫成抽象的。但是由於AQS有獨佔模式和共享模式,那麼獨佔模式的tryAcquire的寫成抽象方法,共享模式的tryAcquireShared也得寫成抽象的。但是對於繼承者來說只需要其中一樣功能即可。所以作者寫成了可繼承的類似於介面卡的模式。
addWaiter();
這個方法傳來了一個Node.EXCLUSIVE = null的一個引數。
得說下Node,就是執行緒的節點,是AQS的內部類,他們之間的結構就相當於連結串列結構。
AQS本身有兩個屬性,分別是Node head和Node tail。也就是節點頭和節點尾的意思。
而Node自身也有兩個屬性,即Node pre和Node net,也就是上個節點和下個節點的意思。
這樣的話,這樣一串有頭有尾的節點就形成了。
Node也有個Thread欄位,作為引數傳入了構造方法。這樣一連串的Node就相當於一連串的有序執行緒。
Node 還有個屬性是status,在獨佔模式下它有幾個有用的值。
當Node 當status分別為下列值時,則該Node的執行緒代表著下列意思。
1. 預設值 int = 0;//說明本執行緒節點根本未處理,還在初始化的狀態
//執行緒已超時或取消。
2. static final int CANCELLED = 1;
//代表著該節點的下一個節點是阻塞狀態.可以想象認為整個正常執行緒節點串都是SIGNAL狀態
3. static final int SIGNAL = -1;
具體還是看看addWaiter程式碼怎麼實現的。
private Node addWaiter(Node mode) {
//新建節點,把當前執行緒塞進去。
Node node = new Node(Thread.currentThread(), mode);
// 獲取節點串的最後一個
Node pred = tail;
if (pred != null) {
//如果有最後一個,說明這個節點串裡已經有存在節點了,那麼把新建的節點的前置節點設定為節點串的最後一個。
node.prev = pred;
//這個應該是CAS自旋方法吧,就是防止有多個執行緒同時設定為節點串的最後一個。可以按加鎖理解。
if (compareAndSetTail(pred, node)) {
//然後把本來節點串的最後一個節點的後置節點設定為新建的那個節點。
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這裡最後還有個enq方法。看下
//在上個程式碼片裡可以看到,該方法的執行是有前置條件的。
//條件是:節點串沒有最後一個,也就是節點串是空的。
/*那麼依據前置條件該方法的意思是:
第一次迴圈:
如果節點串是空的,就建立一個空節點(沒有放執行緒進去)然後放到首位。
第二次迴圈:
把當前節點追加到空節點後面*/
private Node enq(final Node node) {
for (;;) {
/
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued()
//這個方法主要是處理節點串的頭部和後面串的。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前節點的前置節點。
final Node p = node.predecessor();
//判斷前置節點是頭嗎?且還要執行一遍獲取資源的操作。
//根據上面的程式碼我們很清楚了,如果節點串本身是空的,那麼會在頭部建立一個空節點。
//那麼當前節點的前置節點正好是頭節點,符合這個判斷,進入本方法。
if (p == head && tryAcquire(arg)) {
//把頭部設定成當前節點。
setHead(node);
//把原來的頭部的後置節點設定為Null,方便GC。相當於把原來的頭節點拋棄了。
p.next = null; // help GC
failed = false;
//返回 中斷 = false。
return interrupted;
}
//如果當前節點排隊排很多,前置節點不是頭。執行下面的方法
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
又有個shouldParkAfterFailedAcquire方法,來看下。這裡用到了Node.status。上面裡有節點狀態說明
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前置節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置節點的狀態是SIGNAL,那麼就這裡返回true。目的是進入下一個方法.
return true;
if (ws > 0) {
//如果前置節點的狀態被取消了。為什麼會被取消?可能是任務中途不執行了,比如IO讀一半嫌時間太長,那麼這個節點不能影響後續節點啊。
//這裡做的就是一直for迴圈往前遍歷節點,直到找到一個沒有取消的,然後追加到後面。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//初始狀態是0,會進入這個方法.將設定為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
我們可以看到因為新建的執行緒節點初始狀態都是0 那麼會將狀態設定為SIGNAL,然後返回false.
這個時候再回到acquireQueued方法裡這個地方:
for(;;){
....
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
...
}
因為shouldParkAfterFailedAcquire返回了false,所以由於&&具有短路效應,不會執行接下來的方法,而是跳入迴圈,又來到了shouldParkAfterFailedAcquire裡面,但這個時候狀態已經是SIGNAL了.該方法返回了true.然後到了parkAndCheckInterrupt這個方法裡.
1.private final boolean parkAndCheckInterrupt() {
2. LockSupport.park(this);
3. return Thread.interrupted();
4.}
LockSupport.park(this)的意思就是阻塞當前執行緒,.該執行緒就停留在了第二行,不會執行第三行的程式碼.
這樣說來,AQS的節點串,每過來一個執行緒都會追加到節點串後面,然後修改狀態為SIGNAL,然後變成阻塞的狀態.(當然頭部的那個比較特殊,根本沒有阻塞,而是直接獲取了.)非常完美.
關於阻塞後面的程式碼先不說.因為執行緒已經阻塞了.我們先看看釋放的程式碼吧.
release()
public final boolean release(int arg) {
if (tryRelease(arg)) {//這個還是拋異常,如果你連續看這篇文章,你會明白什麼意思的
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
上面這個方法取到節點頭,然後進入了unparkSuccessor方法
private void unparkSuccessor(Node node) {
//拿到頭節點的狀態
int ws = node.waitStatus;
//如果頭節點正常,那麼讓頭節點的狀態初始化.
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取第二個節點.
Node s = node.next;
//如果第二個節點不存在或已超時.那麼從尾節點開始往前擼,
//一直擼到距離頭節點最近的那個有效的節點t.
//把第二個節點設定為節點t
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//放開第二個節點.不讓再阻塞
LockSupport.unpark(s.thread);
}
當第二個節點執行緒不阻塞後.再來看下acquireQueued這個方法.
它再次經歷了for迴圈.來到了
for (;;) {
//獲取當前節點的前置節點。
final Node p = node.predecessor();
//判斷前置節點是頭嗎?且還要執行一遍獲取資源的操作。
if (p == head && tryAcquire(arg)) {
//把頭部設定成當前節點。
setHead(node);
//把原來的頭部的後置節點設定為Null,方便GC。相當於把原來的頭節點拋棄了。
p.next = null; // help GC
failed = false;
//返回 中斷標誌。
return interrupted;
}
這個時候就跳出了for迴圈了.
(但也有可能非第二節點遭到外部力量喚醒,那就去下面再阻塞一次)
至於interrupted這個東西以及acquire()裡的selfInterrupt();作用是這樣的:
Thread.interrupted()方法在返回中斷標記的同時會清除中斷標記,
也就是說當由於中斷醒來然後獲取鎖成功,那麼整個acquireQueued方法就會返回true表示是因為中斷醒來,但如果中斷醒來以後沒有獲取到鎖,繼續掛起,由於這次的中斷已經被清除了,下次如果是被正常喚醒,那麼acquireQueued方法就會返回false,表示沒有中斷。