1. 程式人生 > 其它 >AbstractQueuedSynchronizer(AQS)原始碼分析詳解

AbstractQueuedSynchronizer(AQS)原始碼分析詳解

AbstractQueuedSynchronizer 是一個抽象的同步等待佇列。juc下面多大多數鎖都是使用到了AQS。除了獲取資源和釋放資源的方法外,AQS基本上將其他同步等待佇列的邏輯都實現了,比如現場進入佇列同步等待、相應中斷、喚醒執行緒等。如果我們想實現一個簡單的同步等待佇列,那隻需要實現AQS的獲取和釋放資源的方法就行了(實際上還有一個用來判斷釋放是當前現場持有資源的方法需要實現)。

1. AQS類的基本結構

首先實現了AbstractOwnableSynchronizer類,裡面維護了一個FIFO雙向佇列,也就是我們的等待佇列,記錄了佇列的頭節點和尾節點;還維護了一個狀態資訊state

,對於不同的鎖來說state有著不同的意義,但是可以肯定的是state跟執行緒釋放進入等待佇列是由密切關係的。

在多執行緒情況下,對雙向佇列的頭節點和尾節點以及state的修改都是使用的cas進行的修改(不存在多執行緒的話,就直接賦值了,如果獨佔鎖釋放鎖資源,因為之會有一個執行緒持有鎖資源)。

1.1 AbstractOwnableSynchronizer類結構

核心就是設定和獲取exclusiveOwnerThreadexclusiveOwnerThread表示獨佔模式下持有鎖的執行緒。

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;
    protected AbstractOwnableSynchronizer() { }
    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;
    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

1.2 等待佇列

同步等待佇列中是要給雙向佇列,在AQS中記錄了頭節點和尾節點(都用了volatile修飾,確保了headtail的可見性),一般來說頭節點是一個空節點(這點後面會說到)。每個節點裡面記錄了當前節點的等待模式(獨佔模式或者共享模式),同時記錄了當前節點的等待狀態,pre和next記錄了當前節點的上一個節點和下一個節點,thread表示節點對應的執行緒。

/**
 * Head of the wait queue, lazily initialized.  Except for
 * initialization, it is modified only via method setHead.  Note:
 * If head exists, its waitStatus is guaranteed not to be
 * CANCELLED.
 */
private transient volatile Node head;
/**
 * Tail of the wait queue, lazily initialized.  Modified only via
 * method enq to add new wait node.
 */
