1. 程式人生 > 其它 >Java JUC併發之ForkJoin入門

Java JUC併發之ForkJoin入門

十四、ForkJoin

什麼是fork join

JDK1.7開始

並行執行任務!提高效率 => 針對大資料量

大資料 => MapReduce (把把任務拆分成小任務)

ForkJoin 特點 : 工作竊取

已經執行完任務的執行緒會將尚未執行完的執行緒中的任務竊取過來,避免執行緒等待造成資源浪費 => 提高效率

這裡維護的都是雙端佇列 Deque 【佇列兩端都可以存取】

ForkJoin測試

package com.liu.forkjoin;

import java.util.concurrent.RecursiveTask;

/**
 * 求和計算的任務
 * 3000=>for   6000 => forkjoin  9000 => Stream並行流
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    private Long temp = 10000L;

    public ForkJoinDemo(Long start,Long end) {
        this.start = start;
        this.end = end;
    }

    public static void main(String[] args) {

        int sum = 0;

        for (int i = 1; i <= 10_0000_0000; i++) {
            sum += i;
        }
        System.out.println(sum);
    }

    @Override
    protected Long compute() {
        if ((end-start) < temp){ // 一般的求和

            Long sum = 0L;
            for (Long i = start; i <= end ; i++) {
                sum += i;

            }
            return sum;

        }else{
            // 分支合併計算
            Long middle = (start + end)/2; // 中間值

            ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
            task1.fork(); // 拆分任務,把任務壓入執行緒
            ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();

            return task1.join() + task2.join();

        }
    }
}

測試類 Test

package com.liu.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

// 測試類 求 0-10_0000_0000的和
public class Test {

    //同一個任務 效率高几十倍!!
    public static void main(String[] args) throws ExecutionException, InterruptedException {

      //  test1(); // 6303
      //  test2(); // 5846
        test3(); // 150 ???
    }

    // 普通程式設計師
    public static void test1(){
        Long sum = 0L;
        Long start = System.currentTimeMillis(); // 記錄任務開始時間

        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }

        Long end = System.currentTimeMillis(); // 記錄任務完成時間

        System.out.println("sum = " + sum + " 時間 :" + (end -start));
    }


    // ForkJoin
    public static void test2() throws ExecutionException, InterruptedException {

        Long start = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);// 提交任務
        Long sum = submit.get();

        Long end = System.currentTimeMillis();
        System.out.println("sum = " + sum + " 時間 :" + (end -start));

    }

    // Stream並行流 呼叫API
    public static void test3(){

        Long start = System.currentTimeMillis();

        // Stream並行流
        // DoubleStream
        // IntStream.rangeClosed()
        Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,Long::sum); // sum是一個求和函式 返回一個a+b的和


        Long end = System.currentTimeMillis();

        System.out.println("sum = " + sum + " 時間 :" + (end -start));

    }
}

本質 :(RecursiveTask) 遞迴 返回一個ForkJoinTask

本文來自部落格園,作者:{夕立君},轉載請註明原文連結:https://www.cnblogs.com/liuzhhhao/p/15016660.html