1. 程式人生 > >【高併發】什麼是ForkJoin?看這一篇就夠了!

【高併發】什麼是ForkJoin?看這一篇就夠了!

寫在前面

在JDK中,提供了這樣一種功能:它能夠將複雜的邏輯拆分成一個個簡單的邏輯來並行執行,待每個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果資料。有點像Hadoop中的MapReduce。

ForkJoin是由JDK1.7之後提供的多執行緒併發處理框架。ForkJoin框架的基本思想是分而治之。什麼是分而治之?分而治之就是將一個複雜的計算,按照設定的閾值分解成多個計算,然後將各個計算結果進行彙總。相應的,ForkJoin將複雜的計算當做一個任務,而分解的多個計算則是當做一個個子任務來並行執行。

Java併發程式設計的發展

對於Java語言來說,生來就支援多執行緒併發程式設計,在併發程式設計領域也是在不斷髮展的。Java在其發展過程中對併發程式設計的支援越來越完善也正好印證了這一點。

  • Java 1 支援thread,synchronized。
  • Java 5 引入了 thread pools, blocking queues, concurrent collections,locks, condition queues。
  • Java 7 加入了fork-join庫。
  • Java 8 加入了 parallel streams。

併發與並行

併發和並行在本質上還是有所區別的。

併發

併發指的是在同一時刻,只有一個執行緒能夠獲取到CPU執行任務,而多個執行緒被快速的輪換執行,這就使得在巨集觀上具有多個執行緒同時執行的效果,併發不是真正的同時執行,併發可以使用下圖表示。

並行

並行指的是無論何時,多個執行緒都是在多個CPU核心上同時執行的,是真正的同時執行。

分治法

基本思想

把一個規模大的問題劃分為規模較小的子問題,然後分而治之,最後合併子問題的解得到原問題的解。

步驟

①分割原問題;

②求解子問題;

③合併子問題的解為原問題的解。

我們可以使用如下虛擬碼來表示這個步驟。

if(任務很小){
    直接計算得到結果
}else{
    分拆成N個子任務
    呼叫子任務的fork()進行計算
    呼叫子任務的join()合併計算結果
}

在分治法中,子問題一般是相互獨立的,因此,經常通過遞迴呼叫演算法來求解子問題。

典型應用

  • 二分搜尋
  • 大整數乘法
  • Strassen矩陣乘法
  • 棋盤覆蓋
  • 合併排序
  • 快速排序
  • 線性時間選擇
  • 漢諾塔

ForkJoin並行處理框架

ForkJoin框架概述

Java 1.7 引入了一種新的併發框架—— Fork/Join Framework,主要用於實現“分而治之”的演算法,特別是分治之後遞迴呼叫的函式。

ForkJoin框架的本質是一個用於並行執行任務的框架, 能夠把一個大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務的計算結果。在Java中,ForkJoin框架與ThreadPool共存,並不是要替換ThreadPool

其實,在Java 8中引入的並行流計算,內部就是採用的ForkJoinPool來實現的。例如,下面使用並行流實現列印陣列元組的程式。

public class SumArray {
    public static void main(String[] args){
        List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9);
        numberList.parallelStream().forEach(System.out::println);
    }
}

這段程式碼的背後就使用到了ForkJoinPool。

說到這裡,可能有讀者會問:可以使用執行緒池的ThreadPoolExecutor來實現啊?為什麼要使用ForkJoinPool啊?ForkJoinPool是個什麼鬼啊?! 接下來,我們就來回答這個問題。

ForkJoin框架原理

ForkJoin框架是從jdk1.7中引入的新特性,它同ThreadPoolExecutor一樣,也實現了Executor和ExecutorService介面。它使用了一個無限佇列來儲存需要執行的任務,而執行緒的數量則是通過建構函式傳入,如果沒有向建構函式中傳入指定的執行緒數量,那麼當前計算機可用的CPU數量會被設定為執行緒數量作為預設值。

ForkJoinPool主要使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用比如快速排序演算法。這裡的要點在於,ForkJoinPool能夠使用相對較少的執行緒來處理大量的任務。比如要對1000萬個資料進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬資料的合併任務。以此類推,對於500萬的資料也會做出同樣的分割處理,到最後會設定一個閾值來規定當資料規模到多少時,停止這樣的分割處理。比如,當元素的數量小於10時,會停止分割,轉而使用插入排序對它們進行排序。那麼到最後,所有的任務加起來會有大概200萬+個。問題的關鍵在於,對於一個任務而言,只有當它所有的子任務完成之後,它才能夠被執行。

