1. 程式人生 > >併發程式設計之 AQS 原始碼剖析

併發程式設計之 AQS 原始碼剖析

前言

JDK 1.5 的 java.util.concurrent.locks 包中都是鎖,其中有一個抽象類 AbstractQueuedSynchronizer (抽象佇列同步器),也就是 AQS, 我們今天就來看看該類。

1.結構

類結構

我們看看該類的結構,該類被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的內部類所繼承,而這些內部類都是這些鎖的真正實現,不論是公平鎖還是非公平鎖。

也就是說,這些鎖的真正實現都是該類來實現的。那麼,我們就從這些鎖開始看看是如何實現從鎖到解鎖的。

2. 重入鎖的 lock 方法

我們先看看重入鎖 ReentranLock 的 lock 方法。

    public void lock() {
        sync.lock();
    }

該方法呼叫了內部類的 sync 抽象類的 lock 方法,該方法的實現有公平鎖和非公平鎖。我們看看公平鎖是如何實現的:

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final
void lock() { acquire(1); }

呼叫了 acquire 方法,該方法就是 AQS 的的方法,因為 sync 繼承了 AQS,而公平鎖繼承了 Sync,等於間接繼承了 AQS,我們看看該方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

該方法JDK註釋 :

以獨佔模式獲取物件,如果被中斷則中止。通過先檢查中斷狀態,然後至少呼叫一次 tryAcquire(int) 來實現此方法,並在成功時返回。否則在成功之前,或者執行緒被中斷之前,一直呼叫 tryAcquire(int) 將執行緒加入佇列,執行緒可能重複被阻塞或不被阻塞。可以使用此方法來實現 Lock.lockInterruptibly() 方法。

樓主來簡單說一下該方法的作用:該方法會試圖獲取鎖,如果獲取不到,就會被加入等待佇列等待被喚醒,這個其實和我們之前分析的 synchronized 是差不多的。

我們仔細看看該方法,首先是 tryAcquire 方法,也就是嘗試獲取鎖,該方法是需要被寫的,父類預設的方法是丟擲異常。如何重寫呢?抽象類定義一個標準:如果返回 true,表示獲取鎖成功,反之失敗。

tryAcquire

我們回到 acquire 方法,如果獲取鎖成功,就直接返回了,如果失敗了,則繼續後面的操作,也就是將執行緒放入等待佇列中:

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

我們先看看 addWaiter(Node.EXCLUSIVE) 方法:

    /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

該方法註釋:將當前執行緒放入到佇列節點。引數呢?引數有2種,Node.EXCLUSIVE 是獨佔鎖,Node.SHARED 是分享鎖。

在 Node 類種定義了這兩個常量:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

獨佔鎖是null,共享鎖是空物件。

我們看看該方法的步驟:
1. 建立一個當前執行緒的 Node 物件(nextWaiter 屬性為 null, thread 屬性為 當前執行緒)。
2. 獲取到末端節點,如果末端節點不為 null,則將末端節點設定為剛剛建立的節點的 prev 屬性。
2.1. 通過 CAS 設定末端節點為新的節點。如果成功,將剛剛建立的節點設定為老末端節點的next節點。最後返回。
3. 如果 tail 末端節點是null,則呼叫enq 方法。建立一個末端節點,然後,將剛剛建立的末端節點設定為新節點的 prev 屬性(此時的末端節點就是 head 頭節點)。最後返回剛剛建立的 node 節點。

我們看看 enq 方法的實現:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

該方法步驟如下:
1. 死迴圈,獲取到末端節點,如果是null,則使用CAS建立一個頭節點(頭節點此時也是null),並將頭節點賦值末端節點。
2. 由於剛剛CAS 成功,走else 邏輯,將末端節點賦值給新節點的 prev 屬性,使用CAS設定新的末端節點為剛剛建立的 node物件。然後返回node 物件。

該方法主要就是初始化頭節點和末端節點,並將新的節點追加到末端節點並更新末端節點。

