1. 程式人生 > >【JDK源碼分析】深入源碼分析CountDownLatch

【JDK源碼分析】深入源碼分析CountDownLatch

代碼 compare signal views 類繼承 clas ref 關聯 ati

前言

CountDownLatch是一個閉鎖實現,它可以使一個或者多個線程等待一組事件發生。它包含一個計數器,用來表示需要等待的事件數量,coutDown方法用於表示一個事件發生,計數器隨之遞減,而await方法等待計數器為0之前一直阻塞。它是基於AQS的共享鎖來實現的,其中使用了較多的AQS的方法,所以在這之前最好閱讀過AQS的源碼,不嫌棄也可以查看本人之前AQS的源碼分析,有些AQS方法沒有在之前分析過的這裏涉及到了會進行分析。

源碼

我們先看它的屬性和構造器,

    // Sync為其內部類
    private final Sync sync;

    // 唯一的一個構造器
    
// 構造參數count就是需要等待事件的數量 public CountDownLatch(int count) { // 為了保證count >= 0 if (count < 0) throw new IllegalArgumentException("count < 0"); // 構造sync this.sync = new Sync(count); }

現在來看內部類Sync,它繼承了AQS,實現了共享鎖方法,下面來看其源碼,代碼行數不多很好理解

    private static final class
Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { // setState 為AQS更改其state變量的方法 // 將AQS state變量設置成count setState(count); } int getCount() { // AQS的獲取state鎖狀態值
return getState(); } // 嘗試獲取共享鎖 protected int tryAcquireShared(int acquires) { // 返回1表示此時鎖狀態值為0表示鎖已釋放 // -1表示此時鎖狀態值大於0,表示出於鎖定狀態 return (getState() == 0) ? 1 : -1; } // 嘗試釋放共享鎖(計數器遞減releases次) protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 等待鎖狀態值為0或者更改鎖狀態值成功 for (;;) { // 將state賦值給變量c int c = getState(); if (c == 0) // 此時鎖已清除 return false; // 遞減 int nextc = c-1; // 比較state的狀態值是否等於C,等於將state狀態值改為nextc if (compareAndSetState(c, nextc)) // 更改成功後,如果nextc為0則返回true return nextc == 0; } } }

await方法

await方法就是當state狀態值不為0時將當前線程阻塞,然後等待喚醒

    public void await() throws InterruptedException {
        //調用的AQS獲取共享鎖可中斷方法
        sync.acquireSharedInterruptibly(1);
    }

我們來看看AQS的acquireSharedInterruptibly方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 此方法調用的是CountDownLatch內部類Sync的方法
        // 如果鎖狀態不為0,則執行doAcquireSharedInterruptibly方法
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

doAcquireSharedInterruptibly方法也是由AQS實現的

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 添加一個共享鎖節點到隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            // 直到線程被喚醒或者線程被中斷時跳出循環
            for (;;) {
                // node節點的前驅節點
                final Node p = node.predecessor();
                if (p == head) {
                    // 調用CountDownLatch內部類Sync的方法
                    // 如果鎖狀態值為0,則返回值大於0
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 當鎖狀態值為0,開始將note節點設置為頭節點並喚醒後繼節點
                        // 也就是隊列不斷的出列,然後喚醒後繼節點,後繼節點被喚醒後由於前驅節點被設置成頭節點,又會調用該方法進行後繼節點的喚醒
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }

                /*
                 shouldParkAfterFailedAcquire用於清除已中斷/或者取消的線程以及判斷此次循環是否需要掛起線程
                 parkAndCheckInterrupt 掛機當前線程
                 shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 在AQS之前博文裏分析過這裏就不再分析了
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 表示當前線程中斷,取消獲取鎖
                // 之前分析過,略過源碼分析
                cancelAcquire(node);
        }
    }

setHeadAndPropagate方法,主要作用是喚醒後繼節點線程

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        // 當前節點設置為頭節點,節點關聯的線程設置為空
        setHead(node)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 節點等待狀態為signal時,喚醒後繼節點線程
                doReleaseShared();
        }
    }

doReleaseShared很巧妙,當當前節點等待狀態為signal時,喚醒後繼節點線程

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 當前線程等待狀態為signal時表示後繼節點需要喚醒
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        // 表示h節點的狀態替換失敗,會再次循環判斷h節點的狀態
                        continue;            // loop to recheck cases
                    // 喚醒後繼節點
                    unparkSuccessor(h);
                }
                // 狀態為0時,將其改成PROPAGATE,更改失敗會再次循環判斷h節點的狀態
          // 這種情況發生在一個線程調用await方法,節點的等待狀態還是初始值0未來得及被修改,剛好state被置為0然後調用了doReleaseShared方法
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

countDown方法

countDown方法遞減state值,當值為0時,依次喚醒等待的線程

    public void countDown() {
        // 遞減一次state值,知道state為0時喚醒等待中的線程
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) { // 嘗試將state減去arg if (tryReleaseShared(arg)) { // state為0時喚醒線程 doReleaseShared(); return true; } return false; }

到此分析完畢。

總結

  1. 通過源碼知道CountDownLatch 不能像CyclicBarrier那樣使用完畢後還可以復用;
  2. CountDownLatch 是通過共享鎖來實現的,它的構造參數就是AQS state的值;
  3. 由於內部類繼承了AQS,所以它內部也是FIFO隊列,同時也一樣是前驅節點喚醒後繼節點。

【JDK源碼分析】深入源碼分析CountDownLatch