1. 程式人生 > 實用技巧 >【Java 併發程式設計系列】【J.U.C】:CountDownLatch&CyclicBarrier&Semaphore

【Java 併發程式設計系列】【J.U.C】:CountDownLatch&CyclicBarrier&Semaphore

CountDownLatch

CountDownLatch 適用於需要在主執行緒中開啟多個執行緒去並行執行任務並且主執行緒需要等待所有子執行緒執行完後再進行彙總的場景。

使用示例

例項程式碼如下:

public class JoinCountDownLatch {

    // 建立一個CountDownLatch 例項
    private static volatile CountDownLatch countDownLunch = new CountDownLatch(2);
    
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch(InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLunch.countDown();
                }
                System.out.println("child threadOne over!");
            }
        });
        
        executorService.submit(new Runnable() {@
            Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch(InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLunch.countDown();
                }
                System.out.println("child threadTwo over!");
            }
        });
        
        System.out.println("wait all child thread over!");
        
        // 等待子執行緒執行完畢
        countDownLunch.await();
        
        System.out.println("all child thread over!");
        
        executorService.shutdown();
    }
}

輸出如下:

wait all child thread over!
child threadOne over!
child threadTwo over!
all child thread over!

CountDownLatch 與join 方法的區別:

  1. 呼叫一個子執行緒的join() 方法後,該執行緒會一直被阻塞直到子執行緒執行完畢,而CountDownLatch 使用計數器來允許子執行緒執行完畢或者在執行中遞減計數,也就是CountDownLatch 可以在子執行緒執行的任何時候讓await 方法返回而不一定必須等到執行緒結束。
  2. 使用執行緒池來管理執行緒時一般都是直接新增Runable 到執行緒池,這時候就沒有辦法再呼叫執行緒的join 方法,CountDownLatch 相比join 對執行緒同步控制更加靈活。

實現原理

UML

從類圖可以看出,CountDownLatch 是使用AQS 實現的。通過下面的建構函式可知,實際上是把計數器的值賦給了AQS 的狀態變數state ,也就是使用AQS 狀態值來表示計數器值。

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

void await 方法

當執行緒呼叫CountDownLatch 物件的await 方法後,當前執行緒會被阻塞,直到下面的情況之一發生才會返回:

  • 當所有執行緒都呼叫了CountDownLatch 物件的 countDown 方法後,也就是計數器值為0 時
  • 其他執行緒呼叫了當前執行緒的interrupt() 方法中斷了當前執行緒,當前執行緒丟擲InterruptedException 異常返回
public void await() throws InterruptedException {
    // 呼叫AQS acquireSharedInterruptibly 方法,其內使用了模板方法,呼叫tryAcquireShared 實現
    sync.acquireSharedInterruptibly(1); 
}

// AQS
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

// Sync
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 計數器是0直接返回,不是0則需要阻塞當前執行緒
}

boolean await(long timeout, TimeUnit unit) 方法

當前執行緒會被阻塞,直到下面的情況之一發生才會返回:

  • 當所有執行緒都呼叫了CountDownLatch 物件的 countDown 方法後,也就是計數器值為0 時,返回true
  • 設定的timeout 時間到了,超時返回false
  • 其他執行緒呼叫了當前執行緒的interrupt() 方法中斷了當前執行緒,當前執行緒丟擲InterruptedException 異常返回
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

void countDown 方法

執行緒呼叫該方法後,計數器的值遞減,遞減後如果計數器值為0,則喚醒所有因呼叫await 法而被阻塞的執行緒,否則什麼都不做。

public void countDown() {
    sync.releaseShared(1);
}

// AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // AQS 釋放資源
        doReleaseShared();
        return true;
    }
    return false;
}

// Sync
protected boolean tryReleaseShared(int releases) {
    // 迴圈進行CAS,直到當前執行緒成功完成CAS使計數器值(狀態值state )減1 並更新到state
    for (;;) {
        int c = getState();
        if (c == 0) // 如果計數器為0直接返回
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc)) // CAS讓state減1
            return nextc == 0;
    }
}

long getCount 方法

獲取當前計數器的值,也就是AQS state 值,一般在測試時使用該方法。

public long getCount() {
    return sync.getCount();
}

迴環屏障 CyclicBarrier

由於CountDownLatch 計數器是一次性的,計數器值變為0 後,再呼叫await 和countDown 方法都會立刻返回。為了滿足計數器可以重置的需要,JDK 提供了CyclicBarrier,但CyclicBarrier 類的功能並不限於CountDownLatch 的功能。

