1. 程式人生 > 其它 >【優雅程式碼】04-1行程式碼完成多執行緒,別再寫runnable了

【優雅程式碼】04-1行程式碼完成多執行緒,別再寫runnable了

java8提供的CompletableFuture以及匿名函式可以讓我們一行程式碼完成多執行緒

【優雅程式碼】04-1行程式碼完成多執行緒,別再寫runnable了

歡迎關注b站賬號/公眾號【六邊形戰士夏寧】,一個要把各項指標拉滿的男人。該文章已在github目錄收錄。
螢幕前的大帥比大漂亮如果有幫助到你的話請順手點個贊、加個收藏這對我真的很重要。別下次一定了,都不關注上哪下次一定。

1.背景介紹

java8提供的CompletableFuture以及匿名函式可以讓我們一行程式碼完成多執行緒

2.建立相關類

2.1.ThreadEntity

用於多執行緒測試的實體類

public class ThreadEntity {
    private int num;
    private int price;
    public int countPrice(){
        price = RandomUtils.nextInt();
        try {
            System.out.println(num);
            // 隨機等待1~10秒
            Thread.sleep(RandomUtils.nextInt(1, 10) * 1000);
            System.out.println(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return price;
    }
}

2.2.ThreadPoolManager

/**
 * tasks 每秒的任務數,預設200,依據訪問量及使用執行緒池的地方進行計算
 * taskCost:每個任務花費時間,預設0.1s
 * responseTime:最大響應時間,預設為1s,一般使用者最大忍受時間為3秒
 *
 * @author seal email:[email protected]
 * @date 2020/5/30 10:08 AM
 */
@Data
@Slf4j
@Configuration
public class ThreadPoolManager {
    /**
     * 平均響應時間預設2秒
     */
    private static final float ALL_COST_AVG = 2F;
    /**
     * 平均IO時間預設1.5秒
     */
    private static final float IO_COST_AVG = 1.5F;
    /**
     * 伺服器核數
     */
    private static final int SIZE_PROCESSOR = Runtime.getRuntime().availableProcessors();
    /**
     * https://www.cnblogs.com/dennyzhangdd/p/6909771.html?utm_source=itdadao&utm_medium=referral
     * 阻塞係數=阻塞時間/(阻塞時間+計算時間)
     * 執行緒數=核心數/(1-阻塞係數)
     * 等同於CPU核心數*cpu使用率*(1+等待時間與計算時間的比率)
     * N+1通常為最優效率
     * <p>
     * https://blog.51cto.com/13527416/2056080
     */
    private static final int SIZE_CORE_POOL = SIZE_PROCESSOR + 1;

    /**
     * 執行緒池維護最大數量,預設會與核心執行緒數一致無意義,保守情況取2cpu
     * 或者使用簡單計算 執行緒池大小 = ((執行緒 IO time + 執行緒 CPU time )/執行緒 CPU time ) CPU數目**
     * 請求所消耗的時間 /(請求所消耗的時間-DB處理)*CPU數目,重點在於cpu等待時間,通常為資料庫DB時間
     * 按照通常2秒展示介面,資料庫運算1.5秒則(2/0.5)*n,其實就是優化等待時間
     * <p>
     * 預設4n即8核32執行緒
     */
    private static final int SIZE_MAX_POOL = (int) (SIZE_PROCESSOR * (ALL_COST_AVG / (ALL_COST_AVG - IO_COST_AVG)));
    /**
     * 執行緒池佇列長度,預設為integer最大值,Dubbo使用1000,無限大會引起使用者使用者的任務一直排隊,應選擇適當性丟棄,
     * 可忍受時間6其它的則拋棄
     * SIZE_MAX_POOL/IO_COST_AVG=每秒可處理任務數,預設為
     * 可忍受時間6*每秒可處理任務數=X佇列數
     */
    private static final int SIZE_QUEUE = (int) (6 * (SIZE_MAX_POOL / IO_COST_AVG));
    /**
     * 執行緒池具體類
     * LinkedBlockingDeque常用於固定執行緒,SynchronousQueue常用於cache執行緒池
     * Executors.newCachedThreadPool()常用於短期任務
     * <p>
     * 執行緒工廠選擇,區別不大
     * 有spring的CustomizableThreadFactory,new CustomizableThreadFactory("springThread-pool-")
     * guava的ThreadFactoryBuilder,new ThreadFactoryBuilder().setNameFormat("retryClient-pool-").build();
     * apache-lang的BasicThreadFactory,new BasicThreadFactory.Builder().namingPattern("basicThreadFactory-").build()
     * <p>
     * 佇列滿了的策略預設AbortPolicy
     */
    private static ThreadPoolManager threadPoolManager = new ThreadPoolManager();

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            SIZE_CORE_POOL,
            SIZE_MAX_POOL,
            30L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(SIZE_QUEUE),
            new CustomizableThreadFactory("springThread-pool-"),
            new ThreadPoolExecutor.AbortPolicy()
    );

    private void prepare() {
        if (pool.isShutdown() && !pool.prestartCoreThread()) {
            int coreSize = pool.prestartAllCoreThreads();
            System.out.println("當前執行緒池");
        }
    }


    public static ThreadPoolManager getInstance() {
        if (threadPoolManager != null) {
            ThreadPoolExecutor pool = threadPoolManager.pool;
        }
        return threadPoolManager;
    }
}

3.核心程式碼

3.1.並行流

parallel是並行核心可以發現內部是多執行緒執行,但是經過collect以後會排好序所以不用擔心,小專案可以使用,大專案的話建議老老實實用自己的執行緒池,JDK自帶的fork/join並不貼合業務

System.out.println(Stream.of(1, 2, 3, 4, 5, 6).parallel().map(l -> {
    System.out.println(l);
    return l;
}).collect(Collectors.toList()));

輸出如下,因為多執行緒所以隨機輸出,但因為使用collect收集則最終結果並未發生改變

2
6
4
5
3
1
[1, 2, 3, 4, 5, 6]

3.2.同步程式碼

這個可以不用再去實現執行緒的介面,不過還是要考慮一下佇列滿了的丟棄情況

List<ThreadEntity> listEntity = IntStream.range(0, 10).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture<Integer>> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // 此處ThreadPoolManager.getInstance().getPool()如果不傳該引數則使用預設commonPool,無特殊需求的話trycatch一般不寫
        return CompletableFuture.supplyAsync(() -> x.countPrice(),
                ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        log.error("", e);
        return null;
    }
}).collect(Collectors.toList());
List<Integer> result = listCompletableFuture.stream().map(CompletableFuture::join).collect(Collectors.toList());
System.out.println(result);
System.out.println(listEntity);

