1. 程式人生 > 實用技巧 >學習JUC原始碼(1)——AQS同步佇列(原始碼分析結合圖文理解)

學習JUC原始碼(1)——AQS同步佇列(原始碼分析結合圖文理解)

前言

  最近結合書籍《Java併發程式設計藝術》一直在看AQS的原始碼,發現AQS核心就是:利用內建的FIFO雙向佇列結構來實現執行緒排隊獲取int變數的同步狀態,以此奠定了很多併發包中大部分實現基礎,比如ReentranLock等。今天又是週末,便來總結下最近看的消化後的內容。

  主要參考資料《Java併發程式設計藝術》(有需要的小夥伴可以找我,我這裡只有電子PDF)結合ReentranLock、AQS等原始碼。

  博文中的流程圖,結構圖等都是我理解之後一步步親自畫的,如果轉載,請標明謝謝!


一、同步佇列的結構與實現

1、同步佇列的結構

(1)結構介紹

  AQS使用的同步佇列是基於一種CLH鎖演算法來實現

(引用網上資料對CLH簡單介紹):

  CLH鎖也是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖,申請執行緒只在本地變數上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋

  結點之間是通過隱形的連結串列相連,之所以叫隱形的連結串列是由於這些結點之間沒有明顯的next指標,而是通過myPred所指向的結點的變化情況來影響myNode的行為;

  當一個執行緒須要獲取鎖時,會建立一個新的QNode。將當中的locked設定為true表示須要獲取鎖。然後執行緒對tail域呼叫getAndSet方法,使自己成為佇列的尾部。同一時候獲取一個指向其前趨的引用myPred,然後該執行緒就在前趨結點的locked欄位上旋轉。直到前趨結點釋放鎖。

  當一個執行緒須要釋放鎖時,將當前結點的locked域設定為false,同一時候回收前趨結點。執行緒A須要獲取鎖。其myNode域為true。些時tail指向執行緒A的結點,然後執行緒B也增加到執行緒A後面。tail指向執行緒B的結點。然後執行緒A和B都在它的myPred域上旋轉,一旦它的myPred結點的locked欄位變為false,它就能夠獲取鎖。

而在原始碼中也有這樣的介紹:

/**
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks.
* ...........
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
* ..............

在AQS中的同步佇列結構以及獲取/釋放鎖都是基於此實現的,這裡我們先放一個我畫的基本結構來理解AQS同步佇列,再進一步介紹一些細節。

根據以上圖我們看到:

  • 該佇列是雙向FIFO佇列,每個節點都有pre、next域;
  • 同步器包含了兩個節點型別的引用,一個指向頭結點,一個指向尾節點;
  • 新加入執行緒被構造成Node通過呼叫compareAndSetTail加入同步佇列中;
  • 使用setHead(Node node)設定頭結點,指向佇列頭。使用compareAndSetTail(Node exceptNode, Node updateNode)指向佇列尾節點。

在原始碼中我們可以看到:

    // 內部類Node節點
    static final class Node{...}
    // 同步佇列的head引用
    private transient volatile Node head;
    // 同步佇列的tail引用
    private transient volatile Node tail;

(2)節點構成

那麼Node結構的具體構成是什麼呢?我們具體看內部類Node的原始碼:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /** 等待狀態:
         * 0 INITAIL: 初始狀態
         * 1 CANCELLED: 由於等待超時或者被中斷,需要從同步佇列中取消等待,節點進入該狀態不會被改變
         * -1 SIGNAL: 當前節點釋放同步狀態或被取消,則等待狀態的後繼節點被通知
         * -2 CONDITION: 節點在等待佇列中,執行緒在Condition上,需要其它執行緒呼叫Condition的signal()方法才能從等待隊轉移到同步佇列
         * -3 PROPAGATE: 表示下一個共享式同步狀態將會無條件被傳播下去
         */
        volatile int waitStatus;
        /** 前驅結點 */
        volatile Node prev;

        /** 後繼節點 */
        volatile Node next;
 
        /** 獲取同步狀態的執行緒 */
        volatile Thread thread;
        
        /** 等待佇列中的後繼節點 */
        Node nextWaiter;
      
        /** 判斷Node是否是共享模式 */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
 
        /** 返回前驅結點 */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

從原始碼中可以發現:同步佇列中的節點Node用來儲存獲取同步狀態失敗的執行緒引用、等待狀態以及前驅和後繼節點

節點是構成同步佇列的基礎,沒有成功獲取同步狀態的執行緒將成為節點加入該佇列的尾部。當一個執行緒無法獲取同步狀態時,會被構造成節點並加入同步佇列中,通過CAS保證設定尾節點這一步是執行緒安全的,此時才能認為當前節點(執行緒)成功加入同步佇列與尾節點建立聯絡。具體的實現邏輯請看下面介紹!

2、同步狀態獲取與釋放

(1)獨佔式同步狀態獲取與釋放

通過呼叫同步器acquire(int arg)方法可以獲取同步狀態,該方法中斷不敏感,也就是由於執行緒獲取同步狀態失敗後進入同步佇列中,後序執行緒對進行中斷操作時,執行緒不會從同步佇列中移出

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

