淺談ForkJoinPool
ForkJoinPool是什麼?
談到執行緒池,很多人會想到Executors提供的一些預設的執行緒池,比如單執行緒執行緒池SingleThreadExecutor
,固定大小的執行緒池FixedThreadPool
,但是很少有人會注意到其中還提供了一種特殊的執行緒池:WorkStealingPool
,我們點進這個方法,會看到和其他方法不同的是,這種執行緒池並不是通過ThreadPoolExecutor
來建立的,而是ForkJoinPool
來建立的:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true );
}
複製程式碼
這兩種執行緒池之間並不是繼承關係,而是平級關係:
ThreadPoolExecutor應該都很瞭解了,就是一個基本的儲存執行緒的執行緒池,需要執行任務的時候就從執行緒池中拿一個執行緒來執行。而ForkJoinPool則不僅僅是這麼簡單,同樣也不是ThreadPoolExecutor的代替品,這種執行緒池是為了實現“分治法”這一思想而建立的,通過把大任務拆分成小任務,然後再把小任務的結果彙總起來就是最終的結果,和MapReduce的思想很類似舉個例子,我們要統計1-100的累加和,如果使用ForkJoinPool來實現的話,就可以將1-100每5位劃分一段,劃分出20段,當作20個任務,每個任務只計算自己區間內的結果,最後將這20個任務的結果彙總起來就是1-100的累加和
ForkJoinPool怎麼使用?
ForkJoinPool的本質就是兩點:
- 如果任務很小:直接計算得出結果
-
如果任務很大:
- 拆分成N個子任務
- 呼叫子任務的fork()進行計算
- 呼叫子任務的join()合併結果
接來下我們來做一個1-100的累加例子:
- 首先定義我們需要執行的任務:
class Task extends RecursiveTask<Integer> {
private int start;
private int end;
private int mid;
public Task(int start,int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute () {
int sum = 0;
if (end - start < 6) {
// 當任務很小時,直接進行計算
for (int i = start; i <= end; i++) {
sum += i;
}
System.out.println(Thread.currentThread().getName() + " count sum: " + sum);
} else {
// 否則,將任務進行拆分
mid = (end - start) / 2 + start;
Task left = new Task(start,mid);
Task right = new Task(mid + 1,end);
// 執行上一步拆分的子任務
left.fork();
right.fork();
// 拿到子任務的執行結果
sum += left.join();
sum += right.join();
}
return sum;
}
}
複製程式碼
這裡的RecursiveTask
是ForkJoinTask
的子類,ForkJoinTask
又是Future
的子類,不瞭解Future類的可以認為是一個非同步執行,並且可以有返回值的Runnable類
我們首先在Task類中定義了任務需要的一些資料,比如開始位置和結束位置。重點是其中的compute方法,在其中實現了我們剛才說到的步驟,如果任務很小(通過任務資料來判斷),就進行計算,否則將任務拆分,使用fork()執行,並通過join()拿到計算結果
- 將任務提交到執行緒池
剛才我們定義了任務類,接下來就需要把這個任務提交到執行緒池:
public static void main(String[] args) throws ExecutionException,InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
Task countTask = new Task(1,100);
ForkJoinTask<Integer> result = forkJoinPool.submit(countTask);
System.out.println("result: " + result.get());
forkJoinPool.shutdown();
}
複製程式碼
注意,這裡ForkJoinPool初始化可以傳入一個並行引數,如果不傳入該引數的話會預設使用處理器個數來作為並行引數
建立任務物件和執行緒池之後,使用submit方法來提交任務,該方法會返回一個ForkJoinTask<T>
型別的物件,呼叫其get方法即可拿到執行結果
同時要注意,該執行緒池也需要呼叫shutdown方法來關閉
ForkJoinPool的原理
ForkJoinPool中有三個重要角色:
- ForkJoinWorkerThread:工作執行緒,在內部對Thread進行的封裝
- WorkQueue:任務佇列
- ForkJoinTask:任務,繼承自Future,在含義上分為submission和task兩種
線上程池中,任務佇列使用陣列來儲存,其中儲存了所有提交進來的任務:
- 在奇數位置儲存submission
- 在偶數位置儲存task
submission
指的是本地提交的任務,如submit、execute提交的任務;而task
則是通過fork方法新增的子任務。這兩種任務僅僅在含義上有所區別,所以一同儲存在任務佇列中,通過位置進行區分
ForkJoinPool的核心
想理解ForkJoinPool的原理,就要理解其核心,一共有兩點,其一是分治法,其二就是工作竊取演演算法。分治法相信就不用多說了,就是通過把大任務拆分成小任務來提高併發度。重點要說的就是工作竊取演演算法,該演演算法的原理:
所有執行緒均嘗試找到並執行已提交的任務,或是通過其他任務建立的子任務
依賴於這種特性,來儘量避免一個執行緒執行完自己的任務後“無所事事”的情況。同時,竊取順序是FIFO的