1. 程式人生 > >Fork Join 併發任務執行框架

Fork Join 併發任務執行框架

Fork Join 體現了分而治之

什麼是分而治之?

  規模為N的問題,如果N<閾值,直接解決,N>閾值,將N分解為K個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合併得到原問題的解 

Fork Join 框架:

  就是在必要的情況下,將一個大任務,進行拆分(fork)成若干了小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行join彙總

 

 

Fork Join的另一大特點:工作密取

什麼是工作密取?

  就是在按指定閾值拆分後,的多個執行緒,如果執行緒A的任務執行的比較快,獲得到的CPU時間片比較多,那麼在他執行完畢後,就會從未執行完畢的執行緒的任務中的尾部,進行任務竊取,任務完成後再把結果放回去,不會造成任務競爭,因為自身執行執行緒的任務是從頭部開始獲取的,而空閒的執行緒是從尾部竊取的.

 

 

 Fork Join使用的標準正規化

 

 

 在使用的過程中我們是無法直接new 一個ForkJoinTask類的,他是一個抽象類,但是他提供了兩個子類,RecursiveTask和ResursiveAction兩個子抽象類.我們使用的時候,如果需要有返回值,我們就繼承RecursiveTask,如果不需要返回值我們就繼承RecursiveAction

Fork Join實戰

  Fork Join的同步用法同時演示返回結果值:統計整數陣列中所有元素的和

先建立一個工具類用於製作整數陣列

package org.dance.day2.forkjoin.sum;

import java.util.Random;

/**
 * 陣列製作類
 * @author ZYGisComputer
 */
public class MarkArray {

    public static final int ARRAY_LENGTH = 4000;

    /**
     * int陣列生成器
     * @return int陣列
     */
    public static int[] markArray(){

        Random random = new Random();

        int[] array = new int[ARRAY_LENGTH];

        for (int i = 0; i < ARRAY_LENGTH; i++) {
            array[i] = random.nextInt(ARRAY_LENGTH*3);
        }

        return array;

    }

}

然後建立一個單執行緒的求和類,用於和多執行緒的對比

package org.dance.day2.forkjoin.sum;

import org.dance.tools.SleepTools;

/**
 * 單執行緒實現求和
 * @author ZYGisComputer
 */
public class SumNormal {

    public static void main(String[] args) {
        int count = 0;

        // 獲取陣列
        int[] src = MarkArray.markArray();

        long l = System.currentTimeMillis();

        for (int i = 0; i < src.length; i++) {
            // 執行一毫秒的休眠
            SleepTools.ms(1);
            count += src[i];
        }

        System.out.println("The count is "+count+" spend time "+(System.currentTimeMillis() - l));
    }

}

使用繼承RecursiveTask的ForkJoin框架類,完成多執行緒的求和計算

package org.dance.day2.forkjoin.sum;

import org.dance.tools.SleepTools;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * 使用ForkJoin框架實現求和
 * @author ZYGisComputer
 */
public class SumArray {

    /**
     * 因為需要返回值所以繼承RecursiveTask類
     *  因為計算的是整型,所以泛型是Integer
     */
    private static class SumTask extends RecursiveTask<Integer> {

        // 計算閾值
        private final static int THRESHOLD = MarkArray.ARRAY_LENGTH/10;

        // 源陣列
        private int[] src;

        // 開始座標
        private int fromIndex;

        // 結束座標
        private int toIndex;

        /**
         * 通過建立時傳入
         * @param src 元素組
         * @param fromIndex 開始座標
         * @param toIndex 結束座標
         */
        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        /**
         * 覆蓋執行方法
         * @return 整型
         */
        @Override
        protected Integer compute() {
            // 如果 結束下標減去開始下標小於閾值的時候,那麼任務就可以開始執行了
            if( toIndex - fromIndex < THRESHOLD ){
                int count = 0;
                // 從開始下標開始迴圈,迴圈到結束下標
                for (int i = fromIndex; i < toIndex; i++) {
                    // 休眠1毫秒
                    SleepTools.ms(1);
                    count += src[i];
                }
                return count;
            }else{
                // 大於閾值 繼續拆分任務
                // 從formIndex---------------------->到toIndex
                // 計算中間值,從formIndex----------計算mid------------>到toIndex
                int mid = (fromIndex + toIndex) / 2;
                // 左側任務 從formIndex------------>到mid結束
                SumTask left = new SumTask(src, fromIndex, mid);
                // 右側任務 從mid+1開始------------->到toIndex結束
                SumTask right = new SumTask(src, mid+1,toIndex);
                // 呼叫任務
                invokeAll(left,right);
                // 獲取結果
                return left.join() + right.join();
            }
        }
    }

    public static void main(String[] args) {

        // 建立ForkJoin任務池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 製作源陣列
        int[] src = MarkArray.markArray();

        long l = System.currentTimeMillis();

        // 建立一個任務 下標因為從0開始所以結束下標需要-1
        SumTask sumTask = new SumTask(src, 0, src.length - 1);

        // 提交同步任務
        Integer invoke = forkJoinPool.invoke(sumTask);

        // 無論是接收invoke方法的返回值還是呼叫任務的Join方法都可以獲取到結果值
        System.out.println("The count is "+invoke+" spend time "+(System.currentTimeMillis() - l));
        System.out.println("The count is "+sumTask.join()+" spend time "+(System.currentTimeMillis() - l));

    }

}

執行結果對比:

現在是4000大小的陣列,每次迴圈休眠1毫秒

單執行緒執行的結果:

The count is 23751855 spend time 5395

多執行緒執行的結果:

