1. 程式人生 > >抽象同步佇列AQS——AbstractQueuedSynchronizer鎖詳解

抽象同步佇列AQS——AbstractQueuedSynchronizer鎖詳解

AQS——鎖的底層支援

談到併發,不得不談ReentrantLock;而談到ReentrantLock,不得不談AbstractQueuedSynchronizer(AQS)!

類如其名,抽象的佇列式的同步器,AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch...

併發包的底層就是使用AQS實現的,以下是AQS的類圖結構

框架

它維護了一個volatile int state(代表共享資源)和一個FIFO執行緒等待佇列(多執行緒競爭資源被阻塞會進入此佇列)。這裡volatile保證執行緒可見性。

state的訪問方式有三種:

getState()

setState()

compareAndSetState()

這三種都是原子操作,其中compareAndSetState的實現依賴於Unsafe的compareAndSwapInt()方法。程式碼如下:

/**
 * Atomically sets synchronization state to the given updated
 * value if the current state value equals the expected value.
 * This operation has memory semantics of a {@code volatile} read
 * and write.
 *
 * @param expect the expected value
 * @param update the new value
 * @return {@code true} if successful. False return indicates that the actual
 *         value was not equal to the expected value.
 */
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

自定義資源共享方式

AQS定義了兩種資源共享方式:Exclusive(獨佔,只有一個執行緒能執行,如ReentantLock)和Share(共享,多個執行緒可同時執行,如Semaphore/CountDownLatch)。

不同的自定義同步器爭用共享資源的方式也不同,自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體執行緒等待佇列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法。

isHeldExclusively():該執行緒是否正在獨佔資源。只有用到condition才需要去實現它。

tryAcquire(int):獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。

tryRelease(int):獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。

tryAcquireShared(int):共享方式。嘗試獲取資源,負數表示失敗;0表示成功,但沒用剩餘可用資源;正數表示成功,且有剩餘資源。

tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放後允許喚醒後續等待節點返回true,否則返回false。

原始碼實現

接下來我們開始開始講解AQS的原始碼實現。依照acquire-release、acquireShared-releaseShared的次序來。

1. acquire(int)

acquire是一種以獨佔方式獲取資源,如果獲取到資源,執行緒直接返回,否則進入等待佇列,直到獲取到資源為止,且整個過程忽略中斷的影響。該方法是獨佔模式下執行緒獲取共享資源的頂層入口。

獲取到資源後,執行緒就可以去執行其臨界區程式碼了。下面是acquire()的原始碼

/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

通過註釋我們知道,acquire方法是一種互斥模式,且忽略中斷。該方法至少執行一次tryAcquire(int)方法,如果tryAcquire(int)方法返回true,則acquire直接返回,否則當前執行緒需要進入佇列進行排隊。函式流程如下

1、tryAcquire():嘗試直接獲取資源,如果成功則直接返回;

2、addWaiter():將該執行緒加入等待佇列的尾部,並標記為獨佔模式;

3、acquireQueued():使執行緒在等待佇列中獲取資源,一直獲取到資源後才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

4、如果執行緒在等待過程中被中斷過,它是不響應的。只有獲取資源後才再進行自我中斷selfInterrupt(),將中斷補上。

相關方法介紹

1.tryAcquire(int)

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire嘗試以獨佔的方式獲取資源,如果獲取成功,則直接返回true,否則直接返回false。該方法可以用於實現Lock中的tryLock()方法。該方法的預設實現是丟擲UnsupportedOperationException異常,

具體實現由自定義的擴充套件了AQS的同步類來實現。AQS在這裡只負責定義了一個公共的方法框架。這裡之所以沒用定義為abstract,是因為獨佔模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared

如果都定義成abstract,那麼每個模式都要去實現另外一個模式下的介面。

2.addWaiter(Node)

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

該方法用於將當前執行緒根據不同的模式,(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待佇列的隊尾並返回當前執行緒所在的節點。如果佇列不為空,

則以通過compareAndSetTail方法以CAS的方式將當前節點加入到等待佇列的末尾。否則,通過enq(node)方法初始化一個等待佇列,並返回當前節點。

3. enq(node)

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq(node)用於將當前節點插入到等待佇列,如果佇列為空,則初始化當前佇列。整個過程以CAS自旋的方式進行,直到成功加入隊尾為止。

4.acquireQueued(Node, int)

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//——標記是否成功拿到資源,預設是false
    try {
        boolean interrupted = false;//——標記等待過程是否被中斷過
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued()用於佇列中的執行緒自旋地以獨佔且不可中斷的方式獲取同步狀態(acquire),直到拿到鎖之後再返回。該方法的實現分成兩部分:

如果當前節點已經成為頭結點,嘗試獲取鎖(tryAcquire)成功,然後返回;否則檢查當前節點是否應該被park,然後將該執行緒park並且檢查當前執行緒是否被可以被中斷。

5.shouldParkAfterFailedAcquire(Node, Node)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire方法通過對當前節點的前一個節點的狀態進行判斷,對當前節點做出不同的操作,至於每個Node的狀態表示,可以參考介面文件。

6.parkAndCheckInterrupt()

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

該方法讓執行緒去休息,真正進入等待狀態。park()會讓當前執行緒進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該執行緒:

1)被unpark();

2)被interrupt()。

需要注意的是,Thread.interrupted()會清除當前執行緒的中斷標記位。

我們再回到acquireQueued(),總結下該函式的具體流程:

1、節點進入隊尾後,檢查狀態,找到安全休息點

2、呼叫park()進入waiting狀態,等待unpark()或interrupt()喚醒自己

3、被喚醒後,看自己是不是有資格能拿到號。如果能拿到,head指向當前節點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒用拿到,繼續流程1

最後,總結一下acquire()的流程:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1、呼叫自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

2、沒成功,則執行addWaiter()將執行緒加入等待佇列的尾部並標記為獨佔模式;

3、acquireQueued()使執行緒在等待佇列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源才返回。如果在整個等待過程中被中斷過,則會返回true,否則返回false。

4、如果執行緒在等待過程中被中斷過,他是不響應的。只是獲取資源後才進行自我中斷selfInterrupt(),將中斷補上。

未完...

 

 

 

參考書籍

Java併發程式設計之美

參考連結

https://www.jianshu.com/p/da9d051dcc3d