Fork/Join框架(三)加入任務的結果
宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González 譯者:許巧輝 校對:方騰飛
加入任務的結果
Fork/Join框架提供了執行返回一個結果的任務的能力。這些任務的型別是實現了RecursiveTask類。這個類繼承了ForkJoinTask類和實現了執行者框架提供的Future介面。
在任務中,你必須使用Java API方法推薦的結構:
If (problem size < size){ tasks=Divide(task); execute(tasks); groupResults() return result; } else { resolve problem; return result; }
如果這個任務必須解決一個超過預定義大小的問題,你應該將這個任務分解成更多的子任務,並且用Fork/Join框架來執行這些子任務。當這些子任務完成執行,發起的任務將獲得所有子任務產生的結果 ,對這些結果進行分組,並返回最終的結果。最終,當在池中執行的發起的任務完成它的執行,你將獲取整個問題地最終結果。
在這個指南中,你將學習如何使用Fork/Join框架解決這種問題,開發一個在文件中查詢單詞的應用程式。你將會實現以下兩種任務型別:
- 一個文件任務,將在文件中的行集合中查詢一個單詞。
- 一個行任務,將在文件的一部分資料中查詢一個單詞。
所有任務將返回單詞在文件的一部分中或行中出現的次數。
如何做…
根據以下這些步驟來實現這個例子:
1.建立一個Document類,它將產生用來模擬文件的字串的二維陣列。
public class Document {
2.建立一個帶有一些單詞的字串陣列。這個陣列將被用來生成字串二維陣列。
private String words[]={"the","hello","goodbye","packt", "java","thread","pool","random","class","main"};
3.實現generateDocument()方法。它接收以下引數:行數、每行的單詞數。這個例子返回一個字串二維陣列,來表示將要查詢的單詞。
public String[][] generateDocument(int numLines, int numWords,String word){
4.首先,建立生成這個文件必需的物件:字串二維物件和生成隨機數的Random物件。
int counter=0; String document[][]=new String[numLines][numWords]; Random random=new Random();
5.用字串填充這個陣列。儲存在每個位置的字串是單詞陣列的隨機位置,統計這個程式將要在生成的陣列中查詢的單詞出現的次數。你可以使用這個值來檢查程式是否執行正確。
for (int i=0; i<numLines; i++){ for (int j=0; j<numWords; j++) { int index=random.nextInt(words.length); document[i][j]=words[index]; if (document[i][j].equals(word)){ counter++; } } }
6.將單詞出現的次數寫入控制檯,並返回生成的二維陣列。
System.out.println("DocumentMock: The word appears "+counter+" times in the document"); return document;
7.建立一個DocumentTask類,指定它繼承RecursiveTask類,並引數化為Integer型別。該類將實現統計單詞在一組行中出現的次數的任務。
public class DocumentTask extends RecursiveTask<Integer> {
8.宣告一個私有的String型別的二維陣列document,兩個私有的int型別的屬性名為start和end,一個私有的String型別的屬性名為word。
private String document[][]; private int start, end; private String word;
9.實現這個類的構造器,用來初始化這些屬性。
public DocumentTask (String document[][], int start, int end, String word){ this.document=document; this.start=start; this.end=end; this.word=word; }
10.實現compute()方法。如果屬性end和start的差小於10,那麼這個任務統計單詞位於行在呼叫processLines()方法的這些位置中出現的次數。
@Override protected Integer compute() { int result; if (end-start<10){ result=processLines(document, start, end, word);
11.否則,用兩個物件分解行組,建立兩個新的DocumentTask物件用來處理這兩個組,並且在池中使用invokeAll()方法來執行它們。
} else { int mid=(start+end)/2; DocumentTask task1=new DocumentTask(document,start,mid,word); DocumentTask task2=new DocumentTask(document,mid,end,word); invokeAll(task1,task2);
12.然後,使用groupResults()方法將這兩個任務返回的結果相加。最後,返回這個任務統計的結果。
try { result=groupResults(task1.get(),task2.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } return result;
13.實現processLines()方法。它接收以下引數:字串二維陣列、start屬性、end屬性、任務將要查詢的word屬性。
private Integer processLines(String[][] document, int start, int end,String word) {
14.對於任務要處理的每行,建立LineTask物件來處理整行,並且將它們儲存在任務數列中。
List<LineTask> tasks=new ArrayList<LineTask>(); for (int i=start; i<end; i++){ LineTask task=new LineTask(document[i], 0, document[i]. length, word); tasks.add(task); }
15.在那個數列中使用invokeAll()執行所有任務。
invokeAll(tasks);
16.合計所有這些任務返回的值,並返回這個結果。
int result=0; for (int i=0; i<tasks.size(); i++) { LineTask task=tasks.get(i); try { result=result+task.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } return new Integer(result);
17.實現groupResults()方法。它相加兩個數,並返回這個結果。
private Integer groupResults(Integer number1, Integer number2) { Integer result; result=number1+number2; return result; }
18.建立LineTask類,指定它繼承RecursiveTask類,並引數化為Integer型別。這個類將實現統計單詞在一行中出現的次數的任務。
public class LineTask extends RecursiveTask<Integer>{
19.宣告這個類的序列號版本UID。這個元素是必需的,因為RecursiveTask類的父類,ForkJoinTask類實現了Serializable介面。宣告一個私有的、String型別的屬性line,兩個私有的、int型別的屬性start和end,一個私有的、String型別的屬性word。
private static final long serialVersionUID = 1L; private String line[]; private int start, end; private String word;
20.實現這個類的構造器,初始化這些屬性。
public LineTask(String line[], int start, int end, String word) { this.line=line; this.start=start; this.end=end; this.word=word; }
21.實現這個類的compute()方法。如果屬性end和start之差小於100,這個任務在行中由start和end屬性使用count()方法決定的片斷中查詢單詞。
@Override protected Integer compute() { Integer result=null; if (end-start<100) { result=count(line, start, end, word);
22.否則,將行中的單片語分成兩部分,建立兩個新的LineTask物件來處理這兩個組,在池中使用invokeAll()方法執行它們。
} else { int mid=(start+end)/2; LineTask task1=new LineTask(line, start, mid, word); LineTask task2=new LineTask(line, mid, end, word); invokeAll(task1, task2);
23.然後,使用groupResults()方法將這兩個任務返回的值相加。最後,返回這個任務計算的結果。
try { result=groupResults(task1.get(),task2.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } return result;
24.實現count()方法。它接收以下引數:完整行的字串陣列、start屬性、end屬性、任務將要查詢的word屬性。
private Integer count(String[] line, int start, int end, String word) {
25.比較這個任務將要查詢的word屬性中的在start和end屬性之間的位置的單詞,如果它們相等,則增加count變數。
int counter; counter=0; for (int i=start; i<end; i++){ if (line[i].equals(word)){ counter++; } }
26.為了顯示示例的執行,令任務睡眠10毫秒。
try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
27.返回counter變數的值。
return counter;
28.實現groupResults()方法。它合計兩個數的值,並返回這個結果。
private Integer groupResults(Integer number1, Integer number2) { Integer result; result=number1+number2; return result; }
29.實現示例的主類,通過建立Main類,並實現main()方法。
public class Main{ public static void main(String[] args) {
30.使用DocumentMock類,建立一個帶有100行,每行1000個單詞的Document。
DocumentMock mock=new DocumentMock(); String[][] document=mock.generateDocument(100, 1000, "the");
31.建立一個新的DocumentTask物件,用來更新整個文件的產品。引數start值為0,引數end值為100。
DocumentTask task=new DocumentTask(document, 0, 100, "the");
32.使用無參構造器建立一個ForkJoinPool物件,在池中使用execute()方法執行這個任務。
ForkJoinPool pool=new ForkJoinPool(); pool.execute(task);
33.實現一個程式碼塊,用來顯示關於池變化的資訊。每秒向控制檯寫入池的某些引數的值,直到任務完成它的執行。
do { System.out.printf("******************************************\n"); System.out.printf("Main: Parallelism: %d\n",pool.getParallelism()); System.out.printf("Main: Active Threads: %d\n",pool.getActiveThreadCount()); System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount()); System.out.printf("Main: Steal Count: %d\n",pool.getStealCount()); System.out.printf("******************************************\n"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } while (!task.isDone());
34.使用shutdown()方法關閉這個池。
pool.shutdown();
35.使用awaitTermination()方法等待任務的結束。
try { System.out.printf("Main: The word appears %d in the document",task.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
36.列印單詞在文件中出現的次數。檢查這個數是否與DocumentMock類中寫入的數一樣。
try { System.out.printf("Main: The word appears %d in the document",task.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
它是如何工作的…
在這個示例中,你已經實現了兩種不同的任務:
- DocumentTask類:這個類的任務將處理由start和end屬性決定的文件中的行組。如果這個行組的大小小於10,它為每行建立LineTask物件,並且當它們完成它們的執行時,它合計這些任務的結果,並返回這個合計值。如果這個任務要處理的行組大小不小於10,它將這個組分成兩個並建立兩個DocumentTask物件來處理這些新組。當這些任務完成它們的執行時,這個任務合計它們的結果,並返回這個合計值。
- LineTask類:這個類的任務將處理文件中的一行的單片語。如果這個單片語小於10,這個任務直接在這個單片語中查詢單詞,並且返回這個單詞出現的次數。否則,它將這個單片語分成兩個並建立兩個LineTask物件來處理。當這些任務完成它們的執行,這個任務合計這些任務的結果並返回這個合計值。
在Main類中,你已經使用預設構造器一個ForkJoinPool物件,並且你在它裡面執行一個DocumentTask類,這個類將處理一個擁有100行,每行有1000個單詞的文件。這個任務將使用其他的DocumentTask物件和LineTask物件來分解這個問題,當所有任務完成它們的執行,你可以使用啟動任務來獲取單詞在整個文件中出現的總次數。由於任務返回一個結果,所以它們繼承RecursiveTask類。
為了獲取Task返回的結果,你已經使用了get()方法 。這個方法是在Future介面中宣告的,由RecursiveTask類實現的。
當你執行這個程式,你可以比較在控制檯中的第一行和最後一行。第一行是文件生成時計算的單詞出現的次數,最後一行是由Fork/Join任務計算的。
不止這些…
ForkJoinTask類提供其他的方法來完成一個任務的執行,並返回一個結果,這就是complete()方法。這個方法接收一個RecursiveTask類的引數化型別的物件,並且當join()方法被呼叫時,將這個物件作為任務的結果返回。 它被推薦使用在:提供非同步任務結果。
由於RecursiveTask類實現了Future介面,get()方法其他版本如下:
- get(long timeout, TimeUnit unit):這個版本的get()方法,如果任務的結果不可用,在指定的時間內等待它。如果超時並且結果不可用,那麼這個方法返回null值。TimeUnit類是一個列舉類,它有以下常量:DAYS, HOURS,MICROSECONDS,MILLISECONDS, MINUTES, NANOSECONDS和SECONDS。
參見
- 在第5章,Fork/Join框架中的建立一個Fork/Join池的指南
- 在第8章,測試併發應用程式中的監控Fork/Join池的指南