測試併發應用(四)監控Fork/Join池
宣告:本文是《 Java 7 Concurrency Cookbook 》的第八章, 作者: Javier Fernández González 譯者:鄭玉婷
監控Fork/Join池
Executor 框架提供了執行緒的建立和管理,來實現任務的執行機制。Java 7 包括了一個 Executor 框架的延伸為了一種特別的問題而使用的,將比其他解決方案的表現有所提升(可以直接使用 Thread 物件或者 Executor 框架)。它是 Fork/Join 框架。
此框架是為了解決可以使用 divide 和 conquer 技術,使用 fork() 和 join() 操作把任務分成小塊的問題而設計的。主要的類實現這個行為的是 ForkJoinPool 類。
在這個指南,你將學習從ForkJoinPool類可以獲取的資訊和如何獲取這些資訊。
準備
指南中的例子是使用Eclipse IDE 來實現的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 開啟並建立一個新的java任務。
怎麼做呢…
按照這些步驟來實現下面的例子:
//1. 建立一個類,名為 Task, 擴充套件 RecursiveAction 類。
public class Task extends RecursiveAction { // 2. 宣告一個私有 int array 屬性,名為 array,用來儲存你要增加的 array 的元素。 private int[] array; // 3. 宣告2個私有 int 屬性,名為 start 和 end,用來儲存 此任務已經處理的元素塊的頭和尾的位置。 private int start; private int end; // 4. 實現類的建構函式,初始化屬性值。 public Task(int array[], int start, int end) { this.array = array; this.start = start; this.end = end; } // 5. 用任務的中心邏輯來實現 compute() // 方法。如果此任務已經處理了超過100任務,那麼把元素集分成2部分,再建立2個任務分別來執行這些部分,使用 fork() 方法開始執行,並使用 // join() 方法等待它的終結。 protected void compute() { if (end - start > 100) { int mid = (start + end) / 2; Task task1 = new Task(array, start, mid); Task task2 = new Task(array, mid, end); task1.fork(); task2.fork(); task1.join(); task2.join(); // 6. 如果任務已經處理了100個元素或者更少,那麼在每次操作之後讓執行緒進入休眠5毫秒來增加元素。 } else { for (int i = start; i < end; i++) { array[i]++; try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
//7. 建立例子的主類通過建立一個類,名為 Main 並新增 main()方法。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) throws Exception { // 8. 建立 ForkJoinPool 物件,名為 pool。 ForkJoinPool pool = new ForkJoinPool(); // 9. 建立 10,000個元素的整數 array ,名為 array。 int array[] = new int[10000]; // 10. 建立新的 Task 物件來處理整個array。 Task task1 = new Task(array, 0, array.length); // 11. 使用 execute() 方法 把任務傳送到pool裡執行。 pool.execute(task1); // 12. 當任務還未結束執行,呼叫 showLog() 方法來把 ForkJoinPool 類的狀態資訊寫入,然後讓執行緒休眠一秒。 while (!task1.isDone()) { showLog(pool); TimeUnit.SECONDS.sleep(1); } // 13. 使用 shutdown() 方法關閉pool。 pool.shutdown(); // 14. 使用 awaitTermination() 方法 等待pool的終結。 pool.awaitTermination(1, TimeUnit.DAYS); // 15. 呼叫 showLog() 方法寫關於 ForkJoinPool 類狀態的資訊並寫一條資訊到操控臺表明結束程式。 showLog(pool); System.out.printf("Main: End of the program.\n"); } // 16. 實現 showLog() 方法。它接收 ForkJoinPool 物件作為引數和寫關於執行緒和任務的執行的狀態的資訊。 private static void showLog(ForkJoinPool pool) { System.out.printf("**********************\n"); System.out.printf("Main: Fork/Join Pool log\n"); System.out.printf("Main: Fork/Join Pool: Parallelism:%d\n", pool.getParallelism()); System.out.printf("Main: Fork/Join Pool: Pool Size:%d\n", pool.getPoolSize()); System.out.printf("Main: Fork/Join Pool: Active Thread Count:%d\n", pool.getActiveThreadCount()); System.out.printf("Main: Fork/Join Pool: Running Thread Count:%d\n", pool.getRunningThreadCount()); System.out.printf("Main: Fork/Join Pool: Queued Submission:%d\n", pool.getQueuedSubmissionCount()); System.out.printf("Main: Fork/Join Pool: Queued Tasks:%d\n", pool.getQueuedTaskCount()); System.out.printf("Main: Fork/Join Pool: Queued Submissions:%s\n", pool.hasQueuedSubmissions()); System.out.printf("Main: Fork/Join Pool: Steal Count:%d\n", pool.getStealCount()); System.out.printf("Main: Fork/Join Pool: Terminated :%s\n", pool.isTerminated()); System.out.printf("**********************\n"); } }
它是如何工作的…
在這個指南,你實現了任務 使用 ForkJoinPool 類和一個擴充套件 RecursiveAction 類的 Task 類來增加array的元素;它是 ForkJoinPool 類執行的任務種類之一。當任務還在處理array時,你就把關於 ForkJoinPool 類的狀態資訊列印到操控臺。
你使用了ForkJoinPool類中的以下方法:
- getPoolSize(): 此方法返回 int 值,它是ForkJoinPool內部執行緒池的worker執行緒們的數量。
- getParallelism(): 此方法返回池的並行的級別。
- getActiveThreadCount(): 此方法返回當前執行任務的執行緒的數量。
- getRunningThreadCount():此方法返回沒有被任何同步機制阻塞的正在工作的執行緒。
- getQueuedSubmissionCount(): 此方法返回已經提交給池還沒有開始他們的執行的任務數。
- getQueuedTaskCount(): 此方法返回已經提交給池已經開始他們的執行的任務數。
- hasQueuedSubmissions(): 此方法返回 Boolean 值,表明這個池是否有queued任務還沒有開始他們的執行。
- getStealCount(): 此方法返回 long 值,worker 執行緒已經從另一個執行緒偷取到的時間數。
- isTerminated(): 此方法返回 Boolean 值,表明 fork/join 池是否已經完成執行。
參見
第五章,Fork/Join 框架: Fork/Join 池的建立
第七章,自定義併發類:實現 ThreadFactory 介面生成Fork/Join 框架的自定義執行緒
第七章,自定義併發類:自定義在the Fork/Join框架的執行任務