1. 程式人生 > >CountDownLatch實現原理

CountDownLatch實現原理

CountDownLatch用過很多次了,突然有點好奇,它是如何實現阻塞執行緒的,猜想是否跟LockSupport有關。今天瀏覽了一下它的原始碼,發現其實現是十分簡單的,只是簡單的繼承了AbstractQueuedSynchronizer,便實現了其功能。

實現原理:讓需要的暫時阻塞的執行緒,進入一個死迴圈裡面,得到某個條件後再退出迴圈,以此實現阻塞當前執行緒的效果。

下面從構造方法開始,一步步解釋實現的原理:

一、構造方法

下面是實現的原始碼,非常簡短,主要是建立了一個Sync物件。

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
二、Sync物件
 private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

假設我們是這樣建立的:new CountDownLatch(5)。其實也就相當於new Sync(5),相當於setState(5)。setState我們可以暫時理解為設定一個計數器,當前計數器初始值為5。

tryAcquireShared方法其實就是判斷一下當前計數器的值,是否為0了,如果為0的話返回1(返回1的時候,就表明當前執行緒可以繼續往下走了,不再停留在呼叫countDownLatch.await()這個方法的地方)。

tryReleaseShared方法就是利用CAS的方式,對計數器進行減一的操作,而我們實際上每次呼叫countDownLatch.countDown()方法的時候,最終都會調到這個方法,對計數器進行減一操作,一直減到0為止。

稍微跑偏了一點,我們看看呼叫countDownLatch.await()的時候,做了些什麼。

三、countDownLatch.await()

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
程式碼很簡單,就一句話(注意acquireSharedInterruptibly()方法是抽象類:AbstractQueuedSynchronizer的一個方法,我們上面提到的Sync繼承了它),我們跟蹤原始碼,繼續往下看:

四、acquireSharedInterruptibly(int arg)

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

原始碼也是非常簡單的,首先判斷了一下,當前執行緒是否有被中斷,如果沒有的話,就呼叫tryAcquireShared(int acquires)方法,判斷一下當前執行緒是否還需要“阻塞”。其實這裡呼叫的tryAcquireShared方法,就是我們上面提到的java.util.concurrent.CountDownLatch.Sync.tryAcquireShared(int)這個方法。

當然,在一開始我們沒有呼叫過countDownLatch.countDown()方法時,這裡tryAcquireShared方法肯定是會返回1的,因為會進入到doAcquireSharedInterruptibly方法。

五、doAcquireSharedInterruptibly(int arg)


這個時候,我們應該對於countDownLatch.await()方法是怎麼“阻塞”當前執行緒的,已經非常明白了。其實說白了,就是當你呼叫了countDownLatch.await()方法後,你當前執行緒就會進入了一個死迴圈當中,在這個死迴圈裡面,會不斷的進行判斷,通過呼叫tryAcquireShared方法,不斷判斷我們上面說的那個計數器,看看它的值是否為0了(為0的時候,其實就是我們呼叫了足夠多次數的countDownLatch.countDown()方法的時候),如果是為0的話,tryAcquireShared就會返回1,程式碼也會進入到圖中的紅框部分,然後跳出了迴圈,也就不再“阻塞”當前執行緒了。需要注意的是,說是在不停的迴圈,其實也並非在不停的執行for迴圈裡面的內容,因為在後面呼叫parkAndCheckInterrupt()方法時,在這個方法裡面是會呼叫 LockSupport.park(this);,來禁用當前執行緒的。

五、關於AbstractQueuedSynchronizer

看到這裡,如果各位對AbstractQueuedSynchronizer沒有了解過的話,可能程式碼還是看得有點迷糊,這是一篇我覺得解釋得非常好的文章,大家可以看一下:http://ifeve.com/introduce-abstractqueuedsynchronizer/