從字面意思理解,CyclicBarrier 是迴環屏障的意思,CyclicBarrier 可以讓一組執行緒全部達到一個狀態後再全部同時執行,之所以叫作迴環是因為當所有等待執行緒執行完畢,並重置CyclicBarrier 的狀態後它可以被重用。之所以叫作屏障是因為執行緒呼叫await 方法後就會被阻塞,這個阻塞點就稱為屏障點,等所有執行緒都呼叫了await 方法後,執行緒們就會衝破屏障,繼續向下執行。

使用示例

public class CyclicBarrierTest {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println(Thread.currentThread() + " task merge result");
        }
    });
    
    public static void main(String[] args) {
    
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        executorService.execute(() - > {
            System.out.println(Thread.currentThread() + " task-1 start");
            System.out.println(Thread.currentThread() + " task-1 enter in barrier");
            try {
                cyclicBarrier.await();
            } catch(InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + " task-1 enter out barrier");
        });
        
        executorService.execute(() - > {
            System.out.println(Thread.currentThread() + " task-2 start");
            System.out.println(Thread.currentThread() + " task-2 enter in barrier");
            try {
                cyclicBarrier.await();
            } catch(InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread() + " task-2 enter out barrier");
        });
        
        executorService.shutdown();
    }
}

輸出結果:

Thread[pool-1-thread-1,5,main] task-1 start
Thread[pool-1-thread-1,5,main] task-1 enter in barrier
Thread[pool-1-thread-2,5,main] task-2 start
Thread[pool-1-thread-2,5,main] task-2 enter in barrier
Thread[pool-1-thread-2,5,main] task merge result
Thread[pool-1-thread-2,5,main] task-2 enter out barrier
Thread[pool-1-thread-1,5,main] task-1 enter out barrier

實現原理

UML

由類圖可知,CyclicBarrier 基於獨佔鎖實現,其底層還是基於AQS 的。parties 用來記錄執行緒個數,這裡表示多少執行緒呼叫await 後,所有執行緒才會衝破屏障。而count 一開始等於parties ,每當有執行緒呼叫await 就減1 ,當count 為0 表示所有執行緒都到了屏障點。

barrierCommand 任務,這個任務的執行時機是當所有執行緒都到達屏障點後。

在變數 generation 部有一 變數 broken ,其用來記錄當前屏障是否被打破。

int await 方法

當前執行緒呼叫CyclicBarrier 該方法後會被阻塞,直到滿足下面條件之一才會返回:

  • parties 個執行緒都呼叫了await 方法,也就是執行緒都達到了屏障點
  • 其他執行緒呼叫了當前執行緒的interrupt 方法中斷了當前執行緒,則當前執行緒會丟擲InterruptedException 異常而返回
  • 與當前屏障點關聯的Generation 物件的broken 標誌被設定true 時,會拋BrokenBarrierException 異常後返回
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

boolean await(long timeout, TimeUnit unit) 方法

當前執行緒呼叫CyclicBarrier 該方法後會被阻塞,直到滿足下面條件之一才會返回:

  • parties 個執行緒都呼叫了await 方法,也就是執行緒都達到了屏障點,返回true
  • 設定的超時時間到了後返回false
  • 其他執行緒呼叫了當前執行緒的interrupt 方法中斷了當前執行緒,則當前執行緒會丟擲InterruptedException 異常而返回
  • 與當前屏障點關聯的Generation 物件的broken 標誌被設定true 時,會拋BrokenBarrierException 異常後返回
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

int dowait(boolean timed, long nanos) 方法

