java-forkjoin帶有返回值
阿新 • • 發佈:2017-07-27
over start join random ide eal 獲取 return group
來自並發編程網: http://ifeve.com/fork-join-3/
如果這個任務必須解決一個超過預定義大小的問題,你應該將這個任務分解成更多的子任務,並且用Fork/Join框架來執行這些子任務。當這些子任務完成執行,發起的任務將獲得所有子任務產生的結果,對這些結果進行分組,並返回最終的結果。最終,當在池中執行的發起的任務完成它的執行,你將獲取整個問題的最終結果。
1, 生成二維數組模擬文檔:
package com.wenbronk.forkjoin.withresult;

import java.util.Random;

/**
 * Generates a mock "document": a two-dimensional array of words picked at
 * random from a small fixed vocabulary.
 *
 * Created by wenbronk on 2017/7/26.
 */
public class Document {

    /** Vocabulary every generated cell is drawn from. */
    private static final String[] WORDS = {
        "the", "hello", "goodbye", "pack", "java",
        "thread", "pool", "random", "class", "main"
    };

    /**
     * Builds a {@code numLines x numWords} document of random vocabulary words
     * and prints its dimensions together with the number of cells equal to
     * {@code word}, so a parallel count can be checked against it.
     *
     * @param numLines number of lines (rows); zero yields an empty document
     * @param numWords number of words per line (columns)
     * @param word     the word whose occurrences are counted while generating
     * @return the generated document
     */
    public String[][] generateDocument(int numLines, int numWords, String word) {
        int counter = 0;
        String[][] document = new String[numLines][numWords];
        Random random = new Random();

        // Fill the array, counting matches as we go.
        for (int i = 0; i < numLines; i++) {
            for (int j = 0; j < numWords; j++) {
                String chosen = WORDS[random.nextInt(WORDS.length)];
                document[i][j] = chosen;
                if (chosen.equals(word)) {
                    counter++;
                }
            }
        }

        // Every row has numWords columns; printing numWords directly avoids
        // indexing row -1 when the document has no lines.
        System.out.println(document.length + ": " + numWords);
        System.out.println("DocumentMock: The word appears " + counter + " times in the document");
        return document;
    }
}
2, 對模擬文檔進行行拆分
package com.wenbronk.forkjoin.withresult;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Counts the occurrences of a word in a range of document lines by
 * recursively halving the range and delegating each line to a {@link LineTask}.
 *
 * Created by wenbronk on 2017/7/26.
 */
public class DocumentTask extends RecursiveTask<Integer> {

    /** Ranges with fewer lines than this are processed without further splitting. */
    private static final int LINE_THRESHOLD = 10;

    private String[][] document;
    private int start, end;
    private String word;

    /**
     * @param document the document, one {@code String[]} per line
     * @param start    index of the first line to process (inclusive)
     * @param end      index after the last line to process (exclusive)
     * @param word     the word to count
     */
    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    /**
     * Processes the range directly when it is small, otherwise splits it in
     * two, runs both halves and adds their counts.
     *
     * @return number of occurrences of the word in lines {@code [start, end)}
     */
    @Override
    protected Integer compute() {
        if (end - start < LINE_THRESHOLD) {
            return processLines(document, start, end, word);
        }

        // Overflow-safe midpoint.
        int mid = start + (end - start) / 2;
        DocumentTask task1 = new DocumentTask(document, start, mid, word);
        DocumentTask task2 = new DocumentTask(document, mid, end, word);
        invokeAll(task1, task2);

        // Both halves are complete here, so join() only fetches the result.
        // Unlike get() it cannot be interrupted, so a failure can no longer be
        // swallowed and turned into a silently wrong count.
        return groupResults(task1.join(), task2.join());
    }

    /** Combines two partial counts into one. */
    private int groupResults(Integer integer, Integer integer1) {
        return integer + integer1;
    }

