1. 程式人生 > >springboot @Async 執行緒池使用

springboot @Async 執行緒池使用

一、配置類 AsyncConfig

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import com.ly.async.AsyncExceptionHandler; import java.util.concurrent.Executor; @Configuration @EnableAsync public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor
= new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(100); executor.setQueueCapacity(100); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60 * 10); executor.setThreadNamePrefix("AsyncThread-"); executor.initialize();
//如果不初始化,導致找到不到執行器 return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncExceptionHandler(); } }
View Code

配置執行緒池的初始化屬性

 

二、非同步執行緒異常處理器 

AsyncExceptionHandler

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import java.lang.reflect.Method;
import java.util.Arrays;

public class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler{
     
    private static final Logger log = LoggerFactory.getLogger(AsyncExceptionHandler.class);
        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            log.error("Async method has uncaught exception, params:{}" + Arrays.toString(params));

            if (ex instanceof AsyncException) {
                AsyncException asyncException = (AsyncException) ex;
                log.error("asyncException:"  + asyncException.getMessage());
            }

            log.error("Exception :", ex);
        }
}
View Code

沒有返回值的執行緒,如果跑出異常,會被處理,有返回值的異常資訊可以通過 get() 方法獲取

 通過Future的get方法捕獲異常

 

三、非同步方法

@Async
    public void dealNoReturnTask() {
        logger.info("返回值為void的非同步呼叫開始" + Thread.currentThread().getName());
        //int a =1/0;
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("返回值為void的非同步呼叫結束" + Thread.currentThread().getName());
    }

    @Async
    public Future<String> dealHaveReturnTask(int i) {
        //int a =1/0;
        logger.info("asyncInvokeReturnFuture, parementer=" + i);
        Future<String> future;
        try {
            Thread.sleep(1000 * i);
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }

 

分別是有返回值和沒有返回值的

 

四、測試

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    private static final Logger log = LoggerFactory.getLogger(ApplicationTests.class);
   
    @Autowired
    private AsyncTask asyncTask;

@Test
    public void testAsync2() throws InterruptedException, ExecutionException {
        Future<String> result = asyncTask.dealHaveReturnTask(2);
        System.out.println("輸出" + result.get());
    }
View Code

返回success:2

 

五、理解多執行緒

 @Test
    public void testAsync() throws InterruptedException, ExecutionException {
        System.out.println("執行緒名稱" + Thread.currentThread().getName());
        System.out.println("開始S1");
        asyncTask.sum();
        System.out.println("S2 呼叫完sum()");
    }
@Async
    public void sum() {
        System.out.println("進入sum");
        
        int sum = 0;
        for(int i = 0; i < 1000; i++) {
            
            sum += i;
        }
        System.out.println("執行緒名稱" + Thread.currentThread().getName());
        System.out.println(" sum " + sum);
    }

 

 

執行到sum方法的時候,主執行緒並沒有停止,而是繼續,子執行緒去執行裡面的方法

 

 六、使用場景

1、場景

 

  最近做專案的時候遇到了一個小問題:從前臺提交到服務端A,A呼叫服務端B處理超時,原因是前端一次請求往db插1萬資料,插完之後會去清理快取、傳送訊息。

服務端的有三個操作 a、插DB b、清理cache  c、傳送訊息。1萬條資料,說多不多,說少不少.況且不是單單insert。出現超時現象,不足為奇了吧~~

 

2、分析

 

  如何避免超時呢?一次請求處理辣麼多資料,可分多次請求處理,每次請求處理100條資料,可以有效解決超時問題. 為了不影響使用者的體驗,請求改為ajax 非同步請求。

除此之外,仔細分析下場景. a 操作是本次處理的核心. 而b、c操作可以非同步處理。換句話說,a操作處理完可以馬上返回給結果, 不必等b、c處理完再返回。b、c操作可以放在一個非同步任務去處理。

-------------

--舉個簡單的例子:
假設有個請求,這個請求服務端的處理需要執行3個很緩慢的IO操作(比如資料庫查詢或檔案查詢),那麼正常的順序可能是(括號裡面代表執行時間):
a、讀取檔案1  (10ms)
b、處理1的資料(1ms)
c、讀取檔案2  (10ms)
d、處理2的資料(1ms)
e、讀取檔案3  (10ms)
f、處理3的資料(1ms)
g、整合1、2、3的資料結果 (1ms)
單執行緒總共就需要34ms。
那如果你在這個請求內,把ab、cd、ef分別分給3個執行緒去做,就只需要12ms了。

所以多執行緒不是沒怎麼用,而是,你平常要善於發現一些可優化的點。然後評估方案是否應該使用。
假設還是上面那個相同的問題:但是每個步驟的執行時間不一樣了。
a、讀取檔案1  (1ms)
b、處理1的資料(1ms)
c、讀取檔案2  (1ms)
d、處理2的資料(1ms)
e、讀取檔案3  (28ms)
f、處理3的資料(1ms)
g、整合1、2、3的資料結果 (1ms)
單執行緒總共就需要34ms。
如果還是按上面的劃分方案(上面方案和木桶原理一樣,耗時取決於最慢的那個執行緒的執行速度),在這個例子中是第三個執行緒,執行29ms。那麼最後這個請求耗時是30ms。比起不用單執行緒,就節省了4ms。但是有可能執行緒排程切換也要花費個1、2ms。因此,這個方案顯得優勢就不明顯了,還帶來程式複雜度提升。不太值得。

那麼現在優化的點,就不是第一個例子那樣的任務分割多執行緒完成。而是優化檔案3的讀取速度。
可能是採用快取和減少一些重複讀取。
首先,假設有一種情況,所有使用者都請求這個請求,那其實相當於所有使用者都需要讀取檔案3。那你想想,100個人進行了這個請求,相當於你花在讀取這個檔案上的時間就是28×100=2800ms了。那麼,如果你把檔案快取起來,那隻要第一個使用者的請求讀取了,第二個使用者不需要讀取了,從記憶體取是很快速的,可能1ms都不到。

參考:

https://www.cnblogs.com/chenmo-xpw/p/5652029.html

 

專案