定製併發類(八)自定義在 Fork/Join 框架中執行的任務
宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 譯者:鄭玉婷
自定義在 Fork/Join 框架中執行的任務
執行者框架分開了任務的建立和執行。這樣,你只要實現 Runnable 物件來使用 Executor 物件。你可以傳送 Runnable 任務給執行者,然後它會建立,管理,並終結必要的執行緒來執行這些任務。
Java 7 在 Fork/Join 框架中提供了特殊的執行者。這個框架是設計用來解決那些可以使用 divide 和 conquer 技術分成更小點的任務的問題。在一個任務內,你要檢查你要解決的問題的大小,如果它比設定的大小還大,你就把問題分成2個或多個任務,再使用框架來執行這些任務。
如果問題的大小比設定的大小要小,你可以在任務中直接解決問題,可選擇返回結果。Fork/Join 框架實現 work-stealing 演算法來提高這類問題的整體表現。
Fork/Join 框架的主要類是 ForkJoinPool 類。它內部有以下2個元素:
- 一個等待執行的任務queue
- 一個執行任務的執行緒池
預設情況,被 ForkJoinPool類執行的任務是 ForkJoinTask 類的物件。你也可以傳送 Runnable 和 Callable 物件給 ForkJoinPool 類,但是他們就不能獲得所以 Fork/Join 框架的好處。通常情況,你將傳送ForkJoinTask 類的這2個子類中的一個給 ForkJoinPool 物件:
- RecursiveAction: 如果你的任務沒有返回結果
- RecursiveTask: 如果你的任務返回結果
在這個指南,你將學習如何為 Fork/Join 框架實現你自己的任務,實現一個任務擴充套件ForkJoinTask類。你將要實現的任務是計量執行時間並寫入操控臺,這樣你可以控制它的進展(evolution)。你也可以實現你自己的 Fork/Join 任務來寫日誌資訊,為了獲得在這個任務中使用的資源,或者來 post-process 任務的結果。
怎麼做呢…
按照這些步驟來實現下面的例子::
//1. 建立一個類,名為 MyWorkerTask,並特別擴充套件 ForkJoinTask 類引數化 Void type。 public abstract class MyWorkerTask extends ForkJoinTask<Void> { //2. 宣告一個私有 String 屬性,名為 name,用來儲存任務的名字。 private String name; //3. 實現類的建構函式,初始化它的屬性。 public MyWorkerTask(String name) { this.name=name; } //4. 實現 getRawResult() 方法。這是 ForkJoinTask 類的抽象方法之一。由於任務不會返回任何結果,此方法返回的一定是null值。 @Override public Void getRawResult() { return null; } //5. 實現 setRawResult() 方法。這是 ForkJoinTask 類的另一個抽象方法。由於任務不會返回任何結果,方法留白即可。 @Override protected void setRawResult(Void value) { } //6. 實現 exec() 方法。這是任務的主要方法。在這個例子,把任務的演算法委託給 compute() 方法。計算方法的執行時間並寫入操控臺。 @Override protected boolean exec() { Date startDate=new Date(); compute(); Date finishDate=new Date(); long diff=finishDate.getTime()-startDate.getTime(); System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete.\n",name,diff); return true; } //7. 實現 getName() 方法來返回任務的名字。 public String getName(){ return name; } //8. 宣告抽象方法 compute()。像我們之前提到的,此方法實現任務的演算法,必須是由 MyWorkerTask 類的子類實現。 protected abstract void compute(); //9. 建立一個類,名為 Task,延伸 MyWorkerTask 類。 public class Task extends MyWorkerTask { //10. 宣告一個私有 int 值 array,名為 array。 private int array[]; //11. 實現類的建構函式,初始化它的屬性值。 public Task(String name, int array[], int start, int end){ super(name); this.array=array; this.start=start; this.end=end; } //12. 實現 compute() 方法。此方法通過 start 和 end 屬性來決定增加array的元素塊。如果元素塊的元素超過100個,把它分成2部分,並建立2個Task物件來處理各個部分。再使用 invokeAll() 方法把這些任務傳送給池。 protected void compute() { if (end-start>100){ int mid=(end+start)/2; Task task1=new Task(this.getName()+"1",array,start,mid); Task task2=new Task(this.getName()+"2",array,mid,end); invokeAll(task1,task2); //13.如果元素塊的元素少於100,使用for迴圈增加全部的元素。 } else { for (int i=start; i<end; i++) { array[i]++; } //14. 最後,讓正在執行任務的執行緒進入休眠50毫秒。 try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } //15. 建立例子的主類通過建立一個類,名為 Main 並新增 main()方法。 public class Main { public static void main(String[] args) throws Exception { //16. 建立一個10,000元素的 int array。 int array[]=new int[10000]; //17. 建立一個 ForkJoinPool 物件,名為 pool。 ForkJoinPool pool=new ForkJoinPool(); //18. Create a 建立一個 Task 物件來增加array的全部元素。建構函式的引數是:任務的名字 Task,array物件,和0 和10000來向這個任務表示要處整個array. Task task=new Task("Task",array,0,array.length); //19. 使用 execute() 方法傳送任務給池。 pool.invoke(task); //20. 使用 shutdown() 方法關閉池。 pool.shutdown(); //21. 在操控臺寫個資訊表明程式結束。 System.out.printf("Main: End of the program.\n");
它是怎麼工作的…
在這個指南,你實現了擴充套件 ForkJoinTask 類的 MyWorkerTask 類。這是你的基本類,它可以在 ForkJoinPool 執行者中執行,並且可以獲得這個執行者的所以好處,如 work-stealing 演算法。這個類等同於 RecursiveAction 和 RecursiveTask 類。
當你擴充套件 ForkJoinTask 類,你必須實現以下3個方法:
- setRawResult(): 此方法是用來設立任務的結果。由於你的任務不返回任何結果,所以留白即可。
- getRawResult(): 此方法是用來返回任務的結果。由於你的任務不返回任何結果,此方法會返回null值。
- exec(): 此方法實現了任務的演算法。在這個例子,你把演算法委託給抽象方法 compute() (如 RecursiveAction 類和 RecursiveTask 類),且在exec()方法你計算了方法的執行時間,並寫入到操控臺。
最後,在例子的主類,你建立了一個有10,000個元素的array,一個 ForkJoinPool 執行者,和一個處理整個array的 Task 物件。執行程式,你可以發現不同的任務執行後都在操控臺寫入他們的執行時間。
參見