The count is 23387745 spend time 1487
The count is 23387745 spend time 1487

結果對比多執行緒比單執行緒快大概3倍的時間

接下來我們去掉休眠時間,再次進行結果對比:

單執行緒執行結果:

The count is 23460518 spend time 0

多執行緒執行結果:

The count is 24078313 spend time 3
The count is 24078313 spend time 3

然後我們驚奇的發現,多執行緒比單執行緒還要慢,為什麼呢,是因為在小資料量的情況下,單執行緒,執行期間沒有花費上下文切換時間,多執行緒執行期間是需要花費執行緒之間上下文切換的時間的,每次上下文切換時間之前說過,大概花費5000-20000個時鐘週期的,所以多執行緒執行會比單執行緒慢一些,所以說我們在用多執行緒的時候,就需要考慮執行緒之間的上下文切換問題,並不一定多執行緒就一定是好,我們只是看需求,而選擇,就像Redis一樣設計的時候就是單執行緒的,但是他的強大,卻是比多執行緒的memcached更加強大,所以說沒有肯定的結論,只有適合和不適合.

接下來我們往大調整整型陣列的大小

4000調整為1億,然後對比結果

單執行緒執行結果:

The count is -331253431 spend time 51

多執行緒執行結果:

The count is 75277814 spend time 49
The count is 75277814 spend time 50

我們可以發現,所用的執行時間,已經大概一致了

繼續調大1億調整為3億,繼續對比結果

單執行緒執行結果:

The count is 57724808 spend time 205

多執行緒執行結果:

The count is 1028352167 spend time 106
The count is 1028352167 spend time 106

現在單執行緒已經是多執行緒的執行時間的兩倍了,由此可見,當資料量越來越大的時候,單執行緒的效能往往就會逐漸降低,而多執行緒的優勢就漸漸體現出來了

所謂的同步用法就是在呼叫

forkJoinPool.invoke(sumTask);

之後主執行緒就在這裡阻塞了,需要等待,執行完成後,主執行緒才能繼續往下執行,接下里我們看非同步用法

  Fork Join的非同步用法同時演示不要求返回值:遍歷指定目錄(含子目錄)尋找指定型別檔案

package org.dance.day2.forkjoin;

import org.dance.tools.SleepTools;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * 使用ForkJoin框架實現不定個數的任務執行
 * @author ZYGisComputer
 */
public class FindDirsFiles {

    /**
     * 因為搜尋檔案不需要返回值,所以我們繼承RecursiveAction
     */
    private static class FindFilesByDirs extends RecursiveAction{

        private File path;

        public FindFilesByDirs(File path) {
            this.path = path;
        }

        @Override
        protected void compute() {

            // 建立任務容器
            List<FindFilesByDirs> findFilesByDirs = new ArrayList<>();

            // 獲取資料夾下所有的物件
            File[] files = path.listFiles();

            if(null!=files){

                for (File file : files) {
                    // 判斷是否是資料夾
                    if (file.isDirectory()){
                        // 新增到任務容器中
                        findFilesByDirs.add(new FindFilesByDirs(file));
                    }else{
                        // 如果是一個檔案,那麼檢查這個檔案是否符合需求
                        if(file.getAbsolutePath().endsWith(".txt")){
                            // 如果符合 列印
                            System.out.println("檔案:"+file.getAbsolutePath());
                        }
                    }
                }

                // 判斷任務容器是否為空
                if(!findFilesByDirs.isEmpty()){
                    // 遞交任務組
                    for (FindFilesByDirs filesByDirs : invokeAll(findFilesByDirs)) {
                        // 等待子任務執行完成
                        filesByDirs.join();
                    }

                }

            }

        }
    }

    public static void main(String[] args) {

        // 建立ForkJoin池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        File path = new File("E:/");

        // 建立任務
        FindFilesByDirs findFilesByDirs = new FindFilesByDirs(path);

        // 非同步呼叫 這個方法是沒有返回值的
        forkJoinPool.execute(findFilesByDirs);

        System.out.println("Task is Running................");
        SleepTools.ms(1);

        // 在這裡做這個只是測試ForkJoin是否為非同步,當執行ForkJoin的時候主執行緒是否繼續執行
        int otherWork = 0;
        for (int i = 0; i < 100; i++) {
            otherWork += i;
        }
        System.out.println("Main thread done sth.......,otherWork:"+otherWork);

        // 如果是有返回值的話,可以獲取,當然這個join方法是一個阻塞式的,因為主執行緒執行的太快了,ForkJoin還沒執行完成主執行緒就死亡了,所以在這裡呼叫一下阻塞,等待ForkJoin執行完成
        findFilesByDirs.join();

        System.out.println("Thread end!");

    }

}

執行結果:

Task is Running................
Main thread done sth.......,otherWork:4950
檔案:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\src\main\resources\static\file\rml.txt
檔案:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\banner.txt
檔案:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\static\ajax\libs\jquery-ztree\3.5\log v3.x.txt
檔案:E:\dance\activiti-ruoyi\RuoYi-Process\ruoyi-admin\target\classes\static\file\rml.txt
........................
Thread end!

從執行結果中可以看到,主執行緒的執行時在ForkJoin執行之前就執行了,但是程式碼中卻是在ForkJoin執行之後執行的,所以說這是非同步的,執行緒是並行執行的,非同步執行只能通過呼叫任務執行緒的Join方法獲取返回值,execute方法是沒有返回值的

作者:彼岸舞

時間:2020\09\18

內容關於:併發程式設計

本文來源於網路,只做技術分享,一概不負任何