1. 程式人生 > >java中等待所有線程都執行結束

java中等待所有線程都執行結束

main 線程池 靈活 問題 method timeunit 自動調用 trac block

轉自:http://blog.csdn.net/liweisnake/article/details/12966761

今天看到一篇文章,是關於java中如何等待所有線程都執行結束,文章總結得很好,原文如下http://software.intel.com/zh-cn/blogs/2013/10/15/java-countdownlatchcyclicbarrier/?utm_campaign=CSDN&utm_source=intel.csdn.net&utm_medium=Link&utm_content=others-%20Java

看過之後在想java中有很大的靈活性,應該有更多的方式可以做這件事。

這個事情的場景是這樣的:許多線程並行的計算一堆問題,然後每個計算存在一個隊列,在主線程要等待所有計算結果完成後排序並展示出來。這樣的問題其實很常見。

1. 使用join。這種方式其實並不是那麽的優雅,將所有線程啟動完之後還需要將所有線程都join,但是每次join都會阻塞,直到被join線程完成,很可能所有被阻塞線程已經完事了,主線程還在不斷地join,貌似有點浪費,而且兩個循環也不太好看。

  1.  1 public void testThreadSync1() {  
     2   
     3     final Vector<Integer> list = new Vector<Integer>();  
    
    4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 try { 6 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 7 final int num = i; 8 threads[i] = new Thread(new Runnable() { 9 public void run() { 10 try {
    11 Thread.sleep(random.nextInt(100)); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 list.add(num); 16 System.out.print(num + " add.\t"); 17 } 18 }); 19 threads[i].start(); 20 } 21 for (int i = 0; i < threads.length; i++) { 22 threads[i].join(); 23 System.out.print(i + " end.\t"); 24 } 25 } catch (InterruptedException ie) { 26 ie.printStackTrace(); 27 } 28 printSortedResult(list); 29 }

  1. 1 9 add.  7 add.  3 add.  5 add.  4 add.  1 add.  0 add.  0 end.  1 end.  8 add.  2 add.  2 end.  3 end.  4 end.  5 end.  6 add.  6 end.  7 end.  8 end.  9 end.    
    2 before sort  
    3 9   7   3   5   4   1   0   8   2   6     
    4 after sort  
    5 0   1   2   3   4   5   6   7   8   9  

2. 使用wait/notifyAll,這個方式其實跟上面是類似的,只是比較底層些吧(join實際上也是wait)。

  1.  1 @Test  
     2 public void testThreadSync2() throws IOException, InterruptedException {  
     3     final Object waitObject = new Object();  
     4     final AtomicInteger count = new AtomicInteger(TEST_THREAD_COUNT);  
     5     final Vector<Integer> list = new Vector<Integer>();  
     6     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
     7     for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
     8         final int num = i;  
     9         threads[i] = new Thread(new Runnable() {  
    10             public void run() {  
    11                 try {  
    12                     Thread.sleep(random.nextInt(100));  
    13                 } catch (InterruptedException e) {  
    14                     e.printStackTrace();  
    15                 }  
    16                 list.add(num);  
    17                 System.out.print(num + " add.\t");  
    18                 synchronized (waitObject) {  
    19                     int cnt = count.decrementAndGet();  
    20                     if (cnt == 0) {  
    21                         waitObject.notifyAll();  
    22                     }  
    23                 }  
    24             }  
    25         });  
    26         threads[i].start();  
    27     }  
    28     synchronized (waitObject) {  
    29         while (count.get() != 0) {  
    30             waitObject.wait();  
    31         }  
    32     }  
    33     printSortedResult(list);  
    34 }  

3. 使用CountDownLatch,這其實是最優雅的寫法了,每個線程完成後都去將計數器減一,最後完成時再來喚醒。

例1

  1.  1 @Test  
     2 public void testThreadSync3() {  
     3     final Vector<Integer> list = new Vector<Integer>();  
     4     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
     5     final CountDownLatch latch = new CountDownLatch(TEST_THREAD_COUNT);  
     6     for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
     7         final int num = i;  
     8         threads[i] = new Thread(new Runnable() {  
     9             public void run() {  
    10                 try {  
    11                     Thread.sleep(random.nextInt(100));  
    12                 } catch (InterruptedException e) {  
    13                     e.printStackTrace();  
    14                 }  
    15                 list.add(num);  
    16                 System.out.print(num + " add.\t");  
    17                 latch.countDown();  
    18             }  
    19         });  
    20         threads[i].start();  
    21     }  
    22     try {  
    23         latch.await();  
    24     } catch (InterruptedException e) {  
    25         e.printStackTrace();  
    26     }  
    27     printSortedResult(list);  
    28 }  

例2

