假如有Thread1、Thread2、Thread3、Thread4四條執行緒分別統計C、D、E、F四個盤的大小,所有執行緒都統計完畢交給Thread5執行緒去做彙總,應當如何實現?
1 callable和future
http://blog.csdn.net/zy_281870667/article/details/72047325
一般情況,我們實現多執行緒都是Thread或者Runnable(後者比較多),但是,這兩種都是沒返回值的,所以我們需要使用callable(有返回值的多執行緒)和future(獲得執行緒的返回值)來實現了。
public class TestThread { public static void main(String[] args) { ThreadCount tc = null; ExecutorService es = Executors.newCachedThreadPool();//執行緒池 CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(es); for(int i=0;i<4;i++){ tc = new ThreadCount(i+1); cs.submit(tc); } // 新增結束,及時shutdown,不然主執行緒不會結束 es.shutdown(); int total = 0; for(int i=0;i<4;i++){ try { total+=cs.take().get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println(total); } } class ThreadCount implements Callable<Integer>{ private int type; ThreadCount(int type){ this.type = type; } @Override public Integer call() throws Exception { if(type==1){ System.out.println("C盤統計大小"); return 1; }else if(type==2){ Thread.sleep(20000); System.out.println("D盤統計大小"); return 2; }else if(type==3){ System.out.println("E盤統計大小"); return 3; }else if(type==4){ System.out.println("F盤統計大小"); return 4; } return null; } }
ps:一個需要注意的小細節,cs.take.get()獲取返回值(這裡阻塞?),是按照完成的順序的,即上面案例返回順序是CEFD
tips:
ExecutorCompletionService 是將 Executor和BlockQueue結合的jdk類,其實現的主要目的是:提交任務執行緒,每一個執行緒任務直線完成後,將返回值放在阻塞佇列中,然後可以通過阻塞佇列的take()方法返回 對應執行緒的執行結果!!
2. join阻塞——直接用join把執行緒5加入進去即可
http://blog.csdn.net/u011971132/article/details/62444282
直接用join把執行緒5加入進去即可
public static void main(String[] args) throws InterruptedException
{
Thread t1 = new Thread(new Worker("thread-1"));
Thread t2 = new Thread(new Worker("thread-2"));
Thread t3 = new Thread(new Worker("thread-3"));
Thread t4 = new Thread(new Worker("thread-4"));
Thread t5 = new Thread(new Worker("thread-5"));
t1.start();t2.start();t3.start();t4.start();
t1.join();t2.join();t3.join();t4.join();
t5.start();
t5.join();
}
或者,在t5的run:
t1.start();t2.start();t3.start();t4.start();
t1.join();t2.join();t3.join();t4.join();
t5執行緒中等待四個執行緒返回
3.用java.util.concurrent下的方法解決
http://blog.csdn.net/wenwen360360/article/details/62104612
用CountDownLatch :一個執行緒(或者多個), 等待另外N個執行緒完成某個事情之後才能執行
CountDownLatch 是計數器, 執行緒完成一個就記一個, 就像 報數一樣, 只不過是遞減的.
盜用別人的一個例子:
- public class CountDownLatchDemo {
- final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- public static void main(String[] args) throws InterruptedException {
- CountDownLatch latch=new CountDownLatch(2);//兩個工人的協作
- Worker worker1=new Worker("zhang san", 5000, latch);
- Worker worker2=new Worker("li si", 8000, latch);
- worker1.start();//
- worker2.start();//
- latch.await();//等待所有工人完成工作
- System.out.println("all work done at "+sdf.format(new Date()));
- }
- static class Worker extends Thread{
- String workerName;
- int workTime;
- CountDownLatch latch;
- public Worker(String workerName ,int workTime ,CountDownLatch latch){
- this.workerName=workerName;
- this.workTime=workTime;
- this.latch=latch;
- }
- public void run(){
- System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));
- doWork();//工作了
- System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));
- latch.countDown();//工人完成工作,計數器減一
- }
- private void doWork(){
- try {
- Thread.sleep(workTime);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
這樣應該就清楚一點了,對於CountDownLatch來說,重點是那個“一個執行緒”, 是它在等待, 而另外那N的執行緒在把“某個事情”做完之後可以繼續等待,可以終止。而對於CyclicBarrier來說,重點是那N個執行緒,他們之間任何一個沒有完成,所有的執行緒都必須等待。
CyclicBarrier更像一個水閘,
執行緒執行就想水流, 在水閘處都會堵住, 等到水滿(執行緒到齊)了, 才開始洩流.
http://blog.csdn.net/gongpulin/article/details/51236398
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- publicclass CyclicBarrierTest {
- publicstaticvoid main(String[] args) {
- //建立CyclicBarrier物件,
- //並設定執行完一組5個執行緒的併發任務後,再執行MainTask任務
- CyclicBarrier cb = new CyclicBarrier(5, new MainTask());
- new SubTask("A", cb).start();
- new SubTask("B", cb).start();
- new SubTask("C", cb).start();
- new SubTask("D", cb).start();
- new SubTask("E", cb).start();
- }
- }
- /**
- * 最後執行的任務
- */
- class MainTask implements Runnable {
- publicvoid run() {
-
//根據jdkdoc裡的描述,哪個執行緒最後執行完,就執行下面的程式碼。
- System.out.println("......執行最後的任務了......");
- }
- }
- /**
- * 一組併發任務
- */
- class SubTask extends Thread {
- private String name;
- private CyclicBarrier cb;
- SubTask(String name, CyclicBarrier cb) {
- this.name = name;
- this.cb = cb;
- }
- publicvoid run() {
- System.out.println("[併發任務" + name + "] 開始執行");
- for (int i = 0; i < 999999; i++) ; //模擬耗時的任務
- System.out.println("[併發任務" + name + "] 開始執行完畢,通知障礙器");
- try {
- //每執行完一項任務就通知障礙器
- cb.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
[併發任務A] 開始執行
[併發任務B] 開始執行
[併發任務B] 開始執行完畢,通知障礙器
[併發任務C] 開始執行
[併發任務C] 開始執行完畢,通知障礙器
[併發任務A] 開始執行完畢,通知障礙器
[併發任務D] 開始執行
[併發任務D] 開始執行完畢,通知障礙器
[併發任務E] 開始執行
[併發任務E] 開始執行完畢,通知障礙器
......執行最後的任務了......
4. ExecutorService類提供其他版本的invokeAll()阻塞——這還是一個更詳細future的例子,其機制類似於join,這裡拿出來
執行緒執行者(六)執行多個任務並處理所