1. 程式人生 > >定製併發類(八)自定義在 Fork/Join 框架中執行的任務

定製併發類(八)自定義在 Fork/Join 框架中執行的任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 譯者:鄭玉婷

自定義在 Fork/Join 框架中執行的任務

執行者框架分開了任務的建立和執行。這樣,你只要實現 Runnable 物件來使用 Executor 物件。你可以傳送 Runnable 任務給執行者,然後它會建立,管理,並終結必要的執行緒來執行這些任務。

Java 7 在 Fork/Join 框架中提供了特殊的執行者。這個框架是設計用來解決那些可以使用 divide 和 conquer 技術分成更小點的任務的問題。在一個任務內,你要檢查你要解決的問題的大小,如果它比設定的大小還大,你就把問題分成2個或多個任務,再使用框架來執行這些任務。

如果問題的大小比設定的大小要小,你可以在任務中直接解決問題,可選擇返回結果。Fork/Join 框架實現 work-stealing 演算法來提高這類問題的整體表現。

Fork/Join 框架的主要類是 ForkJoinPool 類。它內部有以下2個元素:

  • 一個等待執行的任務queue
  • 一個執行任務的執行緒池

預設情況,被 ForkJoinPool類執行的任務是 ForkJoinTask 類的物件。你也可以傳送 Runnable 和 Callable 物件給 ForkJoinPool 類,但是他們就不能獲得所以 Fork/Join 框架的好處。通常情況,你將傳送ForkJoinTask 類的這2個子類中的一個給 ForkJoinPool 物件:

  • RecursiveAction: 如果你的任務沒有返回結果
  • RecursiveTask: 如果你的任務返回結果

在這個指南,你將學習如何為 Fork/Join 框架實現你自己的任務,實現一個任務擴充套件ForkJoinTask類。你將要實現的任務是計量執行時間並寫入操控臺,這樣你可以控制它的進展(evolution)。你也可以實現你自己的 Fork/Join 任務來寫日誌資訊,為了獲得在這個任務中使用的資源,或者來 post-process 任務的結果。

怎麼做呢…

按照這些步驟來實現下面的例子::

//1.   建立一個類,名為 MyWorkerTask,並特別擴充套件 ForkJoinTask 類引數化 Void type。
public abstract class MyWorkerTask extends ForkJoinTask<Void> {

//2.   宣告一個私有 String 屬性,名為 name,用來儲存任務的名字。
private String name;

//3.   實現類的建構函式,初始化它的屬性。
public MyWorkerTask(String name) {
this.name=name;
}

//4.   實現 getRawResult() 方法。這是 ForkJoinTask 類的抽象方法之一。由於任務不會返回任何結果,此方法返回的一定是null值。
@Override
public Void getRawResult() {
return null;
}

//5.   實現 setRawResult() 方法。這是 ForkJoinTask 類的另一個抽象方法。由於任務不會返回任何結果,方法留白即可。
@Override
protected void setRawResult(Void value) {

}

//6.   實現 exec() 方法。這是任務的主要方法。在這個例子,把任務的演算法委託給 compute() 方法。計算方法的執行時間並寫入操控臺。

@Override
protected boolean exec() {
	Date startDate=new Date();
	compute();
	Date finishDate=new Date();
	long diff=finishDate.getTime()-startDate.getTime();
	System.out.printf("MyWorkerTask: %s : %d Milliseconds to complete.\n",name,diff);
	return true;
}

//7.	實現 getName() 方法來返回任務的名字。
public String getName(){
	return name;
}

//8.   宣告抽象方法 compute()。像我們之前提到的,此方法實現任務的演算法,必須是由 MyWorkerTask 類的子類實現。
protected abstract void compute();

//9.   建立一個類,名為 Task,延伸 MyWorkerTask 類。
public class Task extends MyWorkerTask {

//10. 宣告一個私有 int 值 array,名為 array。
private int array[];

//11. 實現類的建構函式,初始化它的屬性值。
public Task(String name, int array[], int start, int end){
	super(name);
	this.array=array;
	this.start=start;
	this.end=end;
}

//12. 實現 compute() 方法。此方法通過 start 和 end 屬性來決定增加array的元素塊。如果元素塊的元素超過100個,把它分成2部分,並建立2個Task物件來處理各個部分。再使用 invokeAll() 方法把這些任務傳送給池。
protected void compute() {
	if (end-start>100){
		int mid=(end+start)/2;
		Task task1=new Task(this.getName()+"1",array,start,mid);
		Task task2=new Task(this.getName()+"2",array,mid,end);
		invokeAll(task1,task2);

//13.如果元素塊的元素少於100,使用for迴圈增加全部的元素。
} else {
	for (int i=start; i<end; i++) {
		array[i]++;
	}

//14. 最後,讓正在執行任務的執行緒進入休眠50毫秒。
try {
	Thread.sleep(50);
} catch (InterruptedException e) {
	e.printStackTrace();
}
}
}

//15. 建立例子的主類通過建立一個類,名為 Main 並新增 main()方法。
public class Main {
	public static void main(String[] args) throws Exception {

//16. 建立一個10,000元素的 int array。
int array[]=new int[10000];

//17.  建立一個 ForkJoinPool 物件,名為 pool。
ForkJoinPool pool=new ForkJoinPool();

//18. Create a 建立一個 Task 物件來增加array的全部元素。建構函式的引數是:任務的名字 Task,array物件,和0 和10000來向這個任務表示要處整個array.
Task task=new Task("Task",array,0,array.length);

//19.  使用 execute() 方法傳送任務給池。
pool.invoke(task);

//20. 使用 shutdown() 方法關閉池。
pool.shutdown();

//21. 在操控臺寫個資訊表明程式結束。
System.out.printf("Main: End of the program.\n");

它是怎麼工作的…

在這個指南,你實現了擴充套件 ForkJoinTask 類的 MyWorkerTask 類。這是你的基本類,它可以在 ForkJoinPool 執行者中執行,並且可以獲得這個執行者的所以好處,如 work-stealing 演算法。這個類等同於 RecursiveAction 和 RecursiveTask 類。

當你擴充套件 ForkJoinTask 類,你必須實現以下3個方法:

  • setRawResult(): 此方法是用來設立任務的結果。由於你的任務不返回任何結果,所以留白即可。
  • getRawResult(): 此方法是用來返回任務的結果。由於你的任務不返回任何結果,此方法會返回null值。
  • exec(): 此方法實現了任務的演算法。在這個例子,你把演算法委託給抽象方法 compute() (如 RecursiveAction 類和 RecursiveTask 類),且在exec()方法你計算了方法的執行時間,並寫入到操控臺。

最後,在例子的主類,你建立了一個有10,000個元素的array,一個 ForkJoinPool 執行者,和一個處理整個array的 Task 物件。執行程式,你可以發現不同的任務執行後都在操控臺寫入他們的執行時間。

參見