1. 程式人生 > >執行緒基礎:多工處理——Fork/Join框架(基本使用)

執行緒基礎:多工處理——Fork/Join框架(基本使用)

1. 概述

在進行系統儲存專題的學習、總結、整理和寫作的過程中感覺有點累了,加上最近在做書稿的第二次校稿工作,系統儲存專題的學習和寫作進度明顯有些滯後,特別是編寫的Ceph MON知識點。所以後續1、2個月的時間打算換一個學習整理方向,之後再繼續恢復系統儲存專題的寫作。換一個什麼方向呢?嗯~~之前的執行緒專欄還有一部分坑沒有填上,就繼續填這部分的坑吧。

java.util.concurrent.ForkJoinPool由Java大師Doug Lea主持編寫,它可以將一個大的任務拆分成多個子任務進行並行處理,最後將子任務結果合併成最後的計算結果,並進行輸出。本文中對Fork/Join框架的講解,基於JDK1.8+中的Fork/Join框架實現,參考的Fork/Join框架主要原始碼也基於JDK1.8+。

這幾篇文章將試圖解釋Fork/Join框架的知識點,以便對自己、對各位讀者在併發程式的設計思路上進行一些啟發。文章將首先講解Fork/Join框架的基本使用,以及其中需要注意的使用要點;接著使用Fork/Join框架解決一些實際問題;最後再講解Fork/Join框架的工作原理。

2. Fork/Join框架基本使用

這裡是一個簡單的Fork/Join框架使用示例,在這個示例中我們計算了1-1001累加後的值:

/**
 * 這是一個簡單的Join/Fork計算過程,將1—1001數字相加
 */
public class TestForkJoinPool {

    private static final Integer MAX = 200
; static class MyForkJoinTask extends RecursiveTask<Integer> { // 子任務開始計算的值 private Integer startValue; // 子任務結束計算的值 private Integer endValue; public MyForkJoinTask(Integer startValue , Integer endValue) { this.startValue = startValue; this
.endValue = endValue; } @Override protected Integer compute() { // 如果條件成立,說明這個任務所需要計算的數值分為足夠小了 // 可以正式進行累加計算了 if(endValue - startValue < MAX) { System.out.println("開始計算的部分:startValue = " + startValue + ";endValue = " + endValue); Integer totalValue = 0; for(int index = this.startValue ; index <= this.endValue ; index++) { totalValue += index; } return totalValue; } // 否則再進行任務拆分,拆分成兩個任務 else { MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2); subTask1.fork(); MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue); subTask2.fork(); return subTask1.join() + subTask2.join(); } } } public static void main(String[] args) { // 這是Fork/Join框架的執行緒池 ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Integer> taskFuture = pool.submit(new MyForkJoinTask(1,1001)); try { Integer result = taskFuture.get(); System.out.println("result = " + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(System.out); } } }

以上程式碼很簡單,在關鍵的位置有相關的註釋說明。這裡本文再對以上示例中的要點進行說明。首先看看以上示例程式碼的可能執行結果:

開始計算的部分:startValue = 1;endValue = 126
開始計算的部分:startValue = 127;endValue = 251
開始計算的部分:startValue = 252;endValue = 376
開始計算的部分:startValue = 377;endValue = 501
開始計算的部分:startValue = 502;endValue = 626
開始計算的部分:startValue = 627;endValue = 751
開始計算的部分:startValue = 752;endValue = 876
開始計算的部分:startValue = 877;endValue = 1001
result = 501501

2-1. 工作順序圖

下圖展示了以上程式碼的工作過程概要,但實際上Fork/Join框架的內部工作過程要比這張圖複雜得多,例如如何決定某一個recursive task是使用哪條執行緒進行執行;再例如如何決定當一個任務/子任務提交到Fork/Join框架內部後,是建立一個新的執行緒去執行還是讓它進行佇列等待。

所以如果不深入理解Fork/Join框架的執行原理,只是根據之上最簡單的使用例子觀察執行效果,那麼我們只能知道子任務在Fork/Join框架中被拆分得足夠小後,並且其內部使用多執行緒並行完成這些小任務的計算後再進行結果向上的合併動作,最終形成頂層結果。不急,一步一步來,我們先從這張概要的過程圖開始討論。

這裡寫圖片描述

圖中最頂層的任務使用submit方式被提交到Fork/Join框架中,後者將前者放入到某個執行緒中執行,工作任務中的compute方法的程式碼開始對這個任務T1進行分析。如果當前任務需要累加的數字範圍過大(程式碼中設定的是大於200),則將這個計算任務拆分成兩個子任務(T1.1和T1.2),每個子任務各自負責計算一半的資料累加,請參見程式碼中的fork方法。如果當前子任務中需要累加的數字範圍足夠小(小於等於200),就進行累加然後返回到上層任務中。

2-2. ForkJoinPool建構函式

ForkJoinPool有四個建構函式,其中引數最全的那個建構函式如下所示:

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode)
  • parallelism:可並行級別,Fork/Join框架將依據這個並行級別的設定,決定框架內並行執行的執行緒數量。並行的每一個任務都會有一個執行緒進行處理,但是千萬不要將這個屬性理解成Fork/Join框架中最多存在的執行緒數量,也不要將這個屬性和ThreadPoolExecutor執行緒池中的corePoolSize、maximumPoolSize屬性進行比較,因為ForkJoinPool的組織結構和工作方式與後者完全不一樣。而後續的討論中,讀者還可以發現Fork/Join框架中可存在的執行緒數量和這個引數值的關係並不是絕對的關聯(有依據但並不全由它決定)。

