AQS - 抽象同步佇列:獨佔鎖的實現
參考連結:https://www.bilibili.com/video/BV12K411G7Fg
通過 CAS ,我們可以實現樂觀鎖操作,從而使得執行緒進行同步,但是通過 CAS 的原始碼,我們發現 CAS 僅僅能修改記憶體中的一個值,而不是對物件進行同步,那麼該如何對物件進行同步呢?同時,在多執行緒對統一資源進行競爭的情況下,如何能管理到所有需要該資源的執行緒呢?於是,AQS應運而生。
參考:《深入 Java 虛擬機器》
AbstractQueuedSynchronizer抽象同步佇列簡稱AQS,它是實現同步器的基礎元件,併發包中鎖的底層就是通過AQS實現。結構如下
屬性
int state
在共享模式下,需要表示共享鎖的持有執行緒數量。
共享鎖 和 獨佔鎖(排他鎖)
共享鎖:該鎖允許被多個執行緒持有,共享鎖僅支援讀資料,如果一個執行緒對資料加了共享鎖後,其他資料只能對該資料加共享鎖。
獨佔鎖(排他鎖):只有一個執行緒能獲得鎖。
共享鎖 和 獨佔鎖是 AQS 的不同實現方式
Node head & Node tail
用於維護一個 FIFO 的雙向連結串列,兩個 Node 節點分別指向頭節點和尾節點
Node
佇列中的節點,結構見上圖
方法(以獨佔模式為例)
tryAcquire(int arg)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
嘗試獲取鎖,獲取鎖失敗直接返回。
該方法僅僅丟擲一個異常,AQS 繼承類需要繼承該方法,用於給上層開放空間,使使用者能編寫業務邏輯。
acquire(int arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire
方法失敗後,會進入等待佇列
addWaiter(Node.EXCLUSIVE), arg)
主要作用為新建一個 Node 節點,並將節點插入等待佇列。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 獲取當前尾節點,tail 是 AQS 的屬性
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 嘗試通過原子操作將當前節點置為尾節點
// 其實獲取 pred 後,其他執行緒也可能會對 tail 進行修改
// compareAndSetTail(Node expect, Node update) 會讀取 tail 的偏移
// 判斷當前的 pred 是不是還是隊尾(期間可能被其他執行緒修改),若是,則更新隊尾為當前 node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 呼叫完整的入隊方法,上面嘗試快速入隊失敗時會進入該方法
// 例如 tail 被修改的情況
enq(node);
return node;
}
acquireQueued(final Node node, int arg)
加入佇列後,在佇列中自旋對鎖進行獲取。
經過程式碼可以看出,head 節點後的節點組成了等待佇列
當 head 後第一個 node 獲得鎖時,node 會成為新的頭節點
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自選操作
for (;;) {
final Node p = node.predecessor();
// 當前節點的前置節點為頭節點 && 當前執行緒獲取鎖成功
if (p == head && tryAcquire(arg)) {
// 當前節點作為頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 根據目前的節點狀態判斷執行緒是否需要掛起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
boolean tryRelease(int arg)
同 tryAcquire
,作為開放給上層的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
boolean release(int arg)
釋放鎖,並通知佇列,改變等待佇列中的執行緒狀態
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 傳入 head 並喚醒等待佇列中的 node
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor(Node node)
頭節點操作完資源後,通知等待佇列中的節點
下方程式碼的操作中,為什麼喚醒不從頭節點開始呢
該處搜尋並不是原子性的,從後往前搜尋,可能會因為佇列構建順序未
- 後節點 pre 指向前節點
- 前節點 next 才會指向後節點
從前往後可能會因第2步還未完成而造成搜尋中斷
private void unparkSuccessor(Node node) {
// 設定頭節點的狀態
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從尾節點開始搜尋,head 後最靠前的節點並且 waitStatus <= 0 的節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 對找到的節點進行喚醒操作,喚醒後會自旋執行 acquire 方法獲取鎖
if (s != null)
LockSupport.unpark(s.thread);
}