同步狀態獲取主要的流程步驟:

1)首先呼叫自定義同步器實現tryAcquire(int arg)方法,該方法保證執行緒安全的獲取同步狀態

2)如果獲取失敗則構造同步節點(獨佔式Node.EXCLUSIVE)並通過addWaiter(Node ndoe)方法將該節點加入到同步佇列的尾部,同時enq(node)通過for(;;)迴圈保證安全設定尾節點。

 private Node addWaiter(Node mode) {
        // 根據給定模式構造Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // 嘗試在尾部新增
        if (pred != null) {
            node.prev = pred;
            // cas方式保證正確新增尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // enq主要是通過for(;;)死迴圈來確保節點正確新增
        // 在for(;;)死迴圈中,通過cas將節點設定為尾節點時,才返回;否則一直嘗試設定
        enq(node);
        return node;
    }
 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize  當tail節點為null時,必須初始化構造好    head節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // 否則就通過cas開始新增尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

假設原佇列中存在Node-1到Node-4節點,此時某個執行緒獲取同步狀態失敗則構成成Node-5通過CAS方式加入佇列(下圖忽略自旋環節)。

          

3)節點進入同步佇列之後“自旋”,即acquireQueued(final Node node, int arg)方法,在這個方法中,當前node死迴圈嘗試獲取鎖狀態,但是隻有node的前驅結點是Head才能嘗試獲取同步狀態,取成功之後立即設定當前節點為Head,併成功返回。否則就會一直自旋。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 當前node節點的前驅是Head時(p == head),才能有資格去嘗試獲取同步狀態(tryAcquire(arg))
                // 這是因為當前節點的前驅結點獲得同步狀態,才能喚醒後繼節點,即當前節點
                if (p == head && tryAcquire(arg)) { // 以上條件滿足之後
                    setHead(node); // 設定當前節點為Head
                    p.next = null; // help GC // 釋放ndoe的前驅節點
                    failed = false;
                    return interrupted;
                }
                // 執行緒被中斷或者前驅結點被釋放,則繼續進入檢查:p == head && tryAcquire(arg
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

此時新加入的Node-5節點也開始自旋,此時的Head(Node-1)已經獲取到了同步狀態,而Node-2退出了自旋,成為了新的Head。

        

文字總結:

1)同步器會維護一個雙向FIFO佇列,獲取同步失敗的執行緒將會被構造成Node加入隊尾(並且做自旋檢查:檢查前驅結點是否是Head);

2)當前執行緒想要獲得同步狀態,前提是其前驅結點是頭結點,並且獲得了同步狀態;

3)當Head呼叫release(int arg)釋放鎖的同時會喚醒後繼節點(即當前節點),後繼節點結束自旋

流程圖總結:

                 

同步器的release方法:釋放鎖的同時,喚醒後繼節點(進而時後繼節點重新獲取同步狀態)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 該方法會喚醒Head節點的後繼節點,使其重試嘗試獲取同步狀態
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

UnparkSuccessor(Node node)方法使用LookSupport(LockSupport.unpark)喚醒處於等待狀態的執行緒(之後會慢慢看原始碼介紹)。

(2)共享式同步狀態獲取與釋放

共享鎖跟獨佔式鎖最大的不同就是:某一時刻有多個執行緒同時獲取到同步狀態,獲取判斷是否獲取同步狀態成功的關鍵,獲取到的同步狀態要大於等於0。而其他步驟基本都是一致的,還是從原始碼開始分析起:帶字尾Share都為共享式同步方法。

1)acquireShared(int arg)獲取同步狀態:如果獲取失敗則加入隊尾,並且檢查是否具備退出自旋的條件(前驅結點是頭結點並且能成功獲取同步狀態)

    public final void acquireShared(int arg) {
        // tryAcquireShared 獲取同步狀態,大於0才是獲取狀態成功,否則就是失敗
        if (tryAcquireShared(arg) < 0)
            // 獲取狀態失敗則構造共享Node,加入佇列;
            // 並且檢查是否具備退出自旋的條件:即preNode為head,並且能獲取到同步狀態
            doAcquireShared(arg);
    }

2)doAcquireShared(arg):獲取失敗的Node加入佇列,如果當前節點的前驅結點是頭結點的話,嘗試獲取同步狀態,如果大於等於0則在for(;;)中退出(退出自旋)。

private void doAcquireShared(int arg) {
        // 構造共享模式的Node
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    // 前驅節點是頭結點,並且能獲取狀態成功,則return返回,退出死迴圈(自旋)
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

3)releaseShared(int arg):釋放同步狀態,通過loop+CAS方式釋放多個執行緒的同步狀態。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 通過loop+CAS方式釋放多個執行緒的同步狀態
            doReleaseShared();
            return true;
        }
        return false;
    }

二、自定義同步元件(實現Lock,內部類Sync繼承AQS)

1、實現一個不可重入的互斥鎖Mutex

2、實現指定共享數量的共享鎖MyShareLock

--------------------------------未完待續(為了加深理解畫圖寫程式碼花費時間較長,所以慢慢來保證質量,不著急!)-------------------------------------------