  • factory:當Fork/Join框架建立一個新的執行緒時,同樣會用到執行緒建立工廠。只不過這個執行緒工廠不再需要實現ThreadFactory介面,而是需要實現ForkJoinWorkerThreadFactory介面。後者是一個函式式介面,只需要實現一個名叫newThread的方法。在Fork/Join框架中有一個預設的ForkJoinWorkerThreadFactory介面實現:DefaultForkJoinWorkerThreadFactory。

  • handler:異常捕獲處理器。當執行的任務中出現異常,並從任務中被丟擲時,就會被handler捕獲。

  • asyncMode:這個引數也非常重要,從字面意思來看是指的非同步模式,它並不是說Fork/Join框架是採用同步模式還是採用非同步模式工作。Fork/Join框架中為每一個獨立工作的執行緒準備了對應的待執行任務佇列,這個任務佇列是使用陣列進行組合的雙向佇列。即是說存在於佇列中的待執行任務,即可以使用先進先出的工作模式,也可以使用後進先出的工作模式。

    這裡寫圖片描述

    當asyncMode設定為ture的時候,佇列採用先進先出方式工作;反之則是採用後進先出的方式工作,該值預設為false:

    ......
    asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
    ......

ForkJoinPool還有另外兩個建構函式,一個建構函式只帶有parallelism引數,既是可以設定Fork/Join框架的最大並行任務數量;另一個建構函式則不帶有任何引數,對於最大並行任務數量也只是一個預設值——當前作業系統可以使用的CPU核心數量(Runtime.getRuntime().availableProcessors())。實際上ForkJoinPool還有一個私有的、原生建構函式,之上提到的三個建構函式都是對這個私有的、原生建構函式的呼叫。

......
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
......

如果你對Fork/Join框架沒有特定的執行要求,可以直接使用不帶有任何引數的建構函式。也就是說推薦基於當前作業系統可以使用的CPU核心數作為Fork/Join框架內最大並行任務數量,這樣可以保證CPU在處理並行任務時,儘量少發生任務執行緒間的執行狀態切換(實際上單個CPU核心上的執行緒間狀態切換基本上無法避免,因為作業系統同時執行多個執行緒和多個程序)。

2-3. fork方法和join方法

Fork/Join框架中提供的fork方法和join方法,可以說是該框架中提供的最重要的兩個方法,它們和parallelism“可並行任務數量”配合工作,可以導致拆分的子任務T1.1、T1.2甚至TX在Fork/Join框架中不同的執行效果。例如TX子任務或等待其它已存在的執行緒執行關聯的子任務,或在執行TX的執行緒中“遞迴”執行其它任務,又或者啟動一個新的執行緒執行子任務……

fork方法用於將新建立的子任務放入當前執行緒的work queue佇列中,Fork/Join框架將根據當前正在併發執行ForkJoinTask任務的ForkJoinWorkerThread執行緒狀態,決定是讓這個任務在佇列中等待,還是建立一個新的ForkJoinWorkerThread執行緒執行它,又或者是喚起其它正在等待任務的ForkJoinWorkerThread執行緒執行它。

這裡面有幾個元素概念需要注意,ForkJoinTask任務是一種能在Fork/Join框架中執行的特定任務,也只有這種型別的任務可以在Fork/Join框架中被拆分執行和合並執行。ForkJoinWorkerThread執行緒是一種在Fork/Join框架中執行的特性執行緒,它除了具有普通執行緒的特性外,最主要的特點是每一個ForkJoinWorkerThread執行緒都具有一個獨立的任務等待佇列(work queue),這個任務佇列用於儲存在本執行緒中被拆分的若干子任務。

這裡寫圖片描述

join方法用於讓當前執行緒阻塞,直到對應的子任務完成執行並返回執行結果。或者,如果這個子任務存在於當前執行緒的任務等待佇列(work queue)中,則取出這個子任務進行“遞迴”執行。其目的是儘快得到當前子任務的執行結果,然後繼續執行。

關注微信公眾號和今日頭條,精彩文章持續更新中。。。。。




相關推薦

執行基礎處理——Fork/Join框架基本使用

1. 概述在進行系統儲存專題的學習、總結、整理和寫作的過程中感覺有點累了,加上最近在做書稿的第二次校稿工作,系統儲存專題的學習和寫作進度明顯有些滯後,特別是編寫的Ceph MON知識點。所以後續1、2個月的時間打算換一個學習整理方向,之後再繼續恢復系統儲存專題的寫作。換一個什

執行基礎處理18——MESI協議以及帶來的問題volatile關鍵字

===================== (接上文《執行緒基礎:多工處理(18)——MESI協議以及帶來的問題:偽共享》) 4、volatile關鍵字及其使用 4.1、volatile關鍵字使用場景 volatile關鍵字有以下幾大場景: 用於多執行緒狀態下只

執行基礎處理18——MESI協議以及帶來的問題偽共享

1、概述 本文和後續文章將著眼CPU的工作原理闡述偽共享的解決方法和volatile關鍵字的應用。 2、複習CPU工作原理 2.1、CPU工作原理 要清楚理解本文後續內容,就需要首先重新概述一下JVM的記憶體工作原理。當然JVM的記憶體模型是一個可以專門作為另一

Fork/Join框架非同步執行任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 非同步執行任務 當你在ForkJoinPool中執行ForkJoinTask時,你可以使用同步或非同步方式來實現。

Fork/Join框架加入任務的結果

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 加入任務的結果 Fork/Join框架提供了執行返回一個結果的任務的能力。這些任務的型別是實現了Recursive

Java Fork Join 框架實現

作者:Doug Lea  譯者:Alex  校對:方騰飛 這個框架是由大約800行純Java程式碼組成,主要的類是FJTaskRunner,它是java.lang.Thread的子類。FJTasks 自己僅僅維持一個關於結束狀態的布林值,所有其他的操作都是通過當前的工作執行緒來代理完成的。J

Fork/Join框架在任務中丟擲異常

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 在任務中丟擲異常 在Java中有兩種異常: 已檢查異常(Checked exceptions):這些異常必須在一

Fork/Join框架建立一個Fork/Join

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 建立一個Fork/Join池 在這個指南中,你將學習如何使用Fork/Join框架的基本元素。它包括: 建立一個

Java Fork Join 框架效能

4效能 如今,隨著編譯器與Java虛擬機器效能的不斷提升,效能測試結果也僅僅只能適用一時。但是,本節中所提到的測試結果資料卻能揭示Fork/join框架的基本特性。 下面表格中簡單介紹了在下文將會用到的一組fork/join測試程式。這些程式是從util.concurrent包裡的示例程式碼

Fork/Join框架引言

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 在這個章節中,我們將覆蓋: 引言 通常,當你實現一個簡單的併發應用程式,你實現一些Runnable物件和相應的

Fork/Join框架取消任務

宣告:本文是《 Java 7 Concurrency Cookbook 》的第五章,作者: Javier Fernández González     譯者:許巧輝 校對:方騰飛 取消任務 當你在一個ForkJoinPool類中執行ForkJoinTask物件,在它們開始執行之前,你可以取消執

執行進階處理17——Java中的鎖Unsafe基礎

1. 概述 本專題在之前的文章中詳細介紹了Java中最常使用的一種鎖機制——同步鎖。但是同步鎖肯定是不適合在所有應用場景中使用的。所以從本文開始,筆者將試圖通過兩到三篇文章的篇幅向讀者介紹Java中鎖的分類、原理和底層實現。以便大家在實際工作中根據應用場景進行

C# 基礎十四C#單例模式首先介紹 單執行執行、加鎖 單例模式。然後介紹單例模式的執行同步執行有序訪問共享記憶體。

一、簡介 本篇文章將介紹如何使用單例模式,也就是類的例項化,在整個專案的生命週期內,只例項化一次。在單例模式中,往往可以看到如SourceCode.cs:這樣的結構的。 SourceCode.cs: public class Singleton { private static

執行基礎7 執行與併發庫

多執行緒與併發庫 BlockingQueue佇列 BlockingQueue,如果BlockQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒.同樣,如果BlockingQueue是滿的,任何試圖往裡存東西的操作

執行執行的問題

1、競態條件         當計算結果的正確性取決於相對時間或者排程器控制的多執行緒交叉時,就會發生競態條件。這句話可能對初次接觸執行緒的讀者來說不太好理解,其實競態條件有兩個相對比較好理解的描述,一個是check-then-act,另外一個是read-modify-wr

Java執行11執行同步操作

什麼情況下需要同步 1、當多執行緒併發,有多段程式碼同時執行時,有時希望某一段程式碼執行的過程中CPU不要切換到其他執行緒工作。這時就需要執行緒同步。 2、如果兩段程式碼是同步的,那麼同一段時間只能執行

Java執行12執行的死鎖

Java執行緒死鎖是一個經典的多執行緒問題,因為不同的執行緒都在等待根本不可能被釋放的鎖,從而導致所有的任務都無法繼續完成。在多執行緒技術中,“死鎖”是必須避免的,因為這會造成執行緒的“假死”。 pac

Java執行-----執行池的使用,原理以及舉例實現使用樣例及如何配置執行池大小

三.使用示例   前面我們討論了關於執行緒池的實現原理,這一節我們來看一下它的具體使用: public class Test { public static void main(String[] args) { ThreadPoolExe

執行基礎執行5——基本使用

執行緒基礎:執行緒池(5)——基本使用(上) :http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/50522458 1、概述 從本文開始,我將用兩篇文

Java執行20執行下的其他元件之CyclicBarrier、Callable、Future和FutureTask

CyclicBarrier         接著講多執行緒下的其他元件,第一個要講的就是CyclicBarrier。CyclicBarrier從字面理解是指迴圈屏障,它可以協同多個執行緒,讓多個執行緒在這個屏障前等待,直到所有執行緒都達到了這個屏障時,再一