1. 程式人生 > >AbstractQueuedSynchronizer原始碼解析

AbstractQueuedSynchronizer原始碼解析

AQS

AbstractQueuedSynchronizer 佇列同步器,是 JDK1.5 提供的一個基礎框架,用於構建依賴於先進先出(FIFO)等待佇列的阻塞鎖以及相關同步器(包括 ReentrantLock、CountDownLatch、Semaphore 等),它使用一個 int 型別的成員變數 state 表示同步狀態,同步器提供了一系列模板方法來訪問修改同步狀態: getState():獲取當前同步狀態 setState(int newState):設定當前同步狀態 compareAndSetState(int expect, int update)CAS 設定同步狀態(保證原子性)

/**
 * 等待佇列的頭節點,延遲初始化,除了初始化,只能通過setHead修改
 * 頭節點存在,它的waitStatus不能是CANCEllED
 */
private transient volatile Node head;
// 等待佇列的尾節點,延遲初始化,只能通過enq方法新增新的等待節點
private transient volatile Node tail;
// 同步狀態
private volatile int state;

頭結點和尾結點都使用volatile修飾,保證了可見性。

// 節點以共享模式等待的標識
static final Node SHARED = new Node
(); // 節點以獨佔模式等待的標識 static final Node EXCLUSIVE = null; // 表示執行緒被取消 static final int CANCELLED = 1; // 表示後續節點執行緒需要被喚醒 static final int SIGNAL = -1; // 表示執行緒正在condition上等待 static final int CONDITION = -2; // 表示下一次共享式同步狀態獲取將會無條件傳播下去 static final int PROPAGATE = -3; // 等待狀態 僅有上述取值和初始值0 volatile int waitStatus; // 當前節點前驅節點 入隊時分配 出隊時清空(GC原因)
volatile Node prev; // 當前節點後繼節點 volatile Node next; // 入隊節點執行緒 構造中初始化 使用完畢清空 volatile Thread thread; // 等待佇列的後繼節點 Node nextWaiter;

同步佇列結構圖 同步佇列結構圖

原始碼分析

獨佔鎖

獨佔鎖獲取

/**
 * 以獨佔模式獲取,忽略中斷,通過呼叫至少一次tryAcquire來實現,成功時返回
 * 否則執行緒會排隊等待,可能重複阻塞和解除阻塞,通過呼叫tryAcquire直到成功
 * 改方法可以用於實現Lock的lock()方法
 */
public final void acquire(int arg) {
/**
 * tryAcquire:嘗試以獨佔方式獲取,該方法應該查詢當前狀態是否允許獨佔模式獲取,如果可以則獲取
 * addWaiter:使用給定模式為當前執行緒建立入隊節點
 * acquireQueued:為佇列中的執行緒以獨佔不可中斷模式獲取同步狀態
 */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    // 中斷當前執行緒
        selfInterrupt();
}

建立入隊節點

// AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
// 以給定模式構建節點
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred; // 設定當前節點的前驅節點為tail尾結點
    // CAS設定當前節點為尾結點
        if (compareAndSetTail(pred, node)) { 
            pred.next = node;
            return node;
        }
    }
// 節點插入佇列
    enq(node);
    return node;
}

將當前節點插入到佇列

// AbstractQueuedSynchronizer
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
	  // 初始化CAS建立空節點
            if (compareAndSetHead(new Node()))
                tail = head; // CAS設定成功 將頭結點賦值給尾結點,初始狀態,頭節點和尾節點都指向空節點
        } else {
	  // 將當前節點的前驅節點指向尾節點
            node.prev = t;
            if (compareAndSetTail(t, node)) { // CAS設定尾節點
                t.next = node; // 設定尾節點的後繼節點為當前節點
                return t;
            }
        }
    }
}

在acquireQueued方法中通過自旋獲取同步狀態,有一個條件是隻有當前節點的前驅節點是頭節點才能夠嘗試獲取同步狀態,為什麼會有這個判斷? 原因:因為頭節點是成功獲取到同步狀態的節點,而頭節點的執行緒釋放了同步狀態後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後需要檢查自己的前驅節點是否為頭節點。

//  AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; //再次獲取鎖是否成功標識
    try {
        boolean interrupted = false; // 中斷標識
        for (;;) {
            final Node p = node.predecessor(); // 獲取當前節點的前驅節點
	  // 判斷當前節點的前驅節點是頭節點 且 獲取到同步狀態
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 設定頭節點
                p.next = null; // 幫助GC
                failed = false; // 設定再次獲取鎖成功
                return interrupted; // 返回中斷標識
            }
	  // 判斷獲取鎖失敗後是否需要阻塞 如果需要,阻塞後判斷執行緒狀態
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
	  // 獲取鎖失敗 則取消嘗試獲取
            cancelAcquire(node);
    }
}

判斷獲取鎖失敗後是否需要阻塞

