1. 程式人生 > >監控fork/join池

監控fork/join池

Java 9併發程式設計指南 目錄

監控fork/join池

Executor框架將任務實現與執行緒建立和執行任務的管理分離開來。Java提供的fork/join框架擴充套件了執行器框架,用於解決特定型別的問題,以提高其它解決方案(直接使用Thread物件或者執行器框架)的效能。

fork/join框架旨在解決使用fork()和join()操作能夠分解為更小任務的問題。實現此特性的主類是ForkJoinPool。

本節將學習關於ForkJoinPool執行器的狀態資訊,以及如何獲取這些資訊。

準備工作

本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。

實現過程

通過如下步驟實現範例:

  1. 建立名為Task的類,繼承RecursiveAction類:

    public class Task extends RecursiveAction{
    
  2. 宣告名為array的私有int陣列屬性,儲存待增加的元素陣列:

    	private final int array[];
    
  3. 宣告名為start和end的兩個私有int屬性,儲存需要處理的任務元素塊的開始和結束位置:

    	private final
    int start; private final int end;
  4. 實現類建構函式,初始化屬性:

    	public Task (int array[], int start, int end) {
    		this.array=array;
    		this.start=start;
    		this.end=end;
    	}
    
  5. 實現compute()方法,包含任務的主要邏輯。如果任務需要處理超過100個元素,首先將元素拆分成兩部分,建立兩個任務分別執行每部分,使用fork()方法開始執行任務,最後使用join()方法等待任務結束:

    	@Override
    	protected void compute
    () { if (end-start>100) { int mid=(start+end)/2; Task task1=new Task(array,start,mid); Task task2=new Task(array,mid,end); task1.fork(); task2.fork(); task1.join(); task2.join();
  6. 否則,執行增加元素的操作,並在每次操作之後,設定執行緒休眠5毫秒:

    		} else {
    			for (int i=start; i<end; i++) {
    				array[i]++;
    				try {
    					Thread.sleep(5);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    
  7. 實現本範例主類,建立名為Main的類,包含main()方法:

    public class Main {
    	public static void main(String[] args) throws Exception{
    
  8. 建立名為pool的ForkJoinPool物件:

    		ForkJoinPool pool=new ForkJoinPool();
    
  9. 建立名為array的整型數字陣列,包含10000個元素:

    		int array[]=new int[10000];
    
  10. 建立新Task物件,處理整個陣列:

    		Task task1=new Task(array,0,array.length);
    
  11. 使用execute()犯法傳送執行任務到池中:

    		pool.execute(task1);
    
  12. 如果任務尚未執行結束,呼叫showLog()方法輸出ForkJoinPool類狀態資訊到控制檯,然後設定執行緒休眠1秒鐘:

    		while (!task1.isDone()) {
    			showLog(pool);
    			TimeUnit.SECONDS.sleep(1);
    		}
    
  13. 使用shutdown()方法關閉池:

    		pool.shutdown();
    
  14. 使用awaitTermination()等待池執行結束:

    		pool.awaitTermination(1, TimeUnit.DAYS);
    
  15. 呼叫showLog()方法輸出ForkJoinPool類狀態資訊到控制檯,然後輸出指明程式結束的資訊到控制檯:

    		showLog(pool);
    		System.out.printf("Main: End of the program.\n");
    	}
    
  16. 實現showLog()方法,將ForkJoinPool物件作為引數接收,輸出其狀態和執行緒,以及正在執行的任務資訊到控制檯:

    	private static void showLog(ForkJoinPool pool) {
    		System.out.printf("**********************\n");
    		System.out.printf("Main: Fork/Join Pool log\n");
    		System.out.printf("Main: Fork/Join Pool: Parallelism: %d\n", pool.getParallelism());
    		System.out.printf("Main: Fork/Join Pool: Pool Size: %d\n", pool.getPoolSize());
    		System.out.printf("Main: Fork/Join Pool: Active Thread Count: %d\n", pool.getActiveThreadCount());
    		System.out.printf("Main: Fork/Join Pool: Running Thread Count: %d\n", pool.getRunningThreadCount());
    		System.out.printf("Main: Fork/Join Pool: Queued Submission: %d\n", pool.getQueuedSubmissionCount());
    		System.out.printf("Main: Fork/Join Pool: Queued Tasks: %d\n", pool.getQueuedTaskCount());
    		System.out.printf("Main: Fork/Join Pool: Queued Submissions: %s\n", pool.hasQueuedSubmissions());
    		System.out.printf("Main: Fork/Join Pool: Steal Count: %d\n", pool.getStealCount());
    		System.out.printf("Main: Fork/Join Pool: Terminated : %s\n", pool.isTerminated());
    		System.out.printf("**********************\n");
    	}
    }
    

工作原理

本節實現了遞增陣列元素的任務,使用到ForkJoinPool類和繼承RecursiveAction類的Task類。這是能夠在ForkJoinPool類中執行的任務之一,當任務處理陣列時,輸出ForkJoinPool類的狀態資訊到控制檯。通過使用如下方法得到ForkJoinPool類的狀態資訊:

  • getPoolSize():返回ForkJoinPool類內部池的工作執行緒數量的int值
  • getParallelism():返回為池建立的所需的並行級別
  • getActiveThreadCount():返回當前執行任務的執行緒數量
  • getRunningThreadCount():返回在任何同步機制中均未阻塞的工作執行緒
  • getQueuedSubmissionCount():返回已經進入池中,但尚未開始執行的任務數量
  • getQueuedTaskCount():返回已經進入池中且開始執行的任務數量
  • hasQueuedSubmissions():返回Boolean值,指明池是否有尚未開始執行的佇列任務的
  • getStealCount():返回long值,指定工作執行緒從其它執行緒處竊取任務的次數
  • isTerminated():返回Boolean值,指明fork/join框架是否已經完成執行

更多關注

  • 第五章“Fork/Join框架”中的“建立fork/join池”小節
  • 第八章“定製併發類”中的“實現為fork/join框架生成自定義執行緒的ThreadFactory介面”和“定製在fork/join框架中執行的任務”小節