CountDownLatch 初始化設置count,即等待(await)count個線程或一個線程count次計數,通過工作線程來countDown計數減一,直到計數為0,await阻塞結束。

設置的count不可更改,如需要動態設置計數的線程數,可以使用CyclicBarrier.

下面的例子,所有的工作線程中準備就緒以後,並不是直接運行,而是等待主線程的信號後再執行具體的操作。

  1.   1 package com.example.multithread;  
      2   
      3 import java.util.concurrent.CountDownLatch;  
      4   
      5 class Driver  
      6 {  
      7     private static final int TOTAL_THREADS = 10;  
      8     private final CountDownLatch mStartSignal = new CountDownLatch(1);  
      9     private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
     10   
     11     void main()  
     12     {  
     13         for (int i = 0; i < TOTAL_THREADS; i++)  
     14         {  
     15             new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();  
     16         }  
     17         System.out.println("Main Thread Now:" + System.currentTimeMillis());  
     18         doPrepareWork();// 準備工作   
     19         mStartSignal.countDown();// 計數減一為0,工作線程真正啟動具體操作   
     20         doSomethingElse();//做點自己的事情   
     21         try  
     22         {  
     23             mDoneSignal.await();// 等待所有工作線程結束   
     24         }  
     25         catch (InterruptedException e)  
     26         {  
     27             // TODO Auto-generated catch block   
     28             e.printStackTrace();  
     29         }  
     30         System.out.println("All workers have finished now.");  
     31         System.out.println("Main Thread Now:" + System.currentTimeMillis());  
     32     }  
     33   
     34     void doPrepareWork()  
     35     {  
     36         System.out.println("Ready,GO!");  
     37     }  
     38   
     39     void doSomethingElse()  
     40     {  
     41         for (int i = 0; i < 100000; i++)  
     42         {  
     43             ;// delay   
     44         }  
     45         System.out.println("Main Thread Do something else.");  
     46     }  
     47 }  
     48   
     49 class Worker implements Runnable  
     50 {  
     51     private final CountDownLatch mStartSignal;  
     52     private final CountDownLatch mDoneSignal;  
     53     private final int mThreadIndex;  
     54   
     55     Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,  
     56             final int threadIndex)  
     57     {  
     58         this.mDoneSignal = doneSignal;  
     59         this.mStartSignal = startSignal;  
     60         this.mThreadIndex = threadIndex;  
     61     }  
     62   
     63     @Override  
     64     public void run()  
     65     {  
     66         // TODO Auto-generated method stub   
     67         try  
     68         {  
     69             mStartSignal.await();// 阻塞,等待mStartSignal計數為0運行後面的代碼   
     70                                     // 所有的工作線程都在等待同一個啟動的命令   
     71             doWork();// 具體操作   
     72             System.out.println("Thread " + mThreadIndex + " Done Now:"  
     73                     + System.currentTimeMillis());  
     74             mDoneSignal.countDown();// 完成以後計數減一   
     75         }  
     76         catch (InterruptedException e)  
     77         {  
     78             // TODO Auto-generated catch block   
     79             e.printStackTrace();  
     80         }  
     81     }  
     82   
     83     public void doWork()  
     84     {  
     85         for (int i = 0; i < 1000000; i++)  
     86         {  
     87             ;// 耗時操作   
     88         }  
     89         System.out.println("Thread " + mThreadIndex + ":do work");  
     90     }  
     91 }  
     92   
     93 public class CountDownLatchTest  
     94 {  
     95     public static void main(String[] args)  
     96     {  
     97         // TODO Auto-generated method stub   
     98         new Driver().main();  
     99     }  
    100   
    101 }  

    通過Executor啟動線程:

    1.  1 class CountDownLatchDriver2  
       2 {  
       3     private static final int TOTAL_THREADS = 10;  
       4     private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);  
       5  
       6 
       7   
       8     void main()  
       9     {  
      10         System.out.println("Main Thread Now:" + System.currentTimeMillis());  
      11         doPrepareWork();// 準備工作   
      12   
      13         Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);  
      14         for (int i = 0; i < TOTAL_THREADS; i++)  
      15         {  
      16             // 通過內建的線程池維護創建的線程   
      17             executor.execute(new RunnableWorker(mDoneSignal, i));  
      18         }  
      19         doSomethingElse();// 做點自己的事情   
      20         try  
      21         {  
      22             mDoneSignal.await();// 等待所有工作線程結束   
      23         }  
      24         catch (InterruptedException e)  
      25         {  
      26             // TODO Auto-generated catch block   
      27             e.printStackTrace();  
      28         }  
      29         System.out.println("All workers have finished now.");  
      30         System.out.println("Main Thread Now:" + System.currentTimeMillis());  
      31     }  
      32   
      33     void doPrepareWork()  
      34     {  
      35         System.out.println("Ready,GO!");  
      36     }  
      37   
      38     void doSomethingElse()  
      39     {  
      40         for (int i = 0; i < 100000; i++)  
      41         {  
      42             ;// delay   
      43         }  
      44         System.out.println("Main Thread Do something else.");  
      45     }  
      46 }  
      47   
      48 class RunnableWorker implements Runnable  
      49 {  
      50   
      51     private final CountDownLatch mDoneSignal;  
      52     private final int mThreadIndex;  
      53   
      54     RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)  
      55     {  
      56         this.mDoneSignal = doneSignal;  
      57         this.mThreadIndex = threadIndex;  
      58     }  
      59   
      60     @Override  
      61     public void run()  
      62     {  
      63         // TODO Auto-generated method stub   
      64   
      65         doWork();// 具體操作   
      66         System.out.println("Thread " + mThreadIndex + " Done Now:"  
      67                 + System.currentTimeMillis());  
      68         mDoneSignal.countDown();// 完成以後計數減一   
      69                                 // 計數為0時,主線程接觸阻塞,繼續執行其他任務   
      70         try  
      71         {  
      72             // 可以繼續做點其他的事情,與主線程無關了   
      73             Thread.sleep(5000);  
      74             System.out.println("Thread " + mThreadIndex  
      75                     + " Do something else after notifing main thread");  
      76   
      77         }  
      78         catch (InterruptedException e)  
      79         {  
      80             // TODO Auto-generated catch block   
      81             e.printStackTrace();  
      82         }  
      83   
      84     }  
      85   
      86     public void doWork()  
      87     {  
      88         for (int i = 0; i < 1000000; i++)  
      89         {  
      90             ;// 耗時操作   
      91         }  
      92         System.out.println("Thread " + mThreadIndex + ":do work");  
      93     }  
      94 }  

      輸出:

       1 Main Thread Now:1359959480786
       2 Ready,GO!
       3 Thread 0:do work
       4 Thread 0 Done Now:1359959480808
       5 Thread 1:do work
       6 Thread 1 Done Now:1359959480811
       7 Thread 2:do work
       8 Thread 2 Done Now:1359959480813
       9 Main Thread Do something else.
      10 Thread 3:do work
      11 Thread 3 Done Now:1359959480825
      12 Thread 5:do work
      13 Thread 5 Done Now:1359959480827
      14 Thread 7:do work
      15 Thread 7 Done Now:1359959480829
      16 Thread 9:do work
      17 Thread 9 Done Now:1359959480831
      18 Thread 4:do work
      19 Thread 4 Done Now:1359959480833
      20 Thread 6:do work
      21 Thread 6 Done Now:1359959480835
      22 Thread 8:do work
      23 Thread 8 Done Now:1359959480837
      24 All workers have finished now.
      25 Main Thread Now:1359959480838
      26 Thread 0 Do something else after notifing main thread
      27 Thread 1 Do something else after notifing main thread
      28 Thread 2 Do something else after notifing main thread
      29 Thread 3 Do something else after notifing main thread
      30 Thread 9 Do something else after notifing main thread
      31 Thread 7 Do something else after notifing main thread
      32 Thread 5 Do something else after notifing main thread
      33 Thread 4 Do something else after notifing main thread
      34 Thread 6 Do something else after notifing main thread
      35 Thread 8 Do something else after notifing main thread

