1. 程式人生 > 其它 >AQS - 抽象同步佇列:獨佔鎖的實現

AQS - 抽象同步佇列:獨佔鎖的實現

目錄

參考連結:https://www.bilibili.com/video/BV12K411G7Fg

通過 CAS ,我們可以實現樂觀鎖操作,從而使得執行緒進行同步,但是通過 CAS 的原始碼,我們發現 CAS 僅僅能修改記憶體中的一個值,而不是對物件進行同步,那麼該如何對物件進行同步呢?同時,在多執行緒對統一資源進行競爭的情況下,如何能管理到所有需要該資源的執行緒呢?於是,AQS應運而生。

參考:《深入 Java 虛擬機器》

AbstractQueuedSynchronizer抽象同步佇列簡稱AQS,它是實現同步器的基礎元件,併發包中鎖的底層就是通過AQS實現。結構如下

屬性

int state

在共享模式下,需要表示共享鎖的持有執行緒數量。

共享鎖 和 獨佔鎖(排他鎖)

共享鎖:該鎖允許被多個執行緒持有,共享鎖僅支援讀資料,如果一個執行緒對資料加了共享鎖後,其他資料只能對該資料加共享鎖。

獨佔鎖(排他鎖):只有一個執行緒能獲得鎖。

共享鎖 和 獨佔鎖是 AQS 的不同實現方式

Node head & Node tail

用於維護一個 FIFO 的雙向連結串列,兩個 Node 節點分別指向頭節點和尾節點

Node

佇列中的節點,結構見上圖

方法(以獨佔模式為例)

tryAcquire(int arg)

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}

嘗試獲取鎖,獲取鎖失敗直接返回。

該方法僅僅丟擲一個異常,AQS 繼承類需要繼承該方法,用於給上層開放空間,使使用者能編寫業務邏輯。

acquire(int arg)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            	acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

tryAcquire方法失敗後,會進入等待佇列

addWaiter(Node.EXCLUSIVE), arg)

主要作用為新建一個 Node 節點,並將節點插入等待佇列。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
    	// 獲取當前尾節點,tail 是 AQS 的屬性
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 嘗試通過原子操作將當前節點置為尾節點
            // 其實獲取 pred 後,其他執行緒也可能會對 tail 進行修改
            // compareAndSetTail(Node expect, Node update) 會讀取 tail 的偏移
            // 判斷當前的 pred 是不是還是隊尾(期間可能被其他執行緒修改),若是,則更新隊尾為當前 node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
    
    	// 呼叫完整的入隊方法,上面嘗試快速入隊失敗時會進入該方法
    	// 例如 tail 被修改的情況
        enq(node);
        return node;
}

acquireQueued(final Node node, int arg)

加入佇列後,在佇列中自旋對鎖進行獲取。

經過程式碼可以看出,head 節點後的節點組成了等待佇列

當 head 後第一個 node 獲得鎖時,node 會成為新的頭節點

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自選操作
            for (;;) {
                final Node p = node.predecessor();
                // 當前節點的前置節點為頭節點 && 當前執行緒獲取鎖成功
                if (p == head && tryAcquire(arg)) {
                    // 當前節點作為頭節點
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 根據目前的節點狀態判斷執行緒是否需要掛起
                if (shouldParkAfterFailedAcquire(p, node) &&
                    	parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

boolean tryRelease(int arg)

tryAcquire ,作為開放給上層的方法

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}

boolean release(int arg)

釋放鎖,並通知佇列,改變等待佇列中的執行緒狀態

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	// 傳入 head 並喚醒等待佇列中的 node
                unparkSuccessor(h);
            return true;
        }
        return false;
}

unparkSuccessor(Node node)

頭節點操作完資源後,通知等待佇列中的節點

下方程式碼的操作中,為什麼喚醒不從頭節點開始呢

該處搜尋並不是原子性的,從後往前搜尋,可能會因為佇列構建順序未

  1. 後節點 pre 指向前節點
  2. 前節點 next 才會指向後節點

從前往後可能會因第2步還未完成而造成搜尋中斷

private void unparkSuccessor(Node node) {
    	// 設定頭節點的狀態
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
		
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從尾節點開始搜尋,head 後最靠前的節點並且 waitStatus <= 0 的節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
    	// 對找到的節點進行喚醒操作,喚醒後會自旋執行 acquire 方法獲取鎖
        if (s != null)
            LockSupport.unpark(s.thread);
}