private transient volatile Node tail;

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    /**
     * Status field, taking on only the values:
     *   SIGNAL:     The successor of this node is (or will soon be)
     *               blocked (via park), so the current node must
     *               unpark its successor when it releases or
     *               cancels. To avoid races, acquire methods must
     *               first indicate they need a signal,
     *               then retry the atomic acquire, and then,
     *               on failure, block.
     *   CANCELLED:  This node is cancelled due to timeout or interrupt.
     *               Nodes never leave this state. In particular,
     *               a thread with cancelled node never again blocks.
     *   CONDITION:  This node is currently on a condition queue.
     *               It will not be used as a sync queue node
     *               until transferred, at which time the status
     *               will be set to 0. (Use of this value here has
     *               nothing to do with the other uses of the
     *               field, but simplifies mechanics.)
     *   PROPAGATE:  A releaseShared should be propagated to other
     *               nodes. This is set (for head node only) in
     *               doReleaseShared to ensure propagation
     *               continues, even if other operations have
     *               since intervened.
     *   0:          None of the above
     *
     * The values are arranged numerically to simplify use.
     * Non-negative values mean that a node doesn't need to
     * signal. So, most code doesn't need to check for particular
     * values, just for sign.
     *
     * The field is initialized to 0 for normal sync nodes, and
     * CONDITION for condition nodes.  It is modified using CAS
     * (or when possible, unconditional volatile writes).
     */
    volatile int waitStatus;
    /**
     * Link to predecessor node that current node/thread relies on
     * for checking waitStatus. Assigned during enqueuing, and nulled
     * out (for sake of GC) only upon dequeuing.  Also, upon
     * cancellation of a predecessor, we short-circuit while
     * finding a non-cancelled one, which will always exist
     * because the head node is never cancelled: A node becomes
     * head only as a result of successful acquire. A
     * cancelled thread never succeeds in acquiring, and a thread only
     * cancels itself, not any other node.
     */
    volatile Node prev;
    /**
     * Link to the successor node that the current node/thread
     * unparks upon release. Assigned during enqueuing, adjusted
     * when bypassing cancelled predecessors, and nulled out (for
     * sake of GC) when dequeued.  The enq operation does not
     * assign next field of a predecessor until after attachment,
     * so seeing a null next field does not necessarily mean that
     * node is at end of queue. However, if a next field appears
     * to be null, we can scan prev's from the tail to
     * double-check.  The next field of cancelled nodes is set to
     * point to the node itself instead of null, to make life
     * easier for isOnSyncQueue.
     */
    volatile Node next;
    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;
    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;
    /**
     * Returns true if node is waiting in shared mode.
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**
     * Returns previous node, or throws NullPointerException if null.
     * Use when predecessor cannot be null.  The null check could
     * be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

上面兩個有參構造方法,一個是被addWiter呼叫,一個是被Condition(也就是條件佇列)呼叫,通過兩個方法我們可以看到,addWaiter建立的node,waitStatus=0;通過Condition建立的node,waitStatus是CONDITION(-2)狀態(addConditionWaiter方法中可以看到)。需要注意的是一般共享鎖是不支援Condition的,所以通過第二個有參構造方法建立的節點是處於獨佔模式的,這樣我們就可以直接根據nextWaiter就能判斷出節點的等待模式。

1.3 同步狀態state

在不同的鎖實現中,有著不同的意義,比如在ReentractLock中,state表示當前執行緒獲取鎖的重入次數。在ReentractReadWriteLock中,state的高16位表示持有讀鎖的執行緒數,低16位表示寫鎖的重入次數。但是無論state怎麼存資料,他都與鎖的獲取與釋放有著密切的關係。

/**
 * The synchronization state.
 */
private volatile int state;

1.4 幾個核心方法

tryAcquireacquire為例,tryAcquire只是進行鎖資源的獲取,獲取成功就返回true,失敗返回false,而acquire會根據tryAcquire的返回結果來判斷,是否將當前執行緒阻塞放入等待佇列中。看名字我們就可以很簡單的分辨出方法直接點對應關係。在比如tryReleaseShared是嘗試釋放鎖資源,如果返回true,就會去嘗試喚醒後繼處於等待狀態的節點。

以try開頭的這些方法是我們在實現自定義同步等待佇列的時候,需要重點關注的方法,我們需要重寫方法,實現具體的邏輯。當然,如果你只是自定義一個獨佔鎖,那你只需要實現tryAcquiretryReleaseisHeldExclusively就行,共享鎖相關的,你可以不用管。如果你想實現例如讀寫鎖,那就需要將共享模式對應的方法也要重寫,進行實現。

// 獨佔模式獲取鎖資源
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
// 獨佔模式釋放鎖資源
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
// 共享模式獲取鎖資源
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
// 共享模式釋放鎖資源
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
// 獨佔模式下,當前執行緒釋放是持有鎖的執行緒
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

2. 獨佔鎖

獨佔鎖指的是隻能有一個執行緒持有鎖資源,如果其他執行緒想要持有鎖,需要等待持有鎖的執行緒釋放資源之後,否則會進入等待佇列等待(沒有獲取到鎖資源),直到等到有執行緒釋放了鎖資源,並且對該等待執行緒進行了喚醒,此時執行緒就會去再次嘗試獲取鎖資源。

2.1 acquire

首先判斷獲取鎖資源釋放成功,如果成功,後面的就不能執行了,如果失敗,就需要進入等待佇列,等待其他執行緒喚醒自己,喚醒後會更新中斷狀態,再次去嘗試獲取鎖資源,如果獲取鎖成功,就根據中斷狀態處理中斷(返回true就會執行selfInterrupt)。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