4. 使用CyclicBarrier。這裏其實類似上面,這個berrier只是在等待完成後自動調用傳入CyclicBarrier的Runnable。

例1

  1.  1 @Test  
     2 public void testThreadSync4() throws IOException {  
     3     final Vector<Integer> list = new Vector<Integer>();  
     4     Thread[] threads = new Thread[TEST_THREAD_COUNT];  
     5     final CyclicBarrier barrier = new CyclicBarrier(TEST_THREAD_COUNT,  
     6             new Runnable() {  
     7                 public void run() {  
     8                     printSortedResult(list);  
     9                 }  
    10             });  
    11     for (int i = 0; i < TEST_THREAD_COUNT; i++) {  
    12         final int num = i;  
    13         threads[i] = new Thread(new Runnable() {  
    14             public void run() {  
    15                 try {  
    16                     Thread.sleep(random.nextInt(100));  
    17                 } catch (InterruptedException e) {  
    18                     e.printStackTrace();  
    19                 }  
    20                 list.add(num);  
    21                 System.out.print(num + " add.\t");  
    22                 try {  
    23                     barrier.await();  
    24                 } catch (InterruptedException e) {  
    25                     e.printStackTrace();  
    26                 } catch (BrokenBarrierException e) {  
    27                     e.printStackTrace();  
    28                 }  
    29             }  
    30         });  
    31         threads[i].start();  
    32     }  
    33     System.in.read();  
    34 }  

    例2

    1.  1 class WalkTarget  
       2 {  
       3     private final int mCount = 5;  
       4     private final CyclicBarrier mBarrier;  
       5     ExecutorService mExecutor;  
       6   
       7     class BarrierAction implements Runnable  
       8     {  
       9         @Override  
      10         public void run()  
      11         {  
      12             // TODO Auto-generated method stub   
      13             System.out.println("所有線程都已經完成任務,計數達到預設值");  
      14             //mBarrier.reset();//恢復到初始化狀態          
      15               
      16         }  
      17     }  
      18   
      19     WalkTarget()  
      20     {  
      21         //初始化CyclicBarrier   
      22         mBarrier = new CyclicBarrier(mCount, new BarrierAction());  
      23         mExecutor = Executors.newFixedThreadPool(mCount);  
      24   
      25         for (int i = 0; i < mCount; i++)  
      26         {  
      27             //啟動工作線程   
      28             mExecutor.execute(new Walker(mBarrier, i));  
      29         }  
      30     }  
      31 }  
      32   
      33 //工作線程   
      34 class Walker implements Runnable  
      35 {  
      36     private final CyclicBarrier mBarrier;  
      37     private final int mThreadIndex;  
      38   
      39     Walker(final CyclicBarrier barrier, final int threadIndex)  
      40  
      41 
      42     {  
      43         mBarrier = barrier;  
      44         mThreadIndex = threadIndex;  
      45     }  
      46   
      47     @Override  
      48     public void run()  
      49     {  
      50         // TODO Auto-generated method stub   
      51         System.out.println("Thread " + mThreadIndex + " is running...");  
      52         // 執行任務   
      53         try  
      54         {  
      55             TimeUnit.MILLISECONDS.sleep(5000);  
      56             // do task   
      57         }  
      58         catch (InterruptedException e)  
      59         {  
      60             // TODO Auto-generated catch block   
      61             e.printStackTrace();  
      62         }  
      63   
      64         // 完成任務以後,等待其他線程完成任務   
      65         try  
      66         {  
      67             mBarrier.await();  
      68         }  
      69         catch (InterruptedException e)  
      70         {  
      71             // TODO Auto-generated catch block   
      72             e.printStackTrace();  
      73         }  
      74         catch (BrokenBarrierException e)  
      75         {  
      76             // TODO Auto-generated catch block   
      77             e.printStackTrace();  
      78         }  
      79         // 其他線程任務都完成以後,阻塞解除,可以繼續接下來的任務   
      80         System.out.println("Thread " + mThreadIndex + " do something else");  
      81     }  
      82   
      83 }  
      84   
      85 public class CountDownLatchTest  
      86 {  
      87     public static void main(String[] args)  
      88     {  
      89         // TODO Auto-generated method stub   
      90         //new CountDownLatchDriver2().main();   
      91         new WalkTarget();  
      92     }  
      93   
      94 }  

      輸出(註意,只有所有的線程barrier.await之後才能繼續執行其他的操作):

      Thread 0 is running... Thread 2 is running... Thread 3 is running... Thread 1 is running... Thread 4 is running... 所有線程都已經完成任務,計數達到預設值 Thread 4 do something else Thread 0 do something else Thread 2 do something else Thread 3 do something else Thread 1 do something else