    /**
     * Creates one {@link LineTask} per line in {@code [start, end)}, runs them
     * all and sums their counts.
     *
     * @param document the document, one {@code String[]} per line
     * @param start    index of the first line (inclusive)
     * @param end      index after the last line (exclusive)
     * @param word     the word to search for
     * @return total occurrences of the word in the given lines
     */
    private int processLines(String[][] document, int start, int end, String word) {
        List<LineTask> tasks = new ArrayList<>();
        for (int i = start; i < end; i++) {
            tasks.add(new LineTask(document[i], 0, document[i].length, word));
        }
        invokeAll(tasks);

        int result = 0;
        for (LineTask task : tasks) {
            result += task.join();
        }
        return result;
    }
}
3, 對行進行單詞拆分
package com.wenbronk.forkjoin.withresult;

import java.util.concurrent.RecursiveTask;

/**
 * Counts how many times a word occurs in a range of a single line,
 * splitting the range in half recursively while it is large.
 *
 * Created by wenbronk on 2017/7/27.
 */
public class LineTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;

    /** Ranges with fewer words than this are counted sequentially. */
    private static final int SEQUENTIAL_THRESHOLD = 100;

    private String[] line;
    private int start, end;
    private String word;

    /**
     * @param line  the words of the line
     * @param start index of the first word to examine (inclusive)
     * @param end   index after the last word to examine (exclusive)
     * @param word  the word to count
     */
    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    /**
     * Counts directly when the range is small, otherwise splits it in two,
     * runs both halves and adds their counts.
     *
     * @return number of occurrences of the word in {@code [start, end)}; never {@code null}
     */
    @Override
    protected Integer compute() {
        if (end - start < SEQUENTIAL_THRESHOLD) {
            return count(line, start, end, word);
        }

        // Overflow-safe midpoint.
        int mid = start + (end - start) / 2;
        LineTask task1 = new LineTask(line, start, mid, word);
        LineTask task2 = new LineTask(line, mid, end, word);
        invokeAll(task1, task2);

        // Both halves are complete here, so join() only fetches the result.
        // Reading them through get() inside a catch-all used to leave the
        // result null when that call was interrupted.
        return groupResult(task1.join(), task2.join());
    }

    /**
     * Merges two partial counts.
     *
     * @return the sum of both counts
     */
    private Integer groupResult(Integer num1, Integer num2) {
        return num1 + num2;
    }

    /**
     * Counts the occurrences of {@code word} in {@code line[start..end)}.
     */
    private Integer count(String[] line, int start, int end, String word) {
        int counter = 0;
        for (int i = start; i < end; i++) {
            if (word.equals(line[i])) {
                counter++;
            }
        }
        return counter;
    }
}
4, 入口執行類
package com.wenbronk.forkjoin.withresult;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * Entry point: generates a mock document, counts a word in it with a
 * {@link DocumentTask} on a {@link ForkJoinPool}, and prints pool statistics
 * once per second until the task is done.
 *
 * Created by wenbronk on 2017/7/27.
 */
public class CountMain {

    public static void main(String[] args) {
        final String word = "random";

        Document document = new Document();
        String[][] documents = document.generateDocument(100, 1000, word);
        // Derive the range from the generated array instead of repeating the row count.
        DocumentTask task = new DocumentTask(documents, 0, documents.length, word);

        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.execute(task);

            // Report pool state at least once, then once per second while the task runs.
            do {
                System.out.printf("******************************************\n");
                System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
                System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
                System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
                System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
                System.out.printf("******************************************\n");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop monitoring rather than
                    // swallowing the interrupt and continuing to poll.
                    Thread.currentThread().interrupt();
                    break;
                }
            } while (!task.isDone());

            System.out.printf("Main: The word appears %d in the document ", task.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Main: interrupted before the result was available");
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            // Always release the pool, including on failure paths.
            pool.shutdown();
        }
    }
}
java-forkjoin帶有返回值