2.1.1 建立等待節點和初始化佇列

這裡直接指定了當前執行緒和等待模式,建立了一個新的節點(上面我們在說等待佇列的時候已經說過這個構造方法了),然後判斷佇列是否需要初始化。如果不需要初始化,就會將剛建立的節點設定為尾節點。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq方法幹了兩件事

  1. 如果發現佇列沒有初始化,就建立一個空節點,進行佇列初始化,此時佇列的頭節點和尾節點指向了同一個空節點。
  2. 初始化佇列之後,迴圈並沒有退出,而是來設定新建立的節點為尾節點,設定成功後推出迴圈。此時佇列的頭節點就是一個空節點,佇列的尾節點就是我們剛建立的的新節點。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

2.1.2 阻塞執行緒

進入到該方法後,如果當前節點的前驅節點是頭節點,會先嚐試獲取一個鎖資源tryAcquire,所以一般來說,一個執行緒在呼叫acquire後,會進行兩次鎖資源獲取,如果獲取鎖資源成功了,就將當前節點設定為頭節點,最後返回中斷狀態(按照我們說的這個流程的話,interrupted的狀態就是宣告時候的false),會在上面我們說的selfInterrupt方法中處理中斷(這裡是線上程獲取到鎖資源之後處理中斷狀態)。

如果嘗試獲取鎖資源失敗,就會執行shouldParkAfterFailedAcquire,來判斷(當前節點的前驅節點必須是signal狀態才進入阻塞狀態)釋放需要將當前執行緒掛起,如果返回true,就會執行後面的parkAndCheckInterrupt將當前執行緒阻塞。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

就做了一件事,就是讓自己(當前節點)在一個signal狀態的節點後進入阻塞狀態。如果當前節點的前驅節點是一個有效狀態,就直接修改為signal狀態,如果不是,說明前驅節點被取消了,就需要往前找,找到一個有效狀態的節點,跟在後面。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 必須是signal狀態,才能被阻塞,否則就會再次去嘗試獲取一下鎖資源
        return true;
    if (ws > 0) { // 只有CANCELLED=1 這一個狀態,如果當前節點的前驅節點被取消了,就需要往前找一個狀態為有效狀態的waitStatus<=0節點,跟在他後面。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else { // 如果是有效狀態,但是不是signal,就cas修改為signal
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

使用LockSupportpark方法將當前執行緒阻塞掛起。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted(); // 被喚醒後的第一件事是返回中斷狀態,並重置中斷狀態
}

2.1.3 從等待佇列中移除

根據acquireQueued中的程式碼,我們可以知道,只有噹噹前節點未獲取到鎖資源,並且丟擲了異常,failed就是初始值true,就會執行下面的cancelAcquire方法,該方法主要的目的是將當前節點從佇列中移除,會同時移除掉當前節點前後waitStatus=canceled的節點,如果發現當前節點是head節點會去喚醒後繼節點。

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;
    node.thread = null; // 將當前節點thread置為null
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev; // 從當前節點往前找,找到一個有效等待節點 waitStatus <=0
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED; // 將當前節點的狀態設定為cancelled(1)
    if (node == tail && compareAndSetTail(node, pred)) { // 如果他是tail節點,直接使用我們找到的pred節點替換他
        compareAndSetNext(pred, predNext, null); // 因為是tail節點,所以節點後面就不會有下一個節點了,cas將next設定為null,這裡next=null,prev不變
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) { // 如果找到的pred不是頭節點,就將pred的waitStatus設定為signal
            Node next = node.next; // 噹噹前節點的next是一個有效等待節點,就cas設定給pred的next
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node); // 如果當前節點時head,就去喚醒他的後繼節點。
        }
        node.next = node; // help GC
    }
}

2.1.4 中斷處理

