聊聊高併發(二十四)解析java.util.concurrent各個元件(六) 深入理解AQS(四)
最近整體過了下AQS的結構,也在網上看了一些講AQS的文章,大部分的文章都是泛泛而談。重新看了下AQS的程式碼,把一些新的要點拿出來說一說。
AQS是一個管程,提供了一個基本的同步器的能力,包含了一個狀態,修改狀態的原子操作,以及同步執行緒的一系列操作。它是CLHLock的變種,CLHLock是一個基於佇列鎖的自旋鎖演算法。AQS也採用了佇列來作為同步執行緒的結構,它維護了兩個佇列,一個是作為執行緒同步的同步佇列,另一個是基於Unsafe來進行阻塞/喚醒操作的條件佇列。所以理解佇列操作是理解AQS的關鍵。
1. 理解 head, tail引用
2. 理解 next, prev引用
3. 理解佇列節點何時入隊,何時出隊
關於head引用,需要記住的是
1. head引用始終指向獲得了鎖的節點,它不會被取消。acquire操作成功就表示獲得了鎖,acquire過程中如果中斷,那麼acquire就失敗了,這時候head就會指向下一個節點。
* because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node.
而獲得了鎖的之後,如果執行緒中斷了,那麼就需要release來釋放head節點。如果執行緒中斷了不釋放鎖,就有可能造成問題。所以使用顯式鎖時,必須要在finally裡面釋放鎖
Lock lock = new ReentrantLock();
lock.lock();
try{
// 如果中斷,可以處理獲得丟擲,要保證在finally裡面釋放鎖
}finally{
lock.unlock();
}
再來看看獲得鎖時對head引用的處理,只有節點的前驅節點是head時,它才有可能獲得鎖,而獲得鎖之後,要把自己設定為head節點,同時把老的head的next設定為null。
這裡有幾層含義:
1. 始終從head節點開始獲得鎖
2. 新的執行緒獲得鎖之後,之前獲得鎖的節點從佇列中出隊
3. 一旦獲得了鎖,acquire方法肯定返回,這個過程中不會被中斷
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
關於tail引用,它負責無鎖地實現一個鏈式結構,採用CAS + 輪詢的方式。節點的入隊操作都是在tail節點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
next引用在佇列中扮演了很重要的作用,它出現的頻率很高。關於next引用,它有幾種值的情況
1. next = null
2. next指向非null的下一個節點
3. next = 節點自己
next = null的情況有三種
1. 隊尾節點,隊尾節點的next沒有顯式地設定,所以為null
2. 隊尾節點入佇列時的上一個隊尾節點next節點有可能為null,因為enq不是原子操作,CAS之後是複合操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
// 這個期間next可能為null
t.next = node;
return t;
}
}
}
}
3. 獲取鎖時,之前獲取鎖的節點的next設定為null
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
next指向非null的下一個節點,這種情況就是正常的在同步佇列中等待的節點,入隊操作時設定了前一個節點的next值,這樣可以在釋放鎖時,通知下一個節點來獲取鎖
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
next指向自己,這個是取消操作時,會把節點的前一個節點指向它的後一個節點,最後把next域設定為自己
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
prev引用比較簡單,它主要是維護連結串列結構。CLHLock是在前一個節點的狀態自旋,AQS裡面的節點不是在前一個狀態等待,而是釋放的時候由前一個節點通知佇列來查詢下一個要被喚醒的節點。
最後說說節點進入佇列和出佇列的情況。
節點入佇列只有一種情況,那就是它的tryAcquire操作失敗,沒有獲得鎖,就進入同步佇列等待,如果tryAcquire成功了,就不需要進入同步佇列等待了。AQS提供了充分的靈活性,它提供了tryAcquire和tryRelase方法給子類擴充套件,基類負責維護佇列操作,子類可以自己決定是否要進入佇列。
所以實際子類擴充套件的時候有兩種型別,一種是公平的同步器,一種是非公平的同步器。這裡需要注意的是,所謂的非公平,不是說不使用佇列來維護阻塞操作,而是說在獲取競爭時,不考慮先來的執行緒,後來的執行緒可以直接競爭資源。非公平和公平的同步器競爭失敗後,都需要進入AQS的同步佇列進行等待,而同步佇列是先來先服務的公平的佇列。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
出佇列有兩種情況,
1. 後一個執行緒獲得鎖是,head引用指向當前獲得鎖的執行緒,前一個獲得鎖的節點自動出佇列
2. 取消操作時,節點出佇列,取消只有兩種情況,一種是執行緒被中斷,還有一種是等待超時