學習AbstractQueuedSynchronizer原始碼以及原理
簡介
AbstractQueuedSynchronizer(以下簡稱AQS),是用來構建鎖或者其他同步元件的基礎架構,它使用一個int成員變數管理同步狀態,通過內建FIFO佇列來完成資源獲取獲取執行緒的排隊工作,作者(Doug lea)大神期望它能完成大部分同步需求的基礎。
AQS的主要使用方法是繼承,子類通過繼承AQS並實現它的模板方法來管理同步狀態,AQS通過(getState、setState(int newState)、compareAndSetState(int expect,int update)三個方法來管理同步狀態,這三個方法是執行緒安全的。AQS推薦將AQS定義為子類的內部靜態類,因為AQS自身沒有任何同步介面。它僅僅管理了若干同步狀態的獲取和釋放,AQS既支援獨佔所也支援共享鎖,這樣就可以向作者想的那樣實現大部分同步需求。
AQS是面向鎖的實現,簡化了鎖的實現方式,遮蔽了同步狀態管理、執行緒的排隊、等待與喚醒的底層操作。降低了開發者的實現鎖的成本。
AQS佇列以及節點分析
AQS內部依賴一個FIFO雙向連結串列來管理同步狀態,噹噹前執行緒獲取同步狀態失敗時,AQS會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)加入到連結串列中。同事阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次艙室獲取同步狀態。
Node用來儲存獲取同步狀態失敗的執行緒引用以及其他一些屬性,結果屬性以及描述如下表所示:
static final class Node {
// 同步佇列中等待的執行緒等待超時或者被中斷,需要從同步佇列中取消等待,節點進入該狀態後狀態將不會發生變化
static final int CANCELLED = 1;
// 後繼節點的執行緒處於等待狀態,而當前節點的執行緒如果釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的執行緒得以執行
static final int SIGNAL = -1;
// 表示當前節點在等待condition,也就是在condition佇列中
static final int CONDITION = -2;
// 該狀態表示下一次共享式同步狀態獲取將會無條件的傳遞下去
static final int PROPAGATE = -3;
// 等待狀態 用上述的4個值以及初始值0代表同步佇列中的狀態
volatile int waitStatus;
// 前驅節點
volatile Node prev;
// 後繼節點
volatile Node next;
// 獲取同步狀態的執行緒
volatile Thread thread;
// 等待佇列中的後繼節點。如果當前節點是共享的,那麼這個欄位將是一個SHARED常量
Node nextWaiter;
}
複製程式碼
獨享鎖示例以及原始碼分析
class Mutex implements Lock, java.io.Serializable {
private static class Sync extends AbstractQueuedSynchronizer {
protected boolean isHeldExclusively() {
return getState() == 1;
}
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
assert releases == 1;
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
複製程式碼
上述程式碼是用AQS實現的最簡單的獨享鎖,呼叫該鎖的方式:
Lock lock = new Mutex();
try{
lock.lock();
}finally {
lock.unlock();
}
複製程式碼
首先我們回撥用AQS的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製程式碼
上述程式碼首先呼叫我們自定義的tryAcquire(int arg)方法如果同步狀態獲取失敗,則構造獨佔式同步節點並通過addWaiter(Node node)方法將該節點加入到同步佇列尾部,最後呼叫acquireQueued(Node node,int arg)方法,使該節點以自旋的方法獲取同步狀態。如果獲取不到阻塞節點中的執行緒,而被阻塞的執行緒的喚醒主要依靠前驅節點的出隊以及阻塞佇列被中斷來實現。接下來看addWaiter和加入同步佇列enq方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//已初始化會通過下列方法嘗試一次加入node到隊尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 未初始化同步佇列或者上面快速新增節點失敗會通過這個方法將node加入到隊尾
private Node enq(final Node node) {
// 自旋保證加入到佇列尾部
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製程式碼
上述程式碼通過compareAndSetHead和compareAndSetTail兩個方法確保執行緒安全的初始化同步佇列以及新增一個節點到同步佇列尾部。節點加入到同步佇列以後每個節點都會進入自旋狀態,更新自己的waitStatus:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
// 如果前驅節點是頭結點嘗試獲取同步狀態
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製程式碼
上述邏輯主要包括:
- 獲取當前節點的前驅節點; 需要獲取當前節點的前驅節點,而頭結點所對應的含義是當前站有鎖且正在執行。
- 當前驅節點是頭結點並且能夠獲取狀態,代表該當前節點佔有鎖; 如果滿足上述條件,那麼代表能夠佔有鎖,根據節點對鎖佔有的含義,設定頭結點為當前節點。
- 否則進入等待狀態。 如果沒有輪到當前節點執行,那麼將當前執行緒從執行緒排程器上摘下,也就是進入等待狀態。
上述做法不會過早的喚醒睡眠中的執行緒,降低資源使用率,提高效能。
最後我們回撥用release(int arg)方法釋放當前執行緒的同步狀態,使後繼節點能夠繼續獲取同步狀態。程式碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製程式碼
上述邏輯會呼叫unparkSuccessor(Node node)喚醒處於等待狀態的執行緒:
private void unparkSuccessor(Node node) {
// 將狀態設定為同步狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取當前節點的後繼節點,如果滿足狀態,那麼進行喚醒操作
// 如果沒有滿足狀態,從尾部開始找尋符合要求的節點並將其喚醒
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製程式碼
分析至此總結一下AQS實現獨佔鎖的流程:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會加入到佇列中並進行自旋,移除佇列以及停止自旋的條件是前驅節點是頭結點且成功獲取了同步了狀態,最後通過自定義tryRelease(int arg)方法釋放同步狀態,喚醒後繼節點。
參考:
- 特別感謝《Java併發程式設計的藝術》