1. 程式人生 > 實用技巧 >SpringBoot:詳解@EnableAsync + @Async 實現共享執行緒池

SpringBoot:詳解@EnableAsync + @Async 實現共享執行緒池

鍥子

平時的開發中,可以看到很多地方要使用多執行緒技術,

比如
1.處理大資料量的資料時,可以採用執行緒池,充分利用多核優勢
2.使用者觸發一個較長的流程時,可以將一部分處理邏輯,另起一個執行緒非同步處理,減少使用者等待時間

不過執行緒是一種寶貴的資源,一個系統執行在伺服器上,要根據CPU的數量來合理設定併發執行緒數量。
如果一個系統中每個執行緒使用者都自己定義執行緒或者執行緒池,有一些可見的不良後果

比如
1.系統各處啟執行緒太多,導致CPU切換上下文的消耗
2.定義執行緒池的引數不一致,導致各種不同實現共存難以處理和排查問題

所以本文介紹一種安全又幹淨的方式:在Springboot中,使用@EnableAsync + @Async註解實現公用執行緒池,這裡的詳解就是對涉及的知識點進行一點研究和分析,網上現存的介紹多是一句話帶過,比如

使用@EnableAsync來開啟非同步的支援,使用@Async來對某個方法進行非同步執行

知識點詳解

學習一個註解,同樣是直接看原始碼和註釋
為了避免篇幅過長,我分開寫了兩篇:

@EnableAsync

@EnableAsync 詳解

@Async

@Async 詳解

執行緒池引數

關於執行緒池引數等知識,美團技術團隊這批文章很好,建議學習
Java執行緒池實現原理及其在美團業務中的實踐

實戰&舉例

由知識點詳解中的內容可知,有兩種方式可以實現執行緒池

方式一 繼承AsyncConfigurer介面,直接使用@Async

執行緒池配置類

@Configuration
@EnableAsync
public class AsyncExecutor implements AsyncConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(AsyncExecutor.class);

    //核心執行緒數
    private static final int CORE_POOL_SIZE = 5;

    //最大執行緒數
    private static final int MAX_POOL_SIZE = 5;

    //佇列大小
    private static final int QUEUE_CAPACITY = 50;

    //執行緒池中的執行緒的名稱字首
    private static final String THREAD_NAME = "MyExecutor-";


    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心執行緒數
        executor.setCorePoolSize(CORE_POOL_SIZE);
        //配置最大執行緒數
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        //配置佇列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        //配置執行緒池中的執行緒的名稱字首
        executor.setThreadNamePrefix(THREAD_NAME);
        //配置執行緒池拒絕策略,我設定為CallerRunsPolicy,當執行緒和佇列都滿了,由發起執行緒的主執行緒自己執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }

    private class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            //手動處理的邏輯
            logger.error("輸出報錯資訊");
        }
    }

方式二 不繼承AsyncConfigurer介面,使用@Async("指定執行緒池")

執行緒池配置類

@Configuration
@EnableAsync
public class AsyncExecutor2 {

    private static final Logger logger = LoggerFactory.getLogger(AsyncExecutor2.class);

    //核心執行緒數
    private static final int CORE_POOL_SIZE = 5;

    //最大執行緒數
    private static final int MAX_POOL_SIZE = 5;

    //佇列大小
    private static final int QUEUE_CAPACITY = 50;

    //執行緒池中的執行緒的名稱字首
    private static final String THREAD_NAME = "MyExecutor-";


    @Bean
    public Executor getAsyncExecutorMethod() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心執行緒數
        executor.setCorePoolSize(CORE_POOL_SIZE);
        //配置最大執行緒數
        executor.setMaxPoolSize(MAX_POOL_SIZE);
        //配置佇列大小
        executor.setQueueCapacity(QUEUE_CAPACITY);
        //配置執行緒池中的執行緒的名稱字首
        executor.setThreadNamePrefix(THREAD_NAME);
        //配置執行緒池拒絕策略,我設定為CallerRunsPolicy,當執行緒和佇列都滿了,由發起執行緒的主執行緒自己執行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

測試

註解方法

@Service
public class AsyncService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
    @Async
    public void test1() throws InterruptedException {
        logger.info("---------test1 start: {}", Thread.currentThread().getName());
        Thread.sleep(1000);
        logger.info("---------test1 end: {}", Thread.currentThread().getName());
    }
    @Async
    public void test2() throws InterruptedException {
        logger.info("---------test2 start: {}", Thread.currentThread().getName());
        Thread.sleep(1000);
        logger.info("---------test2 end: {}", Thread.currentThread().getName());
    }
    @Async("getAsyncExecutorMethod")
    public void test3() throws InterruptedException {
        logger.info("---------test3 start: {}", Thread.currentThread().getName());
        Thread.sleep(5000);
        logger.info("---------test3 end: {}", Thread.currentThread().getName());
    }
    @Async("getAsyncExecutorMethod")
    public void test4() throws InterruptedException {
        logger.info("---------test4 start: {}", Thread.currentThread().getName());
        Thread.sleep(1000);
        logger.info("---------test4 end: {}", Thread.currentThread().getName());
    }
}

呼叫的介面

@RestController
@RequestMapping("api")
public class AsyncController {
    private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
    @Autowired
    AsyncService asyncService;

    @GetMapping("/async")
    public void ac() {
        try {
            asyncService.test1();
            asyncService.test2();
        } catch (InterruptedException e) {
            logger.error("ac error");
        }
    }

    @GetMapping("/async2")
    public void ac2() {
        try {
            asyncService.test3();
            asyncService.test4();
        } catch (InterruptedException e) {
            logger.error("ac2 error");
        }
    }
}

瀏覽器直接輸入
http://localhost:7777/oxye/api/async
幾秒後再輸入
http://localhost:7777/oxye/api/async2
再檢視IDEA-Debug-Console的輸出日誌

結果

可以看到,兩個執行緒先後開始執行,然後結束
前四列為async結果,後四列為async2結果

如果去掉@Async,會變成序列執行,如下

全篇完