1. 程式人生 > 其它 >執行緒池ThreadPoolTaskExecutor的同步及非同步使用

執行緒池ThreadPoolTaskExecutor的同步及非同步使用

參考資訊

本人蔘考的是這一篇,描述方面比本人好得多: springboot執行緒池的使用和擴充套件 VisiableThreadPoolTaskExecutor

背景:

簡略記一下,筆記:

  • 目標是想在 springboot服務下,自定義一個執行緒池,然後使用非同步,原目的是為了批量匯入用。

專案架構

  • 普通的springboot服務

步驟

1、先定義一個 ExecutorConfig 類
為了方便,Executor 實現類也可以放在這個config 類。用 @Bean 註解一下也是可以的
並且要加上 @Configuration @EnableAsync 這兩個註解~~


import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import lombok.extern.slf4j.Slf4j;

/**
 * 執行緒池config
 *
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {

    // 核心執行緒數
    @Value("${async.executor.thread.core_pool_size:5}")
    private int corePoolSize;

    // 最大執行緒數
    @Value("${async.executor.thread.max_pool_size:10}")
    private int maxPoolSize;

    // 執行緒池佇列數
    @Value("${async.executor.thread.queue_capacity:2000}")
    private int queueCapacity;

    // 執行緒池字首
    @Value("${async.executor.thread.name.prefix:async-task-}")
    private String namePrefix;

    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        log.warn("start asyncServiceExecutor");
        //在這裡修改
        ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
        //配置核心執行緒數
        executor.setCorePoolSize(corePoolSize);
        //配置最大執行緒數
        executor.setMaxPoolSize(maxPoolSize);
        //配置佇列大小
        executor.setQueueCapacity(queueCapacity);
        //配置執行緒池中的執行緒的名稱字首
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:當pool已經達到max size的時候,如何處理新任務
        // CALLER_RUNS:不在新執行緒中執行任務,而是有呼叫者所在的執行緒來執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行初始化
        executor.initialize();
        return executor;
    }
}

2、這裡用到了一個 VisiableThreadPoolTaskExecutor 類, 這個就是為了對執行緒池執行過程中的細節進行記錄的一個類。
看下程式碼就清楚了:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * ThreadPoolTaskExecutor的子類
 */
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }

        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

3、其實到此,準備工作就做完了。。。接下來就是呼叫了。
controller 層, service 層。。。emmm,這裡暫不詳述了,參考的博文寫得更好。。。
大概示意一下service層的同步非同步方法的呼叫。。:

  • 執行緒池同步呼叫的方式:
    意思就是, 一個請求過來,會等執行緒池裡面執行完成後,再返回。
    非同步,就不等待了,直接響應回去了~
    // 同步用法,要把 Executor 注入進來,並指定 name
    @Resource(name = "asyncServiceExecutor")
    private Executor executor;

    @Override
    public void showme() {
        log.info("start>>>>>>");
        executor.execute(()->{
            // 模擬請求。。
            // 從日誌上看, start >>>>> 的列印時間和  end >>>>>> 的列印時間,中間是會隔著3秒的
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        log.info("end>>>>>>");
    }

  • 非同步用法
    // 非同步用法,就不用這兩行了。。。
    // @Resource(name = "asyncServiceExecutor")
    // private Executor executor;

    // 但是要在方法上面,加上這一行了,重要,重要~
    @Async("asyncServiceExecutor")
    @Override
    public void showme() {
        log.info("start>>>>>>");
        // 模擬請求。。
        // 這裡用睡眠3秒錶示請求,但實際上這個方法,並不會阻塞,它會馬上返回。。。
        // 用日誌上看到, start >>>>> 和 end >>>> 是在同一時間戳列印的。。。
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("end>>>>>>");
    }



本人比較懶,也只是隨手筆記,如有錯漏請指正!謝謝。