所以當使用ThreadPoolExecutor時,使用分治法會存在問題,因為ThreadPoolExecutor中的執行緒無法向任務佇列中再新增一個任務並在等待該任務完成之後再繼續執行。而使用ForkJoinPool就能夠解決這個問題,它就能夠讓其中的執行緒建立新的任務,並掛起當前的任務,此時執行緒就能夠從佇列中選擇子任務執行。

那麼使用ThreadPoolExecutor或者ForkJoinPool,效能上會有什麼差異呢?

首先,使用ForkJoinPool能夠使用數量有限的執行緒來完成非常多的具有父子關係的任務,比如使用4個執行緒來完成超過200萬個任務。但是,使用ThreadPoolExecutor時,是不可能完成的,因為ThreadPoolExecutor中的Thread無法選擇優先執行子任務,需要完成200萬個具有父子關係的任務時,也需要200萬個執行緒,很顯然這是不可行的,也是很不合理的!!

工作竊取演算法

假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應,比如A執行緒負責處理A佇列裡的任務。但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

工作竊取演算法的優點:
充分利用執行緒進行平行計算,並減少了執行緒間的競爭。

工作竊取演算法的缺點:
在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且該演算法會消耗更多的系統資源,比如建立多個執行緒和多個雙端佇列。

Fork/Join框架侷限性:

對於Fork/Join框架而言,當一個任務正在等待它使用Join操作建立的子任務結束時,執行這個任務的工作執行緒查詢其他未被執行的任務,並開始執行這些未被執行的任務,通過這種方式,執行緒充分利用它們的執行時間來提高應用程式的效能。為了實現這個目標,Fork/Join框架執行的任務有一些侷限性。

(1)任務只能使用Fork和Join操作來進行同步機制,如果使用了其他同步機制,則在同步操作時,工作執行緒就不能執行其他任務了。比如,在Fork/Join框架中,使任務進行了睡眠,那麼,在睡眠期間內,正在執行這個任務的工作執行緒將不會執行其他任務了。
(2)在Fork/Join框架中,所拆分的任務不應該去執行IO操作,比如:讀寫資料檔案。
(3)任務不能丟擲檢查異常,必須通過必要的程式碼來出來這些異常。

ForkJoin框架的實現

ForkJoin框架中一些重要的類如下所示。

ForkJoinPool 框架中涉及的主要類如下所示。

1.ForkJoinPool類

實現了ForkJoin框架中的執行緒池,由類圖可以看出,ForkJoinPool類實現了執行緒池的Executor介面。

我們也可以從下圖中看出ForkJoinPool的類圖關係。

其中,可以使用Executors.newWorkStealPool()方法建立ForkJoinPool。

ForkJoinPool中提供瞭如下提交任務的方法。

public void execute(ForkJoinTask<?> task)
public void execute(Runnable task)
public <T> T invoke(ForkJoinTask<T> task)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
public <T> ForkJoinTask<T> submit(Callable<T> task)
public <T> ForkJoinTask<T> submit(Runnable task, T result)
public ForkJoinTask<?> submit(Runnable task)

2.ForkJoinWorkerThread類

實現ForkJoin框架中的執行緒。

3.ForkJoinTask

ForkJoinTask封裝了資料及其相應的計算,並且支援細粒度的資料並行。ForkJoinTask比執行緒要輕量,ForkJoinPool中少量工作執行緒能夠執行大量的ForkJoinTask。

ForkJoinTask類中主要包括兩個方法fork()和join(),分別實現任務的分拆與合併。

fork()方法類似於Thread.start(),但是它並不立即執行任務,而是將任務放入工作佇列中。跟Thread.join()方法不同,ForkJoinTask的join()方法並不簡單的阻塞執行緒,而是利用工作執行緒執行其他任務,當一個工作執行緒中呼叫join(),它將處理其他任務,直到注意到目標子任務已經完成。

我們可以使用下圖來表示這個過程。

ForkJoinTask有3個子類:

  • RecursiveAction:無返回值的任務。
  • RecursiveTask:有返回值的任務。
  • CountedCompleter:完成任務後將觸發其他任務。