將狀態操作補上,selfInterrupt執行,是因為acquireQueued返回true,根據上面的程式碼我們可以看到acquireQueued返回的就是中斷狀態,並且只有獲取到鎖資源的執行緒(這裡說的是正常請求,拋異常也是可以返回的),才能從acquireQueued中返回。如果是被中斷了,但是沒有獲取到鎖資源,那麼就會將中斷狀態記錄下來,等拿到鎖資源後返回。所以這裡的中斷操作我們才說是補的。因為存線上程被中斷了,但是一直沒有拿到鎖,導致不能馬上響應中斷操作。

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

2.2 release

release是釋放鎖資源,如果tryRelease釋放鎖資源成功(返回true),就會判斷頭節點是否有後繼節點需要喚醒。根據前面分析的程式碼,我們可以知道,當節點初始化時waitStatus是0,當有新的節點掛在剛初始化的節點的後面時,waitStatus會被修改為signal(-1)狀態,所以這裡判斷 h.waitStatus != 0的意思是說頭節點後面還有後繼節點。此時就會去喚醒後繼節點。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

2.2.1 喚醒後繼節點

這裡傳入的node是head節點,首先是將head節點的waitStatus修改為0,並且拿到head的後繼節點,去喚醒。如果候機節點不是一個有效等待節點,就從佇列的尾部開始從後往前找,找到一個有效等待節點去喚醒。

  1. 目前看來將waitStatus=0會增加一次後繼節點(將要喚醒的節點嘗試獲取鎖的機會),可以去看一下acquireQueued的程式碼,喚醒後會嘗試湖區一次鎖,如果獲取鎖失敗,會執行shouldParkAfterFailedAcquire方法,因為waitStatus=0,如果第一次會將waitStatuscas為signal,然後返回false,此時就會再去嘗試獲取一下鎖,如果依舊獲取失敗,就會執行被阻塞(因為shouldParkAfterFailedAcquire方法判斷到waitStatus=signle,直接返回true)。所以waitStatus設定為0,會增加一次獲取鎖的機會。
  2. 我們知道,尾節點的next是null,這裡我們獲取到的s==null其實並不表示s就一定是tail節點。因為還存在新增加一個節點next還沒來得及設定。也正是因為這原因,所以這裡在找一個有效等待節點時,選擇了從後往前找,因為pre的更新是可靠的,下面補一點前面enq中的程式碼,我們可以看到在新增一個節點時,第一步是將新增加的節點的prev指向當前tail節點,然後進行第二步將新增加的節點設定tail節點,最後是第三步將舊的tail節點的next只想新增加的節點。首先者三個操作肯定不是原子性操作。當第二部操作成功了,說明第一步操作也肯定成功了,此時就會出現一種情況舊的tail節點的next還沒指向新的tail節點,就會出現unparkSuccessors=null(s不是tail節點)的請求,如果是從前往後遍歷,因為next是不可靠的,會出現s=null但是s並不是tail節點的情況。所以使用從後往前遍歷,因為prev的更新,在第一步操作的時候就完成了,新增節點能成功tail的前提是prev已經更新了。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t; //   1
            if (compareAndSetTail(t, node)) { // 2
                // 可能執行緒會被掛起,來不及執行第三步
                t.next = node; // 3
                return t;
            }
        }
    }
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

2.3 其他

2.3.1 acquireInterruptibly

acquire相比,多了很多中斷處理的判斷,會對中斷進行及時處理。不會像acquire那樣,需要等到獲取到鎖資源才去處理中斷。這裡主要說與acquire相比的一些不同點。

在進入方法後就會呼叫Thread.interrupted()獲取當前中斷狀態,並重置中斷狀態,如果被中斷了就丟擲異常。然後是獲取鎖資源。可以看到這裡換成了doAcquireInterruptibly處理排隊。裡面與acquireQueued很類似,核心邏輯沒變。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

這裡將addWaiter方法放了進來,在parkAndCheckInterrupt之後,如果發現中斷了,就直接拋異常,此時會執行cancelAcquire方法將當前節點從佇列中移除。最後將異常丟擲。

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); // 這裡直接丟擲異常,而不是僅僅對中斷狀態進行記錄
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

