java中等待所有線程都執行結束
轉自:http://blog.csdn.net/liweisnake/article/details/12966761
今天看到一篇文章,是關於java中如何等待所有線程都執行結束,文章總結得很好,原文如下http://software.intel.com/zh-cn/blogs/2013/10/15/java-countdownlatchcyclicbarrier/?utm_campaign=CSDN&utm_source=intel.csdn.net&utm_medium=Link&utm_content=others-%20Java
看過之後在想java中有很大的靈活性,應該有更多的方式可以做這件事。
這個事情的場景是這樣的:許多線程並行的計算一堆問題,然後每個計算存在一個隊列,在主線程要等待所有計算結果完成後排序並展示出來。這樣的問題其實很常見。
1. 使用join。這種方式其實並不是那麽的優雅,將所有線程啟動完之後還需要將所有線程都join,但是每次join都會阻塞,直到被join線程完成,很可能所有被阻塞線程已經完事了,主線程還在不斷地join,貌似有點浪費,而且兩個循環也不太好看。
-
1 public void testThreadSync1() { 2 3 final Vector<Integer> list = new Vector<Integer>();
-
1 9 add. 7 add. 3 add. 5 add. 4 add. 1 add. 0 add. 0 end. 1 end. 8 add. 2 add. 2 end. 3 end. 4 end. 5 end. 6 add. 6 end. 7 end. 8 end. 9 end. 2 before sort 3 9 7 3 5 4 1 0 8 2 6 4 after sort 5 0 1 2 3 4 5 6 7 8 9
2. 使用wait/notifyAll,這個方式其實跟上面是類似的,只是比較底層些吧(join實際上也是wait)。
-
1 @Test 2 public void testThreadSync2() throws IOException, InterruptedException { 3 final Object waitObject = new Object(); 4 final AtomicInteger count = new AtomicInteger(TEST_THREAD_COUNT); 5 final Vector<Integer> list = new Vector<Integer>(); 6 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 7 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 8 final int num = i; 9 threads[i] = new Thread(new Runnable() { 10 public void run() { 11 try { 12 Thread.sleep(random.nextInt(100)); 13 } catch (InterruptedException e) { 14 e.printStackTrace(); 15 } 16 list.add(num); 17 System.out.print(num + " add.\t"); 18 synchronized (waitObject) { 19 int cnt = count.decrementAndGet(); 20 if (cnt == 0) { 21 waitObject.notifyAll(); 22 } 23 } 24 } 25 }); 26 threads[i].start(); 27 } 28 synchronized (waitObject) { 29 while (count.get() != 0) { 30 waitObject.wait(); 31 } 32 } 33 printSortedResult(list); 34 }
3. 使用CountDownLatch,這其實是最優雅的寫法了,每個線程完成後都去將計數器減一,最後完成時再來喚醒。
例1
-
1 @Test 2 public void testThreadSync3() { 3 final Vector<Integer> list = new Vector<Integer>(); 4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 final CountDownLatch latch = new CountDownLatch(TEST_THREAD_COUNT); 6 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 7 final int num = i; 8 threads[i] = new Thread(new Runnable() { 9 public void run() { 10 try { 11 Thread.sleep(random.nextInt(100)); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 list.add(num); 16 System.out.print(num + " add.\t"); 17 latch.countDown(); 18 } 19 }); 20 threads[i].start(); 21 } 22 try { 23 latch.await(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } 27 printSortedResult(list); 28 }
例2
CountDownLatch 初始化設置count,即等待(await)count個線程或一個線程count次計數,通過工作線程來countDown計數減一,直到計數為0,await阻塞結束。
設置的count不可更改,如需要動態設置計數的線程數,可以使用CyclicBarrier.
下面的例子,所有的工作線程中準備就緒以後,並不是直接運行,而是等待主線程的信號後再執行具體的操作。
-
1 package com.example.multithread; 2 3 import java.util.concurrent.CountDownLatch; 4 5 class Driver 6 { 7 private static final int TOTAL_THREADS = 10; 8 private final CountDownLatch mStartSignal = new CountDownLatch(1); 9 private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS); 10 11 void main() 12 { 13 for (int i = 0; i < TOTAL_THREADS; i++) 14 { 15 new Thread(new Worker(mStartSignal, mDoneSignal, i)).start(); 16 } 17 System.out.println("Main Thread Now:" + System.currentTimeMillis()); 18 doPrepareWork();// 準備工作 19 mStartSignal.countDown();// 計數減一為0,工作線程真正啟動具體操作 20 doSomethingElse();//做點自己的事情 21 try 22 { 23 mDoneSignal.await();// 等待所有工作線程結束 24 } 25 catch (InterruptedException e) 26 { 27 // TODO Auto-generated catch block 28 e.printStackTrace(); 29 } 30 System.out.println("All workers have finished now."); 31 System.out.println("Main Thread Now:" + System.currentTimeMillis()); 32 } 33 34 void doPrepareWork() 35 { 36 System.out.println("Ready,GO!"); 37 } 38 39 void doSomethingElse() 40 { 41 for (int i = 0; i < 100000; i++) 42 { 43 ;// delay 44 } 45 System.out.println("Main Thread Do something else."); 46 } 47 } 48 49 class Worker implements Runnable 50 { 51 private final CountDownLatch mStartSignal; 52 private final CountDownLatch mDoneSignal; 53 private final int mThreadIndex; 54 55 Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal, 56 final int threadIndex) 57 { 58 this.mDoneSignal = doneSignal; 59 this.mStartSignal = startSignal; 60 this.mThreadIndex = threadIndex; 61 } 62 63 @Override 64 public void run() 65 { 66 // TODO Auto-generated method stub 67 try 68 { 69 mStartSignal.await();// 阻塞,等待mStartSignal計數為0運行後面的代碼 70 // 所有的工作線程都在等待同一個啟動的命令 71 doWork();// 具體操作 72 System.out.println("Thread " + mThreadIndex + " Done Now:" 73 + System.currentTimeMillis()); 74 mDoneSignal.countDown();// 完成以後計數減一 75 } 76 catch (InterruptedException e) 77 { 78 // TODO Auto-generated catch block 79 e.printStackTrace(); 80 } 81 } 82 83 public void doWork() 84 { 85 for (int i = 0; i < 1000000; i++) 86 { 87 ;// 耗時操作 88 } 89 System.out.println("Thread " + mThreadIndex + ":do work"); 90 } 91 } 92 93 public class CountDownLatchTest 94 { 95 public static void main(String[] args) 96 { 97 // TODO Auto-generated method stub 98 new Driver().main(); 99 } 100 101 }
通過Executor啟動線程:
-
1 class CountDownLatchDriver2 2 { 3 private static final int TOTAL_THREADS = 10; 4 private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS); 5 6 7 8 void main() 9 { 10 System.out.println("Main Thread Now:" + System.currentTimeMillis()); 11 doPrepareWork();// 準備工作 12 13 Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS); 14 for (int i = 0; i < TOTAL_THREADS; i++) 15 { 16 // 通過內建的線程池維護創建的線程 17 executor.execute(new RunnableWorker(mDoneSignal, i)); 18 } 19 doSomethingElse();// 做點自己的事情 20 try 21 { 22 mDoneSignal.await();// 等待所有工作線程結束 23 } 24 catch (InterruptedException e) 25 { 26 // TODO Auto-generated catch block 27 e.printStackTrace(); 28 } 29 System.out.println("All workers have finished now."); 30 System.out.println("Main Thread Now:" + System.currentTimeMillis()); 31 } 32 33 void doPrepareWork() 34 { 35 System.out.println("Ready,GO!"); 36 } 37 38 void doSomethingElse() 39 { 40 for (int i = 0; i < 100000; i++) 41 { 42 ;// delay 43 } 44 System.out.println("Main Thread Do something else."); 45 } 46 } 47 48 class RunnableWorker implements Runnable 49 { 50 51 private final CountDownLatch mDoneSignal; 52 private final int mThreadIndex; 53 54 RunnableWorker(final CountDownLatch doneSignal, final int threadIndex) 55 { 56 this.mDoneSignal = doneSignal; 57 this.mThreadIndex = threadIndex; 58 } 59 60 @Override 61 public void run() 62 { 63 // TODO Auto-generated method stub 64 65 doWork();// 具體操作 66 System.out.println("Thread " + mThreadIndex + " Done Now:" 67 + System.currentTimeMillis()); 68 mDoneSignal.countDown();// 完成以後計數減一 69 // 計數為0時,主線程接觸阻塞,繼續執行其他任務 70 try 71 { 72 // 可以繼續做點其他的事情,與主線程無關了 73 Thread.sleep(5000); 74 System.out.println("Thread " + mThreadIndex 75 + " Do something else after notifing main thread"); 76 77 } 78 catch (InterruptedException e) 79 { 80 // TODO Auto-generated catch block 81 e.printStackTrace(); 82 } 83 84 } 85 86 public void doWork() 87 { 88 for (int i = 0; i < 1000000; i++) 89 { 90 ;// 耗時操作 91 } 92 System.out.println("Thread " + mThreadIndex + ":do work"); 93 } 94 }
輸出:
1 Main Thread Now:1359959480786 2 Ready,GO! 3 Thread 0:do work 4 Thread 0 Done Now:1359959480808 5 Thread 1:do work 6 Thread 1 Done Now:1359959480811 7 Thread 2:do work 8 Thread 2 Done Now:1359959480813 9 Main Thread Do something else. 10 Thread 3:do work 11 Thread 3 Done Now:1359959480825 12 Thread 5:do work 13 Thread 5 Done Now:1359959480827 14 Thread 7:do work 15 Thread 7 Done Now:1359959480829 16 Thread 9:do work 17 Thread 9 Done Now:1359959480831 18 Thread 4:do work 19 Thread 4 Done Now:1359959480833 20 Thread 6:do work 21 Thread 6 Done Now:1359959480835 22 Thread 8:do work 23 Thread 8 Done Now:1359959480837 24 All workers have finished now. 25 Main Thread Now:1359959480838 26 Thread 0 Do something else after notifing main thread 27 Thread 1 Do something else after notifing main thread 28 Thread 2 Do something else after notifing main thread 29 Thread 3 Do something else after notifing main thread 30 Thread 9 Do something else after notifing main thread 31 Thread 7 Do something else after notifing main thread 32 Thread 5 Do something else after notifing main thread 33 Thread 4 Do something else after notifing main thread 34 Thread 6 Do something else after notifing main thread 35 Thread 8 Do something else after notifing main thread
-
4. 使用CyclicBarrier。這裏其實類似上面,這個berrier只是在等待完成後自動調用傳入CyclicBarrier的Runnable。
例1
-
1 @Test 2 public void testThreadSync4() throws IOException { 3 final Vector<Integer> list = new Vector<Integer>(); 4 Thread[] threads = new Thread[TEST_THREAD_COUNT]; 5 final CyclicBarrier barrier = new CyclicBarrier(TEST_THREAD_COUNT, 6 new Runnable() { 7 public void run() { 8 printSortedResult(list); 9 } 10 }); 11 for (int i = 0; i < TEST_THREAD_COUNT; i++) { 12 final int num = i; 13 threads[i] = new Thread(new Runnable() { 14 public void run() { 15 try { 16 Thread.sleep(random.nextInt(100)); 17 } catch (InterruptedException e) { 18 e.printStackTrace(); 19 } 20 list.add(num); 21 System.out.print(num + " add.\t"); 22 try { 23 barrier.await(); 24 } catch (InterruptedException e) { 25 e.printStackTrace(); 26 } catch (BrokenBarrierException e) { 27 e.printStackTrace(); 28 } 29 } 30 }); 31 threads[i].start(); 32 } 33 System.in.read(); 34 }
例2
-
1 class WalkTarget 2 { 3 private final int mCount = 5; 4 private final CyclicBarrier mBarrier; 5 ExecutorService mExecutor; 6 7 class BarrierAction implements Runnable 8 { 9 @Override 10 public void run() 11 { 12 // TODO Auto-generated method stub 13 System.out.println("所有線程都已經完成任務,計數達到預設值"); 14 //mBarrier.reset();//恢復到初始化狀態 15 16 } 17 } 18 19 WalkTarget() 20 { 21 //初始化CyclicBarrier 22 mBarrier = new CyclicBarrier(mCount, new BarrierAction()); 23 mExecutor = Executors.newFixedThreadPool(mCount); 24 25 for (int i = 0; i < mCount; i++) 26 { 27 //啟動工作線程 28 mExecutor.execute(new Walker(mBarrier, i)); 29 } 30 } 31 } 32 33 //工作線程 34 class Walker implements Runnable 35 { 36 private final CyclicBarrier mBarrier; 37 private final int mThreadIndex; 38 39 Walker(final CyclicBarrier barrier, final int threadIndex) 40 41 42 { 43 mBarrier = barrier; 44 mThreadIndex = threadIndex; 45 } 46 47 @Override 48 public void run() 49 { 50 // TODO Auto-generated method stub 51 System.out.println("Thread " + mThreadIndex + " is running..."); 52 // 執行任務 53 try 54 { 55 TimeUnit.MILLISECONDS.sleep(5000); 56 // do task 57 } 58 catch (InterruptedException e) 59 { 60 // TODO Auto-generated catch block 61 e.printStackTrace(); 62 } 63 64 // 完成任務以後,等待其他線程完成任務 65 try 66 { 67 mBarrier.await(); 68 } 69 catch (InterruptedException e) 70 { 71 // TODO Auto-generated catch block 72 e.printStackTrace(); 73 } 74 catch (BrokenBarrierException e) 75 { 76 // TODO Auto-generated catch block 77 e.printStackTrace(); 78 } 79 // 其他線程任務都完成以後,阻塞解除,可以繼續接下來的任務 80 System.out.println("Thread " + mThreadIndex + " do something else"); 81 } 82 83 } 84 85 public class CountDownLatchTest 86 { 87 public static void main(String[] args) 88 { 89 // TODO Auto-generated method stub 90 //new CountDownLatchDriver2().main(); 91 new WalkTarget(); 92 } 93 94 }
輸出(註意,只有所有的線程barrier.await之後才能繼續執行其他的操作):
Thread 0 is running... Thread 2 is running... Thread 3 is running... Thread 1 is running... Thread 4 is running... 所有線程都已經完成任務,計數達到預設值 Thread 4 do something else Thread 0 do something else Thread 2 do something else Thread 3 do something else Thread 1 do something else
-
5、
CountDownLatch和CyclicBarrier簡單比較:
|
CountDownLatch |
CyclicBarrier |
---|---|---|
軟件包 |
java.util.concurrent |
java.util.concurrent |
適用情景 |
主線程等待多個工作線程結束 |
多個線程之間互相等待,直到所有線程達到一個障礙點(Barrier point) |
主要方法 |
CountDownLatch(int count) (主線程調用) 初始化計數 CountDownLatch.await (主線程調用) 阻塞,直到等待計數為0解除阻塞 CountDownLatch.countDown 計數減一(工作線程調用) |
CyclicBarrier(int parties, Runnable barrierAction) //初始化參與者數量和障礙點執行Action,Action可選。由主線程初始化 CyclicBarrier.await() //由參與者調用 阻塞,直到所有線程達到屏障點 |
等待結束 |
各線程之間不再互相影響,可以繼續做自己的事情。不再執行下一個目標工作。 |
在屏障點達到後,允許所有線程繼續執行,達到下一個目標。可以重復使用CyclicBarrier |
異常 |
|
如果其中一個線程由於中斷,錯誤,或超時導致永久離開屏障點,其他線程也將拋出異常。 |
其他 |
|
如果BarrierAction不依賴於任何Party中的所有線程,那麽在任何party中的一個線程被釋放的時候,可以直接運行這個Action。 If(barrier.await()==2) { //do action } |
java中等待所有線程都執行結束