1. 程式人生 > 其它 >併發程式設計 — CountDownLatch 詳解

併發程式設計 — CountDownLatch 詳解

技術標籤:併發程式設計CountDownLatch併發程式設計多程序

一、概述

類 CountDownLatch 是一個同步功能的輔助類,使用效果是給定一個計數,當使用這個CountDownLatch類的執行緒判斷計數不為0時,則呈wait狀態,如果為0時則繼續執行。實現等待與繼續執行的效果分別需要使用await()和countDown()方法來進行。呼叫await()方法時判斷計數是否為0,如果不為0則呈等待狀態。其他執行緒可以呼叫countDown()方法將計數減1,當計數減到為0時,呈等待的執行緒繼續執行。而方法getCount()就是獲得當前的計數個數。

二、使用場景

1、某一執行緒在開始執行前等待n個執行緒執行完畢

比如實現一個機票比價場景,需要呼叫各個航空公司的機票價格,然後進行價格排序,就可以通過 CountDownLatch 來實現,程式碼如下所示:


public class CountDownLatchExample1 {

    public static void main(String[] args) {
        List<String> airs = Arrays.asList("東方航空", "南方航空", "成都航空", "北京航空");

        //定義一個CountDownLatch物件,指定數量為航空公司的個數
        CountDownLatch latch = new CountDownLatch(airs.size());

        List<Pair<String, Integer>> list = new ArrayList<>();

        //定義四個執行緒模擬呼叫 航空公司的外部介面
        Thread[] threads = new Thread[airs.size()];
        for(int i = 0; i < airs.size(); i++){
            threads[i] = new Thread(() -> {
                try {
                    // 模擬呼叫耗時
                    TimeUnit.SECONDS.sleep(current().nextInt(10));
                    //構建價格
                    Pair<String, Integer> pair = new Pair<>(Thread.currentThread().getName(), current().nextInt(100));
                    list.add(pair);
                    System.out.println(pair.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown(); //執行完後 是計數器減1
                }
            }, airs.get(i));
        }
        //啟動所有的執行緒
        Stream.of(threads).forEach(Thread::start);
        System.out.println("等待所有的執行緒執行完");
        try {
            latch.await(); // 在此等待
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        list.sort((o1, o2) -> {
            if(o1.getValue() > o2.getValue()){
               return 1;
            }else if (o1.getValue().equals(o2.getValue())){
                return 0;
            }else {
                return -1;
            }
        });

        System.out.println("========最終結果=======");
        list.forEach(System.out::println);
    }


}

執行結果:

2、實現多個執行緒開始執行任務的最大並行性


public class CountDownLatchExample2 {
    //裁判類
    static class Referee{
        private CountDownLatch downLatch = new CountDownLatch(1);
        private CountDownLatch latch;

         Referee(int count){
            latch = new CountDownLatch(count);
        }

        // 各就位
         void prepare()  {
            try {
                System.out.printf("執行緒[%s]準備就緒 \n", Thread.currentThread().getName());
                latch.countDown(); //運動員就位
                downLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.printf("執行緒[%s]結束 \n", Thread.currentThread().getName());
        }
        //開始執行
        public void start(){
            try {
                latch.await(); //等待運動員就位
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("開始");
            downLatch.countDown();
        }
    }

    // 運動員類
    static class SportsMan extends  Thread{
        private Referee referee;

        SportsMan(Referee referee) {
            this.referee = referee;
        }

        @Override
        public void run() {
            referee.prepare(); // 運動員準備
        }
    }


    public static void main(String[] args) {
        //定義裁判指定10個運動員
        Referee referee = new Referee(10);
        //定義 10 個運動員
        SportsMan[] mans = new SportsMan[10];
        for (int i = 0; i < 10; i++){
            mans[i] = new SportsMan(referee);
            mans[i].setName("Thread-" + i);
        }
        Stream.of(mans).forEach(Thread::start);
        referee.start();
    }
}

執行結果:

三、原始碼解析

1、類結構

2、Sync 一個靜態內部類

//構造方法 CountDownLatch 的構造方法最終呼叫的是 Sync的構造。
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count); //初始化計數器
        }

        // 獲取當前計數器
        int getCount() {
            return getState();
        }

        // 試圖在共享模式下獲取物件狀態
        // 申請資源 如果 數量為 0,則不進行阻塞, 否則進入阻塞
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

       // 試圖設定狀態來反映共享模式下的一個釋放
        // 減少計數器數量,直到數量為0,
        protected boolean tryReleaseShared(int releases) {

             // 死迴圈保證最終這個狀態值能設定成功
            for (;;) {
                // 獲取當前狀態值,CountDownLatch中的鎖存器計數
                int c = getState();
                // 如果狀態為0,表示CountDownLatch中的鎖存器計數為0,就直接返回
                if (c == 0)
                    return false;
                // 如果狀態不為0,則將狀態減一
                int nextc = c-1;
                /*
                 * 更新state值,即鎖存器計數值,通過CAS保證執行緒安全
                 * CAS操作有3個運算元,記憶體值M,預期值E,新值U,如果M==E,則將記憶體值修改為B,否則啥都不做
                 * compareAndSetState(c, nextc)方法:
                 *      c:表示預期值
                 *       nextc:要更新的值
                 *   在此處,表示c==getState()時返回true,並更新state值為nextc
                 *   如果c!=getState()時,表示已經有其他執行緒更新了state值,
                 *   所以這裡不進行更新,直接返回false,通過死迴圈再重新獲取最新的getState()值
                 */
                if (compareAndSetState(c, nextc))
                    // 返回狀態是否為0判斷,為0表示鎖存器計數為0,可以喚醒await的程序了
                    // 這裡喚醒程序的操作也是通過AQS進行實現的
                    return nextc == 0;
            }
        }
    }

從原始碼可知 CountDownLatch 內部是通過 AQS 的共享模式實現的

3、await() 和 await(long timeout, TimeUnit unit) 方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非執行緒被中斷。

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
     return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

此函式將會使當前執行緒在鎖存器倒計數至零之前一直等待,除非超時或者被中斷。

4、countDown() 方法

此函式將遞減鎖存器的計數,如果計數到達零,則釋放所有等待的執行緒 

 /**   
     * count值減 1,直到計數達到零,釋放所有等待的執行緒 。
     *      
     *  <p>如果當前計數大於零,則遞減。
     *   如果新計數為零,則重新啟用所有等待的執行緒 ,達到執行緒排程的目的。
     *      
     * <p>如果當前計數等於零,則沒有任何反應。
     */    
public void countDown() {
    sync.releaseShared(1);
}

注意:

  1. 初始化CountDownLatch 時的計數器必須大於0,只有當計數器等於 0 的時候,呼叫 await() 方法是不會阻塞。
  2. 任務的結束並不一定代表著正常的結束,有可能是在運算的過程中出現錯誤,因此為了能夠正確地執行countDown(),需要將該方法的呼叫放在finally程式碼塊中,否則就會出現主執行緒(任務)await()方法永遠不會退出阻塞的問題。