1. 程式人生 > >併發程式設計-深入淺出AQS

併發程式設計-深入淺出AQS

AQS是併發程式設計中非常重要的概念,它是juc包下的許多併發工具類,如CountdownLatch,CyclicBarrier,Semaphore 和鎖, 如ReentrantLock, ReaderWriterLock的實現基礎,提供了一個基於int狀態碼和佇列來實現的併發框架。本文將對AQS框架的幾個重要組成進行簡要介紹,讀完本文你將get到以下幾個點:

  1. AQS進行併發控制的機制是什麼

  2. AQS獨佔和共享模式是如何實現的

  3. 同步佇列和條件等待佇列的區別,和資料出入隊原則

一,AQS基本概念

AQS(AbstractQueuedSynchronizer)是用來構建鎖或者其他同步元件的基礎框架,它使用了一個int成員變數來表示狀態,通過內建的FIFO(first in,first out)佇列來完成資源獲取執行緒的排隊工作。

佇列可分為兩種,一種是同步佇列,是程式執行入口出處的等待佇列;而另一種則是條件等待佇列,佇列中的元素是在程式執行時在某個條件上發生等待。

1.1 獨佔or共享模式

AQS支援兩種獲取同步狀態的模式既獨佔式和共享式。顧名思義,獨佔式模式同一時刻只允許一個執行緒獲取同步狀態,而共享模式則允許多個執行緒同時獲取。

1.2 同步佇列

當一個執行緒嘗試獲取同步狀態失敗時,同步器會將這個執行緒以及等待狀態等資訊構造成一個節點加入到等待佇列中,同時會阻塞當前執行緒,當同步狀態釋放時,會把首節點中的執行緒喚醒,使其再次嘗試重複獲取同步佇列。

1.3 條件佇列

AQS內部類ConditionObject來實現的條件佇列,當一個執行緒獲取到同步狀態,但是卻通過Condition呼叫了await相關的方法時,會將該執行緒封裝成Node節點並加入到條件佇列中,它的結構和同步佇列相同。

二,獨佔or共享模式

AQS框架中,通過維護一個int型別的狀態,來進行併發控制,執行緒通常通過修改此狀態資訊來表明當前執行緒持有此同步狀態。AQS則是通過儲存修改狀態執行緒的引用來實現獨佔和共享模式的。

/**
 * 獲取同步狀態
 */
public final void acquire(int arg) {
    //嘗試獲取同步狀態, 如果嘗試獲取到同步狀態失敗,則加入到同步佇列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
 * 嘗試獲取同步狀態【子類中實現】,因為aqs基於模板模式,僅提供基於狀態和同步佇列的實 
 * 現思路,具體的實現由子類決定
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果當前狀態值為0,並且等待佇列中沒有元素,執行修改狀態值操作
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            // 修改狀態值成功,記錄當前持有同步狀態的執行緒資訊
            setExclusiveOwnerThread(current);
            return true;
        }
        // 如果當前執行緒已經持有同步狀態,繼續修改同步狀態【重入鎖實現原理】
    } else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

/**
 * 根據傳入的模式以及當前執行緒資訊建立一個佇列的節點並加入到同步佇列尾部
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
/**
 * 同步佇列中節點,嘗試獲取同步狀態
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 自旋(死迴圈)
        for (;;) {
            // 只有當前節點的前驅節點是頭節點時才會嘗試執行獲取同步狀態操作
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

獨佔式是如何控制得?

當修改狀態資訊成功後,如果執行的是獨佔式操作,AQS的具體實現類中會儲存當前執行緒的資訊來宣告同步狀態已被當前執行緒佔用,此時其他執行緒再嘗試獲取同步狀態會返回false。

三,同步佇列

3.1 佇列中儲存那些資訊?

同步佇列節點中主要儲存著執行緒的資訊以及模式(共享or獨佔)。

3.2 何時執行入隊操作?

/**
 * 獲取同步狀態
 */
public final void acquire(int arg) {
    //嘗試獲取同步狀態, 如果嘗試獲取到同步狀態失敗,則加入到同步佇列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

複用上文中的程式碼,不難看出再獲取同步狀態失敗後,會執行入隊操作。

3.3 何時執行出隊操作?

當執行緒獲取同步狀態失敗時,會被封裝成Node節點加入到等待佇列中,此時所有節點都回進入自旋過程,首先判斷自己prev是否時頭節點,如果是則嘗試獲取同步狀態。
被阻塞執行緒的喚醒主要以靠前驅節點的出隊或阻塞執行緒被中斷來實現。

/**
 * 同步佇列中節點,嘗試獲取同步狀態
 * 
 * 1. 當一個執行緒獲取到同步狀態時,會將當前執行緒構造程Node並設定為頭節點
 * 2. 並將原始的head節點設定為null,以便於垃圾回收
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

四,條件等待佇列

條件變數(ConidtionObject)是AQS中的一個內部類,用來實現同步佇列機制。同步佇列複用了等待佇列中Node節點,所以同步佇列到等待佇列中不需要進行額外的轉換。

4.1 什麼時候執行入隊操作?

當執行緒獲取到同步狀態,但是在臨界區中呼叫了await()方法,此時該執行緒會被加入到對應的條件佇列彙總。
ps: 臨界區,加鎖和釋放鎖之間的程式碼區域

/**
 * ConditionObject中的await方法,呼叫後使得當前執行執行緒加入條件等待佇列
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    // -----省略程式碼------
}
/**
 * 新增等待執行緒
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // -----省略程式碼------
    // 將當前執行緒構造程條件佇列節點,並加入到佇列中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

4.2 什麼時候執行出隊操作?

當對應的Conditioni呼叫signial/signalAll()方法時回選擇從條件佇列中出佇列,同步佇列是通過自旋的方式獲取同步狀態,而條件佇列中的節點則通過通知的方式出隊。條件佇列中的節點被喚醒後會加入到入口等待佇列中。

/**
 * 喚醒當前條件等到佇列中的所有等待執行緒
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
/**
 * 遍歷佇列,將元素從條件佇列 加入到 同步佇列
 */
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
final boolean transferForSignal(Node node) {
    // -----省略程式碼------
    // 執行入隊操作,將node新增到同步佇列中
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

五,總結

  1. 使用Node實現的FIFO佇列,可以用於構建鎖或者其他同步裝置的基礎框架
  2. 利用一個int型別的屬性表示狀態
  3. 使用模板方法模式,子類可以通過繼承它來管理狀態實現各種併發工具
  4. 可以同時實現獨佔和共享模式

本文對AQS的基本原理進行的簡要的描述,對於子類的公平性和非公平行實現,中斷,佇列中節點的等待狀態,cas等操作沒有進行探討,感興趣的小夥伴可以進行原始碼閱讀或者查閱相關資料。

六,Q&A

Question1: 在java中通常使用synchronized來實現方法同步,AQS中通過CAS保證了修改同步狀態的一致性問題,那麼對比synchronized,cas有什麼優勢不同與優勢呢?你還知道其他無鎖併發的策略嗎?

筆者的個人部落格網