// AbstractQueuedSynchronizer
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        // 如果當前節點的前驅節點的等待狀態為SIGNAL,則可以阻塞(返回true)
        return true;
    if (ws > 0) {
        //如果當前節點的前驅節點的等待狀態為CANCELLED,處於取消狀態,則跳過前驅節點進行重試
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node; // 找到最近的狀態不是取消的前驅節點 將該節點的後繼節點設定為當前節點
    } else {
        /** 
         * 節點狀態為0或PROPAGATE,表明需要一個訊號,但不需要阻塞。需要重試確保在阻塞前不能獲取鎖
         *  CAS設定等待狀態為SIGNAL
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// AbstractQueuedSynchronizer
private void cancelAcquire(Node node) {
    if (node == null)
        return;

    node.thread = null;

// 跳過取消的前驅節點,找到最近的狀態不為取消的前驅節點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;

    node.waitStatus = Node.CANCELLED;

// 如果當前節點是尾結點,則CAS設定當前節點的前驅節點為尾節點
    if (node == tail && compareAndSetTail(node, pred)) {
    // CAS設定當前節點的前驅節點的後繼為null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
    // 判斷 當前節點的前驅節點不是頭節點 且 (等待狀態為SIGNAL 或 等待狀態不是取消狀態,則設定等待狀態為SIGNAL) 且 執行緒不為null
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
        /**
         * 如果當前節點的後繼節點不為nul 且 等待狀態不為取消狀態
         * 則CAS設定前驅節點的後繼節點為當前節點的後繼節點
         */
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
        // 喚醒節點的後繼節點
            unparkSuccessor(node);
        }

        node.next = node; // 幫助GC
    }
}
// AbstractQueuedSynchronizer
private void unparkSuccessor(Node node) {
    /*
* 如果等待狀態為負數,可能需要signal,嘗試清除狀態,
* 如果失敗或被等待執行緒修改也沒問題
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
   // 獲取當前節點的後繼節點,從尾結點開始遍歷,獲取最近一個等待狀態值不為取消的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
  // 喚醒該節點的執行緒
        LockSupport.unpark(s.thread);
}

獨佔鎖釋放

// AbstractQueuedSynchronizer
public final boolean release(int arg) {
// 通過呼叫子類實現的該方法進行同步狀態值得修改來釋放獨佔鎖
    if (tryRelease(arg)) {
        Node h = head;
    // 頭節點不為空 且 等待狀態值不為0 則喚醒後繼節點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

共享鎖

共享鎖獲取

// AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
// 嘗試以共享方式獲取鎖
    if (tryAcquireShared(arg) < 0)
        // 嘗試以共享不可中斷模式獲取
        doAcquireShared(arg);
}
// AbstractQueuedSynchronizer
private void doAcquireShared(int arg) {
// 以給定模式構建節點
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true; // 獲取共享鎖是否失敗標識
    try {
        boolean interrupted = false; // 中斷標識
        for (;;) {
            final Node p = node.predecessor(); // 獲取當前節點的前驅節點
            if (p == head) { // 當前節點的前驅節點為頭節點
                int r = tryAcquireShared(arg); // 嘗試獲取共享鎖
                if (r >= 0) {
                    setHeadAndPropagate(node, r); // 獲取共享鎖成功 設定頭節點並傳播
                    p.next = null; // 幫助GC
                // 中斷標識為true 則中斷當前執行緒
                    if (interrupted)
                        selfInterrupt(); 
                    failed = false; // 獲取共享鎖成功 設定獲取共享鎖失敗標識為false
                    return;
                }
            }
         // 判斷再次獲取鎖失敗後是否需要阻塞 如果需要,阻塞後判斷執行緒狀態
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
         // 獲取鎖失敗 則取消嘗試獲取
            cancelAcquire(node);
    }
}
// AbstractQueuedSynchronizer
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node); // 設定當前節點為頭節點
    /*
* propagate大於0表示後續節點需要被喚醒
* 頭節點為null或waitStatus<0,表示後繼節點需要被喚醒
     */

    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 後繼節點為null 或 後繼節點為共享型別
        if (s == null || s.isShared())
            // 釋放共享鎖
            doReleaseShared();
    }
}
// AbstractQueuedSynchronizer
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 如果等待狀態為SIGNAL,則CAS設定等待狀態
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // CAS失敗則迴圈重新檢查
                unparkSuccessor(h); // 喚醒後繼節點
            }
// 如果後續節點不需要喚醒 則將節點等待狀態設定為PROPAGATE確保傳播
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // CAS失敗則迴圈設定
        }
        // 頭節點沒有發生變化,結束迴圈,否則迴圈保證喚醒動作傳播
        if (h == head)                   
            break;
    }
}

共享鎖釋放

// AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
    // 嘗試釋放共享鎖
    if (tryReleaseShared(arg)) {
        doReleaseShared(); // 釋放共享鎖
        return true;
    }
    return false;
}

參考資料