5、

CountDownLatch和CyclicBarrier簡單比較:

CountDownLatch

CyclicBarrier

軟件包

java.util.concurrent

java.util.concurrent

適用情景

主線程等待多個工作線程結束

多個線程之間互相等待,直到所有線程達到一個障礙點(Barrier point)

主要方法

CountDownLatch(int count) (主線程調用)

初始化計數

CountDownLatch.await (主線程調用)

阻塞,直到等待計數為0解除阻塞

CountDownLatch.countDown

計數減一(工作線程調用)

CyclicBarrier(int parties, Runnable barrierAction) //初始化參與者數量和障礙點執行Action,Action可選。由主線程初始化

CyclicBarrier.await() //由參與者調用

阻塞,直到所有線程達到屏障點

等待結束

各線程之間不再互相影響,可以繼續做自己的事情。不再執行下一個目標工作。

在屏障點達到後,允許所有線程繼續執行,達到下一個目標。可以重復使用CyclicBarrier

異常

如果其中一個線程由於中斷,錯誤,或超時導致永久離開屏障點,其他線程也將拋出異常。

其他

如果BarrierAction不依賴於任何Party中的所有線程,那麽在任何party中的一個線程被釋放的時候,可以直接運行這個Action。

If(barrier.await()==2)

{

//do action

}

java中等待所有線程都執行結束