該方法實現了CyclicBarrer 的核心功能,其程式碼如下:

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) {  // index==0說明所有執行緒都到了屏障點,此時執行初始化時傳遞的任務
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run(); // 執行任務
                ranAction = true;
                nextGeneration(); // 啟用其他因呼叫await方法而阻塞的執行緒,並重置CyclicBarrier
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        // index != 0
        for (;;) {
            try {
                if (!timed) // 沒有設定超時時間
                    trip.await();
                else if (nanos > 0L) // 設定了超時時間
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll(); // 喚醒條件佇列中的阻塞佇列
    // set up next generation
    count = parties; // 重置CyclicBarrier
    generation = new Generation();
}

訊號量 Semaphore

Semaphore 訊號量也是Java 中的一個同步器,與CountDownLatch 和CycleBarrier 不同的是,它內部的計數器是遞增的,並且在一開始初始化Semaphore 時可以指定一個初始值,但是並不需要知道需要同步的執行緒個數,而是在需要同步的地方呼叫acquire 方法時指定需要同步的執行緒個數。

使用示例

在主執行緒中開啟兩個子執行緒讓它們執行,等所有子執行緒執行完畢後主執行緒再繼續向下執行。

public class SemaphoreTest {

    // 建立訊號量示例,引數0 表示當前訊號量計數器值為0
    private static Semaphore semaphore = new Semaphore(0);
    
    public static void main(String[] args) throws InterruptedException {
        
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        
        executorService.execute(() - > {
            System.out.println(Thread.currentThread() + " over");
            semaphore.release(); // 訊號量計數器加1
        });
        
        executorService.execute(() - > {
            System.out.println(Thread.currentThread() + " over");
            semaphore.release();
        });
        
        // 阻塞直到訊號量計數為2
        semaphore.acquire(2);
        
        System.out.println("all child thread over");
        
        executorService.shutdown();
    }
}

輸出結果:

Thread[pool-1-thread-1,5,main] over
Thread[pool-1-thread-2,5,main] over
all child thread over

實現原理

UML

由類圖可知,Semaphor 還是使用AQS 實現的。 Sync 只是對AQS 的一個修飾,並且Sync 有兩個實現類,用來指定獲取訊號量時是否採用公平策略。例如,下面的程式碼在建立Semaphore 時會使用一個變數指定是否使用非公平策略。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

Sync(int permits) {
    setState(permits);
}

在如上程式碼中Semaphore 預設採用非公平策略,如果需要使用公平策略則可以使用帶兩個引數的建構函式來構造Semaphore 物件。另外,如CountDownLatch 建構函式傳遞的初始化訊號量permits 被賦給了AQS state 狀態變數一樣,這裡AQS state 表示當前持有的訊號量個數。

void acquire 方法

當前執行緒呼叫該方法的目的是希望獲取一個訊號量資源。 如果當前訊號量個數大於0,則訊號量的個數會減1,然後該方法直接返回。否則如果當前訊號量個數等於0 ,則當前執行緒會被放入AQS 的阻塞佇列。當其他執行緒呼叫了當前執行緒interrupt 方法中斷了當前執行緒時,則當前執行緒會丟擲InterruptedEception 異常返回。

public void acquire() throws InterruptedException {
	sync.acquireSharedInterruptibly(1); // AQS 內部呼叫tryAcquireShared
}

// 非公平策略 NonfairSync
protected int tryAcquireShared(int acquires) {
	return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState(); // 當前訊號量值
        int remaining = available - acquires; // 剩餘值
        if (remaining < 0 ||
            compareAndSetState(available, remaining)) // 如果當前剩餘值小於0或者CAS設定成功則返回
            return remaining;
    }
}

// 公平策略 FairSync
protected int tryAcquireShared(int acquires) {
    for (;;) {
        if (hasQueuedPredecessors()) // 公平策略,看當前執行緒節點的前驅節點是否也在等待獲取此資源,如果是則當前執行緒會被放到AQS阻塞佇列,否則直接獲取
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

void acquire(int permits) 方法

該方法與acquire方法不同,後者只需要獲取一個訊號量值,而前者則獲取permits 個。

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

void acquireUninterruptibly() 方法

該方法與acquire 方法相似,不同之處在於該方法對中斷不響應。

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

void acquireUninterruptibly(int permits) 方法

該方法與acquire(int permits) 方法的不同之處在於該方法對中斷不響應。

public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

void release() 方法

該方法的作用是把當前Semaphore 訊號量值增加1 ,如果當前有執行緒因為呼叫aquire 方法被阻塞而被放入了AQS 阻塞佇列,則會根據公平策略選擇一個訊號量個數能被滿足的執行緒進行啟用,啟用的執行緒會嘗試獲取剛增加的訊號。

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 嘗試釋放資源
        doReleaseShared(); // 資源釋放後呼叫park方法喚醒AQS佇列中最先掛起的執行緒
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState(); // 當前訊號量值
        int next = current + releases; // 當前訊號量+1
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next)) // CAS修改訊號量值
            return true;
    }
}

void release(int permits) 方法

該方法與不帶引數的release 方法的不同之處在於,前者每次呼叫會在原訊號量值的基礎上增加 permit ,而後者每次增加1 。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}