1. 程式人生 > >Java多執行緒等待所有執行緒結束(CountDownLatch/CyclicBarrier)

Java多執行緒等待所有執行緒結束(CountDownLatch/CyclicBarrier)

本文主要是參考官方文件做一學習用途。

官方連結:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/CountDownLatch.html
http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/CyclicBarrier.html 

多執行緒設計過程中,經常會遇到需要等待其它執行緒結束以後再做其他事情的情況,比如多執行緒下載檔案,每個執行緒都會下載檔案的一部分,在所有執行緒結束以後,需要將各部分再次拼接成一個完整的檔案。

有幾種方案:

1.在主執行緒中設定一自定義全域性計數標誌,在工作執行緒完成時,計數減一。主執行緒偵測該標誌是否為0,一旦為0,表示所有工作執行緒已經完成。

2.使用Java標準的類CountDownLatch來完成這項工作,原理是一樣的,計數。

CountDownLatch

CountDownLatch 初始化設定count,即等待(await)count個執行緒或一個執行緒count次計數,通過工作執行緒來countDown計數減一,直到計數為0,await阻塞結束。

設定的count不可更改,如需要動態設定計數的執行緒數,可以使用CyclicBarrier.

下面的例子,所有的工作執行緒中準備就緒以後,並不是直接執行,而是等待主執行緒的訊號後再執行具體的操作。

package com.example.multithread;  
  
import java.util.concurrent.CountDownLatch;  
  
class Driver  
{  
    private static final int TOTAL_THREADS = 10;  
    private final CountDownLatch mStartSignal = new CountDownLatch(1);  
    private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
  
    void main()  
    {  
        for (int i = 0; i < TOTAL_THREADS; i++)  
        {  
            new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();  
        }  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
        doPrepareWork();// 準備工作   
        mStartSignal.countDown();// 計數減一為0,工作執行緒真正啟動具體操作   
        doSomethingElse();//做點自己的事情   
        try  
        {  
            mDoneSignal.await();// 等待所有工作執行緒結束   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        System.out.println("All workers have finished now.");  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
    }  
  
    void doPrepareWork()  
    {  
        System.out.println("Ready,GO!");  
    }  
  
    void doSomethingElse()  
    {  
        for (int i = 0; i < 100000; i++)  
        {  
            ;// delay   
        }  
        System.out.println("Main Thread Do something else.");  
    }  
}  
  
class Worker implements Runnable  
{  
    private final CountDownLatch mStartSignal;  
    private final CountDownLatch mDoneSignal;  
    private final int mThreadIndex;  
  
    Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,  
            final int threadIndex)  
    {  
        this.mDoneSignal = doneSignal;  
        this.mStartSignal = startSignal;  
        this.mThreadIndex = threadIndex;  
    }  
  
    @Override  
    public void run()  
    {  
        // TODO Auto-generated method stub   
        try  
        {  
            mStartSignal.await();// 阻塞,等待mStartSignal計數為0執行後面的程式碼   
                                              // 所有的工作執行緒都在等待同一個啟動的命令   
            doWork();// 具體操作   
            System.out.println("Thread " + mThreadIndex + " Done Now:"  
                    + System.currentTimeMillis());  
            mDoneSignal.countDown();// 完成以後計數減一   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
    }  
  
    public void doWork()  
    {  
        for (int i = 0; i < 1000000; i++)  
        {  
            ;// 耗時操作   
        }  
        System.out.println("Thread " + mThreadIndex + ":do work");  
    }  
}  
  
public class CountDownLatchTest  
{  
    public static void main(String[] args)  
    {  
        // TODO Auto-generated method stub   
        new Driver().main();  
    }  
  
}  

通過Executor啟動執行緒:

class CountDownLatchDriver2  
{  
    private static final int TOTAL_THREADS = 10;  
    private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
  
    void main()  
    {  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
        doPrepareWork();// 準備工作   
  
        Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  
        for (int i = 0; i < TOTAL_THREADS; i++)  
        {  
            // 通過內建的執行緒池維護建立的執行緒   
            executor.execute(new RunnableWorker(mDoneSignal, i));  
        }  
        doSomethingElse();// 做點自己的事情   
        try  
        {  
            mDoneSignal.await();// 等待所有工作執行緒結束   
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
        System.out.println("All workers have finished now.");  
        System.out.println("Main Thread Now:" + System.currentTimeMillis());  
    }  
  
    void doPrepareWork()  
    {  
        System.out.println("Ready,GO!");  
    }  
  
    void doSomethingElse()  
    {  
        for (int i = 0; i < 100000; i++)  
        {  
            ;// delay   
        }  
        System.out.println("Main Thread Do something else.");  
    }  
}  
  
class RunnableWorker implements Runnable  
{  
  
    private final CountDownLatch mDoneSignal;  
    private final int mThreadIndex;  
  
    RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  
    {  
        this.mDoneSignal = doneSignal;  
        this.mThreadIndex = threadIndex;  
    }  
  
    @Override  
    public void run()  
    {  
        // TODO Auto-generated method stub   
  
        doWork();// 具體操作   
        System.out.println("Thread " + mThreadIndex + " Done Now:"  
                + System.currentTimeMillis());  
        mDoneSignal.countDown();// 完成以後計數減一   
                                                    // 計數為0時,主執行緒結束阻塞,繼續執行其他任務   
        try  
        {  
            // 可以繼續做點其他的事情,與主執行緒無關了   
            Thread.sleep(5000);  
            System.out.println("Thread " + mThreadIndex  
                    + " Do something else after notifing main thread");  
  
        }  
        catch (InterruptedException e)  
        {  
            // TODO Auto-generated catch block   
            e.printStackTrace();  
        }  
  
    }  
  
    public void doWork()  
    {  
        for (int i = 0; i < 1000000; i++)  
        {  
            ;// 耗時操作   
        }  
        System.out.println("Thread " + mThreadIndex + ":do work");  
    }  
}  

輸出: