1. 程式人生 > >【技能庫】--批量任務多執行緒併發執行(324)

【技能庫】--批量任務多執行緒併發執行(324)

擴充 Callable 介面，並搭配 Future<?> 取得執行結果

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Random; import java.util.Stack; import java.util.concurrent.*; /** * Created by xiayin on 2017/9/16. */ public class ConcurrentThreadTest { //List 任務 //每次取制定個數任務 //任務擴充套件callable //指定執行緒池執行任務 200ms內沒有執行完畢的下次不再提取 //迴圈執行 private static Stack<TaskWithId> stack = new Stack<>();
private static ExecutorService pool = Executors.newFixedThreadPool(3); private static int countDownNumber = 3; public static void main(String[] args) { moskListTask(); while (true) { List<Integer> doneList = Lists.newArrayList(); CountDownLatch latch = new
CountDownLatch(countDownNumber); List<TaskWithId> taskWithIds = pullFixNumTask(countDownNumber); StringBuilder bi = new StringBuilder(); for (TaskWithId taskWithId : taskWithIds) { bi.append(taskWithId.id).append(" "); } System.out.println("本次需要執行的任務 ids = " + bi.toString()); if (CollectionUtils.isEmpty(taskWithIds)) { break; } List<Future<Integer>> result = Lists.newArrayList(); for (TaskWithId taskWithId : taskWithIds) { taskWithId.setLatch(latch); result.add(pool.submit(taskWithId)); } //等待latch 設定過期時間 Uninterruptibles.awaitUninterruptibly(latch, 20, TimeUnit.MICROSECONDS); //每個任務等待指定時間 不中斷實際任務的執行 獲取結果 for (Future<Integer> future : result) { try { Uninterruptibles.getUninterruptibly(future, 1, TimeUnit.SECONDS);//非中斷獲取結果 doneList.add(future.get()); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { System.out.println("任務超時"); } catch (InterruptedException e) { e.printStackTrace(); } } StringBuilder ri = new StringBuilder(); for (Integer integer : doneList) { ri.append(integer).append(" "); } System.out.println(); System.out.println("本次完成的任務 ids = " + ri.toString()); } } private static List<TaskWithId> pullFixNumTask(int num) { List<TaskWithId> list = Lists.newArrayList(); while (!stack.isEmpty() && num > 0) { num--; list.add(stack.pop()); // System.out.println("取出任務"); } return list; } private static List<TaskWithId> moskListTask() { for (int i = 0; i < 10; i++) { stack.push(new TaskWithId(i)); } System.out.println("所有的任務集合=" + JSON.toJSONString(stack.elements())); return stack; } static class TaskWithId implements Callable { private int id; private CountDownLatch latch; public void setLatch(CountDownLatch latch) { this.latch = latch; } TaskWithId(int id) { this.id = id; } public void run() { try { Thread.sleep(new Random().nextInt(500) * id); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } @Override public Object call() throws Exception { run(); System.out.println("done "+id ); return 
id; } @Override public String toString() { return ToStringBuilder.reflectionToString(this); } } }