4.RecursiveTask

有返回結果的ForkJoinTask實現Callable。

5.RecursiveAction類

無返回結果的ForkJoinTask實現Runnable。

6.CountedCompleter

在任務完成執行後會觸發執行一個自定義的鉤子函式。

ForkJoin示例程式

package io.binghe.concurrency.example.aqs;
 
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任務足夠小就計算任務
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
 
            // 執行子任務
            leftTask.fork();
            rightTask.fork();
 
            // 等待任務執行結束合併其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
 
            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();
 
        //生成一個計算任務,計算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
 
        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);
 
        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

寫在最後

如果覺得文章對你有點幫助,請微信搜尋並關注「 冰河技術 」微信公眾號,跟冰河學習高併發程式設計技術。

最後,附上併發程式設計需要掌握的核心技能知識圖,祝大家在學習併發程式設計時,少走彎路。

相關推薦

併發什麼是ForkJoin

寫在前面 在JDK中,提供了這樣一種功能:它能夠將複雜的邏輯拆分成一個個簡單的邏輯來並行執行,待每個並行執行的邏輯執行完成後,再將各個結果進行彙總,得出最終的結果資料。有點像Hadoop中的MapReduce。 ForkJoin是由JDK1.7之後提供的多執行緒併發處理框架。ForkJoin框架的基本思想是

修真院“善良”系列之十八WEB程序員從零開始到就業的全資料V1.0——只

absolute feed 自己 session rem 好的 ans 一個 css樣式 這是兩年以來,修真院收集整理的學習資料順序。以CSS15個任務,JS15個任務為基礎,分別依據要完成任務的不同的技能點,我們整理出來了這麽一篇在學習的時候需要看到的資料。這是Versi

大資料華為內部狂轉好文,大資料,

來源:華為IT產品解決方案導讀科技的進步在很多的時候總會超出我們的想象,試想如果未來我們一個人擁

K8SService服務詳解,

