1. 程式人生 > >java-forkjoin帶有返回值

java-forkjoin帶有返回值

over start join random ide eal 獲取 return group

來自並發編程網: http://ifeve.com/fork-join-3/

如果一個任務要解決的問題超過了預定義的大小,你應該將這個任務分解成更多的子任務,並且用Fork/Join框架來執行這些子任務。當這些子任務執行完成後,發起它們的任務將獲得所有子任務產生的結果,對這些結果進行分組(合併),並返回最終的結果。最終,當提交到池中的初始任務執行完成時,你將獲得整個問題的最終結果。

1, 生成二維數組模擬文檔:

package com.wenbronk.forkjoin.withresult;

import java.util.Random;

/**
 * Generates a mock document: a two-dimensional array of words picked at
 * random from a small fixed vocabulary.
 *
 * Created by wenbronk on 2017/7/26.
 */
public class Document {

    /** Vocabulary the generated document is drawn from. */
    private final String[] words = {"the", "hello", "goodbye", "pack", "java", "thread", "pool", "random", "class", "main"};

    /**
     * Builds a {@code numLines x numWords} matrix of random vocabulary words and
     * prints its dimensions and how many cells equal {@code word}.
     *
     * @param numLines number of rows (lines) to generate; may be 0
     * @param numWords number of words per row; may be 0
     * @param word     the word whose occurrences are counted while generating
     * @return the generated document
     */
    public String[][] generateDocument(int numLines, int numWords, String word) {
        int counter = 0;
        String[][] document = new String[numLines][numWords];
        Random random = new Random();

        // Fill the array
        for (String[] line : document) {
            for (int j = 0; j < numWords; j++) {
                line[j] = words[random.nextInt(words.length)];
                if (line[j].equals(word)) {
                    counter++;
                }
            }
        }

        // Every row has exactly numWords columns, so report numWords directly:
        // indexing the last row would throw when numLines is 0.
        System.out.println(document.length + ": " + numWords);
        System.out.println("DocumentMock: The word appears " + counter + " times in the document");
        return document;
    }
}

2, 對模擬文檔進行按行拆分