輸出如下可以看到執行是以多執行緒的方式進行,但是結果和原先是保持一致的

start6
start9
start0
start3
start2
start1
start8
start5
start4
start7
end3
end8
end5
end7
end9
end1
end2
end6
end0
end4
[131523688, 1491605535, 222657954, 132274662, 1134597171, 2057763841, 1168687436, 1842194861, 1264173480, 56446450]
[ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7d6f201, num=0, price=131523688), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@58e825f3, num=1, price=1491605535), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@d458bb1, num=2, price=222657954), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7e26830, num=3, price=132274662), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@43a0a2b8, num=4, price=1134597171), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@7aa70ac1, num=5, price=2057763841), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@45a8d047, num=6, price=1168687436), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@6dcdb8e3, num=7, price=1842194861), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@4b59d119, num=8, price=1264173480), ThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@35d5d9e, num=9, price=56446450)]

如果IntStream.range(0, 10)改成(0, 1000)則會有如下拒絕報錯

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@5af97850 rejected from java.util.concurrent.ThreadPoolExecutor@491666ad[Running, pool size = 64, active threads = 64, queued tasks = 256, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1618)
    at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1843)
    at com.example.demo.lesson.grace.thread.TestMain.lambda$threadEx1$2(TestMain.java:34)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at com.example.demo.lesson.grace.thread.TestMain.threadEx1(TestMain.java:41)
    at com.example.demo.lesson.grace.thread.TestMain.main(TestMain.java:26)
rejectThreadEntity(super=com.example.demo.lesson.grace.thread.ThreadEntity@1a9, num=366)

3.3.非同步程式碼

以下程式碼可以直接簡寫成一行,在處理非同步任務變得異常方便
CompletableFuture.runAsync(() -> fun())

List<ThreadEntity> listEntity = IntStream.range(0, 500).mapToObj(x -> ThreadEntity.builder().num(x).build()).collect(Collectors.toList());
List<CompletableFuture> listCompletableFuture = listEntity.stream().map(x -> {
    try {
        // 此處ThreadPoolManager.getInstance().getPool()如果不傳該引數則使用預設commonPool,無特殊需求的話trycatch一般不寫
        return CompletableFuture.runAsync(() -> x.countPrice(), ThreadPoolManager.getInstance().getPool());
    } catch (RejectedExecutionException e) {
        System.out.println("reject" + x);
        return null;
    }
}).collect(Collectors.toList());
listCompletableFuture.stream().map(CompletableFuture::join);
System.out.println("1234");
// 一行多執行緒非同步執行寫法
CompletableFuture.runAsync(() -> System.out.println(1));

輸出如下,可以看到主執行緒已經結束了其它子執行緒才在輸出,完全沒有等待的多執行緒

1234
1
start7
start0
start6
start5
start4
start2
start8
start1
start9
start3
end8
end4
end9
end6
end2
end0
end1
end3
end5
end7