1. 程式人生 > >併發系列(七)-----AQS詳解同步對列

併發系列(七)-----AQS詳解同步對列

一 簡介

     AbstractQueuedSynchronizer,即佇列同步器(簡稱AQS)。它是構建鎖或者其他同步元件的基礎框架。它的設計是基於模板方法模式的,也就是說,使用者需要繼承同步器並重寫指定的方法,隨後將同步器組合在自定義同步元件的實現中,並呼叫同步器提供的模板方法,而這些模板方法將會呼叫使用者重寫的方法。

二 AQS的框架說明

    AbstractQueuedSynchronizer繼承自AbstractOwnableSynchronizer。AbstractOwnableSynchronizer有一個成員變數,這個變數代表的是獨佔模式同步的當前所有者。

    AbstractQueuedSynchronizer內部實現了獨佔模式和共享模式。其實現原理就是通過內部維護一個被volatile所以修飾的int變數和一個FIFO的佇列,佇列中的節點去獲取int變數,一旦獲取到那麼意味著該節點(AQS中的內部類中的Node一會原始碼中會說明)中封裝的執行緒就會持有執行時間。下面是網上找到的圖。

從上圖中我們不難看出同步佇列與資源的關係,上面說明了同步佇列和資源的關係,下面我們在看一下,等待佇列與同步佇列的關係。

當呼叫Condition的await()方法(或者以await開頭的方法),會使當前執行緒進入等待佇列並釋放鎖,同時執行緒狀態變為等待狀態。當從await()方法返回時,當前執行緒一定獲取了Condition相關聯的鎖。

如果從佇列(同步佇列和等待佇列)的角度看await()方法,當呼叫await()方法時,相當於同步佇列的首節點(獲取了鎖的節點)移動到Condition的等待佇列中。

三 原始碼分析

  在AQS中的同步佇列和等待佇列中都用到了一個共同一個內部的Node類。根據圖一可以猜測在Node類中一定存在的有Thread、前驅結點(prev)的引用和後驅節點(next)的引用。上面這三項就構成可同步佇列最基礎的東西。根據圖二我們猜測Node中一定存在一個用來區分是等待這個節點是等待狀態還是同步狀態還是取消狀態,又一個變量出來了waitStatus,在看等待佇列的話那麼也就需要一個指向下一個節點的引用(nextWaiter)。至此已經推測出Node中自少也有五個變數。分別是Thread、 prev、 next、 waitStatus、nextWaiter。下面是原始碼來驗證猜測是否正確。

 static final class Node {

        /**
         * 表明這個節點是一個共享模式
         */
        static final Node SHARED = new Node();
        /**
         * 表明這個節點是一個獨佔模式
         */
        static final Node EXCLUSIVE = null;
        /**
         * 由於在同步對列中等待的執行緒等待超時或者被中斷, 需要從同步對列中取消等待,節點進入該狀態將不會變化
         */
        static final int CANCELLED = 1;
        /**
         * 後繼節點的執行緒處於等待狀態,當前面節點的執行緒如果釋放了同步狀態或取消, 將會通知後繼節點,使後繼級節點的執行緒可以執行.
         */
        static final int SIGNAL = -1;
        /**
         * 節點在等待對列中,節點執行緒等待在Condition上, 當其他執行緒對Condition呼叫了 signal()方法後, 該節點將會從等待對列中轉移到同步對列中,加入到同步狀態的獲取
         */
        static final int CONDITION = -2;
        /**
         * 表示下一次共享模式同步狀態獲取將會無條件的被傳播下去,已經釋放了
         */
        static final int PROPAGATE = -3;
        /**
         * 節點的狀態
         */
        volatile int waitStatus;
        /**
         * 前驅節點
         */
        volatile Node prev;
        /**
         * 後驅節點
         */
        volatile Node next;
        /**
         * node對應的執行緒
         */
        volatile Thread thread;
        /**
         * 等待對列中的後繼節點.如果當前節點是共享的那麼這個欄位將是一個SHARED常量, 也是就是說節點型別和等待對列中的後繼節點共用同一個欄位.
         */
        Node nextWaiter;

        /**
         * 是不是共享
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前驅節點
         *
         * @return node 物件
         */
        final Node predecessor() {
            Node p = prev;
            if (p == null) {
                throw new NullPointerException();
            }
            return p;
        }


        Node() {
        }

        Node(Thread thread, Node node) {
            this.thread = thread;
            this.nextWaiter = node;
        }

        Node(Thread thread, int waitStatus) {
            this.thread = thread;
            this.waitStatus = waitStatus;
        }
    }