package com.wenbronk.forkjoin.withresult;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join task that counts how often a word occurs in the rows
 * {@code [start, end)} of a document.
 *
 * Ranges of fewer than {@link #LINE_THRESHOLD} rows are handled by spawning one
 * {@link LineTask} per row; larger ranges are split in half and processed by
 * two child {@code DocumentTask}s.
 *
 * Created by wenbronk on 2017/7/26.
 */
public class DocumentTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;

    /** Row ranges smaller than this are processed directly instead of being split. */
    private static final int LINE_THRESHOLD = 10;

    private final String[][] document;
    private final int start, end;
    private final String word;

    /**
     * @param document the document, one row per line
     * @param start    index of the first row to process (inclusive)
     * @param end      index one past the last row to process (exclusive)
     * @param word     the word to look for
     */
    public DocumentTask(String[][] document, int start, int end, String word) {
        this.document = document;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    /**
     * Counts the occurrences of the word in this task's row range.
     *
     * @return the number of occurrences in rows {@code [start, end)}
     */
    @Override
    protected Integer compute() {
        if (end - start < LINE_THRESHOLD) {
            return processLines(document, start, end, word);
        }

        // Unsigned shift keeps the midpoint correct even if start + end overflows.
        int mid = (start + end) >>> 1;
        DocumentTask task1 = new DocumentTask(document, start, mid, word);
        DocumentTask task2 = new DocumentTask(document, mid, end, word);

        // invokeAll rethrows a subtask failure, so after it returns both
        // results are available. join() is used instead of get() so that no
        // checked exception has to be swallowed (which would silently turn a
        // failure into a count of 0).
        invokeAll(task1, task2);
        return groupResults(task1.join(), task2.join());
    }

    /** Combines the counts of two sub-ranges. */
    private int groupResults(Integer integer, Integer integer1) {
        return integer + integer1;
    }

    /**
     * Counts the word in rows {@code [start, end)} by running one
     * {@link LineTask} per row and summing their results.
     *
     * @param word the word to look for
     */
    private int processLines(String[][] document, int start, int end, String word) {
        ArrayList<LineTask> tasks = new ArrayList<>();
        for (int i = start; i < end; i++) {
            tasks.add(new LineTask(document[i], 0, document[i].length, word));
        }

        invokeAll(tasks);

        int result = 0;
        for (LineTask task : tasks) {
            // All tasks have completed at this point; join() just fetches the
            // result and propagates any failure instead of skipping the row.
            result += task.join();
        }
        return result;
    }
}

3, 對行進行單詞拆分

package com.wenbronk.forkjoin.withresult;

import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join task that counts how many times a word occurs in the slice
 * {@code [start, end)} of a single line.
 *
 * Slices of fewer than {@link #WORD_THRESHOLD} words are counted directly;
 * larger slices are split in half and counted by two child tasks.
 *
 * Created by wenbronk on 2017/7/27.
 */
public class LineTask extends RecursiveTask<Integer> {
    // Was misspelled "seriaVersionUID", so serialization ignored it.
    private static final long serialVersionUID = 1L;

    /** Slices smaller than this are counted sequentially instead of being split. */
    private static final int WORD_THRESHOLD = 100;

    private final String[] line;
    private final int start, end;
    private final String word;

    /**
     * @param line  the words of the line
     * @param start index of the first word to examine (inclusive)
     * @param end   index one past the last word to examine (exclusive)
     * @param word  the word to look for; must not be null
     */
    public LineTask(String[] line, int start, int end, String word) {
        this.line = line;
        this.start = start;
        this.end = end;
        this.word = word;
    }

    /**
     * Counts the occurrences of the word in this task's slice.
     *
     * @return the number of occurrences in {@code [start, end)}; never null
     */
    @Override
    protected Integer compute() {
        if (end - start < WORD_THRESHOLD) {
            return count(line, start, end, word);
        }

        // Unsigned shift keeps the midpoint correct even if start + end overflows.
        int mid = (start + end) >>> 1;
        LineTask task1 = new LineTask(line, start, mid, word);
        LineTask task2 = new LineTask(line, mid, end, word);

        // invokeAll rethrows a subtask failure, so after it returns both
        // results are available. join() avoids the checked exceptions of get();
        // previously a swallowed exception made this method return null.
        invokeAll(task1, task2);
        return groupResult(task1.join(), task2.join());
    }

    /**
     * Merges two partial counts.
     *
     * @return the sum of both counts
     */
    private int groupResult(Integer num1, Integer num2) {
        return num1 + num2;
    }

    /**
     * Counts how many entries of {@code line} in {@code [start, end)} equal
     * {@code word}.
     */
    private int count(String[] line, int start, int end, String word) {
        int counter = 0;
        for (int i = start; i < end; i++) {
            if (word.equals(line[i])) {
                counter++;
            }
        }
        return counter;
    }
}

4, 入口執行類

package com.wenbronk.forkjoin.withresult;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * Entry point: generates a mock document, counts a word in it with a
 * {@link DocumentTask} running on a {@link ForkJoinPool}, and prints pool
 * statistics once per second until the task has finished.
 *
 * Created by wenbronk on 2017/7/27.
 */
public class CountMain {

    public static void main(String[] args) {
        Document document = new Document();
        String[][] documents = document.generateDocument(100, 1000, "random");

        // Use the actual row count so the task range always matches the document.
        DocumentTask task = new DocumentTask(documents, 0, documents.length, "random");
        ForkJoinPool pool = new ForkJoinPool();
        try {
            // execute() is asynchronous, so this thread is free to monitor the pool.
            pool.execute(task);

            do {
                System.out.printf("******************************************\n");
                System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
                System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
                System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
                System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
                System.out.printf("******************************************\n");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop monitoring instead of looping on.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    return;
                }
            } while (!task.isDone());

            try {
                System.out.printf("Main: The word appears %d in the document ", task.get());
            } catch (Exception e) {
                // Top-level boundary: report a failed or cancelled task.
                e.printStackTrace();
            }
        } finally {
            // Always release the pool, including on the early-return path above.
            pool.shutdown();
        }
    }
}

java-forkjoin帶有返回值