監控fork/join池
監控fork/join池
Executor框架將任務實現與執行緒建立和執行任務的管理分離開來。Java提供的fork/join框架擴充套件了執行器框架,用於解決特定型別的問題,以提高其它解決方案(直接使用Thread物件或者執行器框架)的效能。
fork/join框架旨在解決使用fork()和join()操作能夠分解為更小任務的問題。實現此特性的主類是ForkJoinPool。
本節將學習關於ForkJoinPool執行器的狀態資訊,以及如何獲取這些資訊。
準備工作
本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。
實現過程
通過如下步驟實現範例:
-
建立名為Task的類,繼承RecursiveAction類:
public class Task extends RecursiveAction{
-
宣告名為array的私有int陣列屬性,儲存待增加的元素陣列:
private final int array[];
-
宣告名為start和end的兩個私有int屬性,儲存需要處理的任務元素塊的開始和結束位置:
private final
-
實現類建構函式,初始化屬性:
public Task (int array[], int start, int end) { this.array=array; this.start=start; this.end=end; }
-
實現compute()方法,包含任務的主要邏輯。如果任務需要處理超過100個元素,首先將元素拆分成兩部分,建立兩個任務分別執行每部分,使用fork()方法開始執行任務,最後使用join()方法等待任務結束:
@Override protected void compute
-
否則,執行增加元素的操作,並在每次操作之後,設定執行緒休眠5毫秒:
} else { for (int i=start; i<end; i++) { array[i]++; try { Thread.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
-
實現本範例主類,建立名為Main的類,包含main()方法:
public class Main { public static void main(String[] args) throws Exception{
-
建立名為pool的ForkJoinPool物件:
ForkJoinPool pool=new ForkJoinPool();
-
建立名為array的整型數字陣列,包含10000個元素:
int array[]=new int[10000];
-
建立新Task物件,處理整個陣列:
Task task1=new Task(array,0,array.length);
-
使用execute()犯法傳送執行任務到池中:
pool.execute(task1);
-
如果任務尚未執行結束,呼叫showLog()方法輸出ForkJoinPool類狀態資訊到控制檯,然後設定執行緒休眠1秒鐘:
while (!task1.isDone()) { showLog(pool); TimeUnit.SECONDS.sleep(1); }
-
使用shutdown()方法關閉池:
pool.shutdown();
-
使用awaitTermination()等待池執行結束:
pool.awaitTermination(1, TimeUnit.DAYS);
-
呼叫showLog()方法輸出ForkJoinPool類狀態資訊到控制檯,然後輸出指明程式結束的資訊到控制檯:
showLog(pool); System.out.printf("Main: End of the program.\n"); }
-
實現showLog()方法,將ForkJoinPool物件作為引數接收,輸出其狀態和執行緒,以及正在執行的任務資訊到控制檯:
private static void showLog(ForkJoinPool pool) { System.out.printf("**********************\n"); System.out.printf("Main: Fork/Join Pool log\n"); System.out.printf("Main: Fork/Join Pool: Parallelism: %d\n", pool.getParallelism()); System.out.printf("Main: Fork/Join Pool: Pool Size: %d\n", pool.getPoolSize()); System.out.printf("Main: Fork/Join Pool: Active Thread Count: %d\n", pool.getActiveThreadCount()); System.out.printf("Main: Fork/Join Pool: Running Thread Count: %d\n", pool.getRunningThreadCount()); System.out.printf("Main: Fork/Join Pool: Queued Submission: %d\n", pool.getQueuedSubmissionCount()); System.out.printf("Main: Fork/Join Pool: Queued Tasks: %d\n", pool.getQueuedTaskCount()); System.out.printf("Main: Fork/Join Pool: Queued Submissions: %s\n", pool.hasQueuedSubmissions()); System.out.printf("Main: Fork/Join Pool: Steal Count: %d\n", pool.getStealCount()); System.out.printf("Main: Fork/Join Pool: Terminated : %s\n", pool.isTerminated()); System.out.printf("**********************\n"); } }
工作原理
本節實現了遞增陣列元素的任務,使用到ForkJoinPool類和繼承RecursiveAction類的Task類。這是能夠在ForkJoinPool類中執行的任務之一,當任務處理陣列時,輸出ForkJoinPool類的狀態資訊到控制檯。通過使用如下方法得到ForkJoinPool類的狀態資訊:
- getPoolSize():返回ForkJoinPool類內部池的工作執行緒數量的int值
- getParallelism():返回為池建立的所需的並行級別
- getActiveThreadCount():返回當前執行任務的執行緒數量
- getRunningThreadCount():返回在任何同步機制中均未阻塞的工作執行緒
- getQueuedSubmissionCount():返回已經進入池中,但尚未開始執行的任務數量
- getQueuedTaskCount():返回已經進入池中且開始執行的任務數量
- hasQueuedSubmissions():返回Boolean值,指明池是否有尚未開始執行的佇列任務的
- getStealCount():返回long值,指定工作執行緒從其它執行緒處竊取任務的次數
- isTerminated():返回Boolean值,指明fork/join框架是否已經完成執行
更多關注
- 第五章“Fork/Join框架”中的“建立fork/join池”小節
- 第八章“定製併發類”中的“實現為fork/join框架生成自定義執行緒的ThreadFactory介面”和“定製在fork/join框架中執行的任務”小節