看原始碼有幾個已經猜對了,其中多出來幾個int型別的值(SIGNAl等)和Node型別(SHARED),多出來的int型別的值就是waitSatus可以選擇的值,多出來的Node用來表示是獨佔模式和共享模式。

現在已經知道了同步佇列的節點是怎麼構成的了,接下來的問題就是如何操作這些節點來構成一個同步佇列,又是怎麼樣去操作執行緒的呢。

下面看一段原始碼這段原始碼,這段程式碼的意思是向佇列中新增一個節點,具體的執行流程已經註釋寫的很清楚了。

    /**
     * 佇列的頭結點
     */
    private transient volatile Node head;
    /**
     * 佇列的尾節點 
     */
    private transient volatile Node tail;

    /**
     * 新增一個同步節點
     *
     * @param mode 節點
     * @return 節點返回
     */
    private Node addWaiter(Node mode) {
        //建立一個節點這個節點將當前執行緒封裝了
        Node node = new Node(Thread.currentThread(), mode);
        //獲取佇列的尾部節點
        Node pred = tail;
        //當尾節點不為空是先嚐試替換一下,成功的話直接返回
        if (pred != null) {
            node.prev = pred;
            //這裡要注意了compareAndSetTail是CAS操作
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //迴圈替換
        enq(node);
        return node;
    }

看完上面的方法可能會有一些問題,沒看到佇列的初始化就直接添加了嗎?在上面原始碼中呼叫了一個方法迴圈替換。下面是原始碼

 /**
     * 新增一個節點到尾部
     *
     * @param node 節點
     * @return 返回這個節點的前驅節點
     */
    private Node enq(Node node) {
        for (; ; ) {
            //獲取當前的尾部節點
            Node t = tail;
            if (t == null) {
                //在這裡進行初始化這樣的話頭節點和尾節點都不為null,當下一次
                //迴圈的時候tail迴圈的時候tail就不為null,這樣就可以將傳遞的引數
                //設定為尾節點了
                if (compareAndSetHead(new Node())) {
                    tail = head;
                }
            } else {
                //尾部節點不為null這時就要添加了
                //將傳入的節點的前驅節點設定為當前的尾節點當第一次初始化的時候
                //指向了頭節點不是第一次的話將前驅結點設定為尾部節點
                node.prev = t;
                //使用CAS設定對列的尾節點 這一步如果失敗的話會繼續for迴圈
                //直到成功替換
                if (compareAndSetTail(t, node)) {
                    //當替換成功尾部節點的時候將尾部節點的後驅節點設定為新增的節點
                    t.next = node;
                    return t;
                }

            }
        }
    }

這個方法中我們可以看到首先判斷了一些佇列的尾部節點是否為空,如果尾節點為空的話,那麼佇列就可能為空,使用CAS設定將頭節點初始化,同時也將尾部節點初始化。這樣的話這個佇列也就被初始化了,接下來就是設定尾部節點,和設定該節點的前驅和後驅節點了。

同步佇列的初始化新增和維護就已經結束了接下來就是資源的競爭了。關於資源獲取就有兩種猜想了

猜想一:執行緒上來就直接獲取,如果獲取成功的話那就執行了,獲取失敗的話被封裝成節點新增到同步佇列的尾部

猜想二:執行緒一上來看看佇列中有沒有要同步的節點,如果有的話那就不獲取資源了直接新增到同步佇列中,等待上一個節點喚醒。

四 總結

1.AQS的整體架構:基於模板方法的設計模式,主要構成有state,同步對列和等待對列

2.AQS的同步佇列的新增節點,與初始化。其中的主要方法是addWaiter()

3.資源獲取的猜想,下一篇總結具體的佇列的節點是如果獲取資源的。