我們會到 addWaiter 方法中,該方法主要作用就是根據當前執行緒建立一個 node 物件,並追加到佇列的末端。

我們再回到 acquire 方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter 方法會返回剛剛建立的node 物件,然後呼叫 acquireQueued 方法,我們進入該方法檢視:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

該方法步驟如下:
1. 死迴圈。先獲取 node 物件 prev 節點,如果該節點和 head 相等,說明是他是第二個節點,那麼此時就可以嘗試獲取鎖了。
1.1 如果獲取鎖成功,就設定當前節點為 head 節點(同時設定當前node的執行緒為null,prev為null),並設定他的 prev 節點的 next 節點為 null(幫助GC回收)。最後,返回等待過程中是否中斷的布林值。
2. 如果上面的兩個條件不成立,則呼叫 shouldParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法。這兩個方法的目的就是將當前執行緒掛起。然後等待被喚醒或者被中斷。稍後,我們仔細檢視這兩個方法。
3. 如果掛起後被當前執行緒喚醒,則再度迴圈,判斷是該節點的 prev 節點是否是 head,一般來講,當你被喚醒,說明你別准許去拿鎖了,也就是 head 節點完成了任務釋放了鎖。然後重複步驟 1。最後返回。

我們看看 shouldParkAfterFailedAcquire 方法:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)

            return true;
        if (ws > 0) {

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

該方法步驟如下:

  1. 獲取去上一個節點的等待狀態,如果狀態是 SIGNAL -1,就直接返回 true,表示可以掛起並休息。
  2. 如果 waitStatus 大於 0, 則迴圈檢查 prev 節點的 prev 的waitStatus,知道遇到一個狀態不大於0。該欄位有4個狀態,分別是 CANCELLED = 1,SIGNAL = -1, CONDITION = -2, PROPAGATE = -3,也就是說,如果大於 0,就是取消狀態。那麼,往上找到那個不大於0的節點後怎麼辦?將當前節點指向 那個節點的 next 節點,也就是說,那些大於0 狀態的節點都失效這裡,隨時會被GC回收。
  3. 如果不大於0 也不是 -1,則將上一個節點的狀態設定為有效, 也就是 -1.最後返回 false。注意,在acquireQueued 方法中,返回 false 後會繼續迴圈,此時 pred 節點已經是 -1 了,因此最終會返回 true。

再看 parkAndCheckInterrupt 方法(掛起並檢查是否中斷):

   private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

該方法非常的簡單,就是將當前執行緒掛起,等到有別的執行緒喚醒(通常是 head 節點中執行緒),然後返回當前執行緒是否是被中斷了,注意,該方法會清除中斷狀態。

回到 acquireQueued 方法,總結一下該方法,該方法就是將剛剛建立的執行緒節點掛起,然後等待喚醒,如果被喚醒了,則將自己設定為 head 節點。最後,返回是否被中斷。

再回到 acquire 方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在該方法中,如果獲取鎖失敗並被喚醒,且被中斷了,那麼就執行 selfInterrupt 方法:

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

將當前執行緒設定中斷狀態位。

好了,到這裡,整個lock 方法,我們基本就分析完了,可以說,整個方法就是將執行緒放入到等待佇列並掛起然後等待 head 節點喚醒。其中,tryAcquire 方法高頻出現,該方法具體實現由子類實現,比如 重入鎖,讀寫鎖,執行緒池的 worker,其中 CountDown 和 Semaphore 實現的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定義的?就是返回 true 表示拿到鎖了,返回 false 表示拿鎖失敗,具體如何實現AQS管不了。但他們都依賴一個極其重要的欄位 ——- state。

樓主有必要說說這個欄位,該欄位定義了當前同步器的狀態,如果大家知道 pv 原語的話,應該很好理解這個欄位,該欄位在 AQS 中是如何定義的:

    /**
     * The synchronization state.
     */
    private volatile int state;

volatile。該欄位可能會被多個執行緒修改,因此,需要設定為 volatile ,保證變數的可見性。

我們可以看看 重入鎖中的公平鎖是如何使用該欄位的。

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

該方法重寫了 tryAcquire 方法,步驟如下:
1. 獲取當前執行緒,獲取鎖(同步器)的狀態。
2. 如果同步器等於0,就 CAS 設定 state 為 1,表示同步器被佔用了,並且設定同步器的持有執行緒為當前執行緒(為了判斷重入)。最後返回拿鎖成功 true。
3. 如果不是0,並且當前執行緒就是同步器的持有執行緒,說明是重入。那麼就將 state 加1,最後返回 true。所以說,當你重入一次,就需要解鎖一次,否則下個執行緒永遠拿不到鎖。
4. 如果都不是,返回 false ,表示拿鎖失敗。

從這裡,我們可以看到, statei 欄位非常的重要,判斷鎖是否被持有完全根據這個欄位來的。這點一定要注意,而這個設計和作業系統的 pv 由異曲同工之妙。

那麼看完了拿鎖,再看看解鎖,我們可以先猜想一下如何設計,首先肯定是要將 state 欄位設定為 0,才能讓下個執行緒拿鎖,然後呢?喚醒等待佇列中的下個執行緒。讓他嘗試拿鎖。那到底 doug lea 是不是這麼設計的呢?我們來看看。

3. 重入鎖的 unlock 方法

該方法呼叫了AQS 的 release 方法:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先嚐試釋放,如果成功,則喚醒下一個執行緒。

我們先看看 tryRelease 方法 (需要重寫):

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

該方法步驟如下:
1. 計算同步器狀態減去1後的值。
2. 判斷同步器執行緒和當前執行緒是否相同,如果不同,丟擲監視器狀態異常。
3. 判斷狀態是否是 0,也就是說,如果是0,表示沒有執行緒持有鎖了,那麼就是設定 free 為 true,並且設定同步器的 thread 屬性為null,
4. 最後設定 state 為 計算的值,這裡需要考慮重入。最後返回。

可以看到,如果 state 不是 0 的話,就會返回 false ,後面的步驟就沒有了,也就是說,重入鎖解鎖的時候不會喚醒下一個執行緒。

如果解鎖成功,執行下面的步驟,如果 head 頭節點不是 null 並且他的狀態不是0,說明有執行緒可以喚醒,執行 unparkSuccessor 方法。

    private void unparkSuccessor(Node node) {

        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

該方法步驟如下:
1. 獲取到頭節點的狀態。
2. 如果小於0,CAS 設定狀態為0。
2. 獲取到頭節點的next 節點,判斷是否為null,或者 next 節點是否大於0,如果是null 或者大於0,則從末端節點開始向上查詢,直到找到狀態小於等於0 的節點。
3. 最後喚醒該節點的執行緒。

這個時候,等待在 acquireQueued 方法中,準確的說是 parkAndCheckInterrupt 方法中的 執行緒被喚醒,開始繼續迴圈,嘗試拿鎖(需要修改 state 變數),並設定自己為 head。

這裡還有一個漏掉的地方,就是 waitStatus 變數,什麼時候會大於等於0? 該變數預設是 0,大於 0 的狀態是被取消的狀態。什麼時候會被取消呢? 在acquireQueued 方法中,如果方法沒有正常結束,則會執行 finally 中的 cancelAcquire 方法,該方法會將狀態變成 1,也就是取消狀態。

4 總結

這次我們分析 AQS,也就是鎖的的真正實現,只分析了 lock 方法和 unlock 方法,這兩個方法是重入鎖的基礎。CountDown 和 Semaphore 是共享鎖,但是基本原理相同,只是將 state 的數字加大便可以實現。而和重入鎖等鎖相關聯的 Condition 則是通過 LockSupport 工具類直接掛起當前執行緒,並將當前執行緒新增到等待佇列中,當呼叫 Condition 的 signal 方法時,則喚醒佇列中的第一個執行緒。具體原始碼我們有機會再分析。

總之,java 重入鎖的實現基於 AQS,而 AQS 主要基於 state 變數和佇列來實現。實現原理和 pv原語 類似。

good luck!!!!!