2.3.2 tryAcquireNanos

不經會對中斷進行及時處理,還設定了一個等待超時時間,核心是使用了LockSupport.parkNanos方法,當超過執行時間會返回。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

首先會計算出絕對時間,如果獲取到了鎖資源就返回true,如果超時了就返回false。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) // 時間小於等於給定值,就直接不park了
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3. 共享鎖

對於共享鎖來說,鎖資源與具體執行緒是不相關的,如果一個執行緒獲取了鎖資源,另一個執行緒再來獲取鎖資源時,如果鎖資源還能滿足當前執行緒需求,那麼依舊可以獲取鎖資源。

3.1 acquireShared

首先是通過tryAcquireShared嘗試獲取鎖資源,並根據返回值判斷,是否獲取鎖資源成功,<0表示獲取鎖資源失敗,=0說明獲取到了鎖資源,但是沒有多餘的資源了,>0說明獲取到了鎖資源,並且還有多餘的資源。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

3.1.1 doAcquireShared

與獨佔鎖的實現很類似,首先呼叫addWaiter建立節點並新增到佇列中,然後獲取當前節點的前驅節點,如果前驅節點是頭節點,就嘗試獲取鎖資源,根據返回值判斷是否獲取到鎖資源,如果獲取到了鎖資源,這裡首先會執行setHeadAndPropagate方法,該方法主要是更行head,然後根據條件看是否需要區喚醒後繼節點,這裡的中斷與acquire中基本一致,獲取到鎖資源後,判斷記錄的中斷狀態,然後響應中斷。後面的程式碼和acquire中的基本一致,就不多說了。

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.1.2 setHeadAndPropagate

這裡的setHeadacquireQueued中的一樣,調的是同一個方法,重點是後面的判斷,propagate表示的是剩餘的鎖資源,根據if中的程式碼我們可以知道,只有當存在剩餘鎖資源或者頭節點為null(一般來說head不會為null),或者頭節點waiteStatus<0也就是需要被喚醒,就獲取當前節點的後繼節點判斷,後繼節點是否是null或者是共享節點(也就是判斷是否是一個非獨佔模式的節點),就會執行doReleaseShared進行節點喚醒。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

3.1.3 doReleaseShared

將頭節點的是signal,說明需要喚醒後繼節點,就從signal修改為0,如果修改成功,就呼叫unparkSuccessor喚醒後繼節點。如果頭節點的狀態是0,就從0改為PROPAGATE,不做其他操作。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

3.2 releaseShared

共享鎖釋放資源,首先使用tryReleaseShared進行鎖資源釋放,根據返回結果判斷是否執行doReleaseShared區喚醒後繼節點。

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

3.3 其他

3.3.1 acquireSharedInterruptibly

共享模式下獲取鎖響應中斷。跟acquireInterruptibly處理中斷基本一致,看一眼下面程式碼就知道了。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3.2 tryAcquireSharedNanos

共享模式下的獲取鎖並設定等待超時時間。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4. 總結

AQS提供了執行緒同步的基本實現,提供了兩種模式,獨佔模式和共享模式。使用連結串列的方式維護了一個雙向佇列,記錄存放著一個一個等待的執行緒。通過對被volatile修改的state操作,進行鎖資源的獲取與釋放,根據操作結果對佇列中節點進行阻塞和喚醒。我們需要仔細分割槽在什麼情況是存在多執行緒併發,在什麼情況下同一時刻只會有一個執行緒執行(比如獨佔鎖釋放鎖資源),從而確定我們的操作是否需要確保執行緒安全(cas操作)。其中獨佔和共享兩種模式其實編碼上很相似,只不過共享模式在每次獲取完資源後,會判斷是否是否有剩餘資源,從而選擇是否區喚醒後繼節點。

GitHub:https://github.com/godfunc
部落格園:http://www.cnblogs.com/godfunc
Copyright ©2019 Godfunc