k8s用名稱空間namespace把資源進行隔離,預設情況下,相同的名稱空間裡的服務可以相互通訊,反之進行隔離。 1.1 Service Kubernetes中一個應用服務會有一個或多個例項(Pod,Pod可以通過rs進行多複本的建立),每個例項(Pod)的IP地址由網路外掛動態隨機分配(Pod重啟後IP地址

Spring註解驅動開發如何使用@Bean註解指定初始化和銷燬的方法?

## 寫在前面 > 在【[String註解驅動開發專題](https://www.cnblogs.com/binghe001/category/1780611.html)】中,前面的文章我們主要講了有關於如何向Spring容器中註冊bean的知識,大家可以到【[String註解驅動開發專題](http

FastDFSFastDFS 分散式檔案系統的安裝與使用,

## 寫在前面 > 有不少小夥伴在實際工作中,對於如何儲存檔案(圖片、視訊、音訊等)沒有一個很好的解決思路。都明白不能將檔案儲存在單臺伺服器的磁碟上,也知道需要將檔案進行副本備份。如果自己手動寫檔案的副本機制,那就太麻煩了,這會涉及冗餘副本機制、伺服器的排程、副本檢測、伺服器節點檢測、檔案副本存放策略

Nginx如何獲取客戶端真實IP、域名、協議、埠?

## 寫在前面 > Nginx最為最受歡迎的反向代理和負載均衡伺服器,被廣泛的應用於網際網路專案中。這不僅僅是因為Nginx本身比較輕量,更多的是得益於Nginx的高效能特性,以及支援外掛化開發,為此,很多開發者或者公司基於Nginx開發出了眾多的高效能外掛。使用者可以根據自身的需求來為Nginx指定

百萬併發下的Nginx優化,

本文作者主要分享在 Nginx 效能方面的實踐經驗,希望能給大家帶來一些系統化思考,幫助大家更有效地去做 Nginx。 優化方法論 我重點分享如下兩個問題: 保持併發連線數,怎麼樣做到記憶體有效使用。 在高併發的同時保持高吞吐量的重要要點。 實現層面主要是三方面優化,主要聚焦

想做好PPT折線圖,

12月 image 菊花 -c 強調 spa any border 線圖 配圖主題無關今天鄭少跟大家聊聊折線圖的使用方法,或者你有疑問,折線圖很簡單,插入修改數據不就好了嗎?如果你要是這樣想的,恭喜你,有可能你會做出下面這樣的效果。如果你要是稍微懂一點折線圖的使用方法,你就

Java NIO?

現在使用NIO的場景越來越多,很多網上的技術框架或多或少的使用NIO技術,譬如Tomcat,Jetty。學習和掌握NIO技術已經不是一個JAVA攻城獅的加分技能,而是一個必備技能。在前面2篇文章《什麼是Zero-Copy?》和《NIO相關基礎篇》中我們學習了NIO的相關理論知

容器中 Java 應用程式的記憶體和 CPU 如何分配?

出品丨Docker公司(ID:docker-cn)編譯丨小東每週一、三、五,與您不見不散! 隨著2018年的結束,我們將回顧排名前五的最受讀者歡迎的文章。今天分享的第一篇文章,將幫助那些在容器中執行 Java 虛擬機器(JVM)時遇到記憶體和 CPU 大小調整/使用困難的人,本文將解釋如何在 D

Android 必須知道2018年流行的框架庫及開發語言,

導語2017 已經悄悄的走了,2018 也已經匆匆的來了,我們在總結過去的同時,也要展望一下未來,來規劃一下今年要學哪些新技術。這幾年優秀Android的開源庫不斷推出,新技術層出不窮,需要我們不斷去了解和掌握,在提高自身開發水平的同時,我們需要付出更多學習精力和時間。俗話說

Android 必須知道2018年流行的框架庫及開發語言,

本文更新時間:2018年07月12日15:50:40導語    2017 已經悄悄的走了,2018 也已經匆匆的來了,我們在總結過去的同時,也要展望一下未來,來規劃一下今年要學哪些新技術。這幾年優秀Android的開源庫不斷推出,新技術層出不窮,需要我們不斷去了解和掌握,在提

Android EventBus原始碼分析,基於最新3.1.1版本,

Android EventBus原始碼分析,基於最新3.1.1版本,看這一篇就夠了!! 前言 上一篇文章對EventBus進行了一個介紹,並且對它的使用方式作了一個較全面的分析,建議在閱讀本文之前,先看看上篇文章的內容:EventBus使用(全面分析,細節提醒) 本篇文章主要

張小龍 4 小時演講沒時間「附贈張小龍歷年演講實錄 PDF」

本文是首次微信 8 年完整發展史的闡述精選和深入分析。2019 年 01 月 09 日晚 19 點 30 分,微信本年度最重要的演講開始了。現場正式演講前,播放的是 In My Secret Life 這首歌。這首歌就是微信 7.0 版本首次啟動的那首歌,據說也是張小龍最喜歡的一首歌。開場前一直在迴圈播放,隨

掌握 Anko,

平時開發android時,我們的UI佈局程式碼一般都是寫在xml中,當然也有少數寫在Java程式碼中,這就導致了這樣的局面:xml佈局清晰可見,但不能動態改變,Java程式碼佈局比較靈活,但比較難用而且冗餘難維護,所以一般都是用xml先編排出佈局,然後

如何低成本實現Flutter富文字,

作者:閒魚技術-玄川 背景 閒魚是國內最早使用Flutter 的團隊,作為一個電商App商品詳情頁是非常重要場景,其中最主要

JVM垃圾回收?

深入理解JVM垃圾回收機制 1、垃圾回收需要解決的問題及解決的辦法總覽 1、如何判定物件為垃圾物件 引用計數法 可達性分析法 2、如何回收 回收策略 標記-清除演算法 複製演算法 標記-整理演算法 分帶收集演算法 垃圾回收器 serial parnew Cms G1 3、何時回收 下面就

介面限流

## 導讀 - 前幾天和一個朋友討論了他們公司的系統問題,傳統的單體應用,叢集部署,他說近期服務的併發量可能會出現瞬時增加的風險,雖然部署了叢集,但是通過壓測後發現請求延遲仍然是很大,想問問我有什麼改進的地方。我沉思了一會,現在去改架構顯然是不可能的,於是我給出了一個建議,讓他去做個介面限流,這樣能夠保證瞬

關於郵件傳送,只

# 關於郵件傳送,只看這一篇就夠了!!! ## 郵件傳送:使用smtplib、email庫來發送郵件 ## 1.傳送郵件的步驟: ![](https://img2020.cnblogs.com/blog/2074124/202007/2074124-20200728204918420-74974604