1. 程式人生 > 實用技巧 >Spring Boot 自定義執行緒池使用@Async實現非同步呼叫任務

Spring Boot 自定義執行緒池使用@Async實現非同步呼叫任務

定義執行緒池

第一步,先在Spring Boot主類中定義一個執行緒池,比如:

@SpringBootApplication
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
 
    @EnableAsync
    @Configuration
    class TaskPoolConfig {
 
        @Bean("taskExecutor")
        
public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix(
"taskExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } } }

上面我們通過使用ThreadPoolTaskExecutor建立了一個執行緒池,同時設定了以下這些引數:

  • 核心執行緒數10:執行緒池建立時候初始化的執行緒數
  • 最大執行緒數20:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
  • 緩衝佇列200:用來緩衝執行任務的佇列
  • 允許執行緒的空閒時間60秒:當超過了核心執行緒出之外的執行緒在空閒時間到達之後會被銷燬
  • 執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池
  • 執行緒池對拒絕任務的處理策略:這裡採用了CallerRunsPolicy策略,當執行緒池沒有處理能力的時候,該策略會直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務

使用執行緒池

在定義了執行緒池之後,我們如何讓非同步呼叫的執行任務使用這個執行緒池中的資源來執行呢?方法非常簡單,我們只需要在@Async註解中指定執行緒池名即可,比如:

@Slf4j
@Component
public class Task {
 
    public static Random random = new Random();
 
    @Async("taskExecutor")
    public void doTaskOne() throws Exception {
        log.info("開始做任務一");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任務一,耗時:" + (end - start) + "毫秒");
    }
 
    @Async("taskExecutor")
    public void doTaskTwo() throws Exception {
        log.info("開始做任務二");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任務二,耗時:" + (end - start) + "毫秒");
    }
 
    @Async("taskExecutor")
    public void doTaskThree() throws Exception {
        log.info("開始做任務三");
        long start = System.currentTimeMillis();
        Thread.sleep(random.nextInt(10000));
        long end = System.currentTimeMillis();
        log.info("完成任務三,耗時:" + (end - start) + "毫秒");
    }
 
}

單元測試

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ApplicationTests {
 
    @Autowired
    private Task task;
 
    @Test
    public void test() throws Exception {
 
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();
 
        Thread.currentThread().join();
    }
 
}

執行上面的單元測試,我們可以在控制檯中看到所有輸出的執行緒名前都是之前我們定義的執行緒池字首名開始的,說明我們使用執行緒池來執行非同步任務的試驗成功了!

2020-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-1] com.didispace.async.Task                 : 開始做任務一
2020-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-2] com.didispace.async.Task                 : 開始做任務二
2020-03-27 22:01:15.620  INFO 73703 --- [ taskExecutor-3] com.didispace.async.Task                 : 開始做任務三
2020-03-27 22:01:18.165  INFO 73703 --- [ taskExecutor-2] com.didispace.async.Task                 : 完成任務二,耗時:2545毫秒
2020-03-27 22:01:22.149  INFO 73703 --- [ taskExecutor-3] com.didispace.async.Task                 : 完成任務三,耗時:6529毫秒
2020-03-27 22:01:23.912  INFO 73703 --- [ taskExecutor-1] com.didispace.async.Task                 : 完成任務一,耗時:8292毫秒

注意,方法標記了@Async 呼叫該方法,不能在該類中呼叫,不然不管用,就是呼叫者和被呼叫者不能是用一個類;

Future型別

非同步方法裡按照平時那樣返回結果,主執行緒是獲取不到的;Future的get方法可以阻塞主執行緒,直到子執行緒執行完畢,獲取非同步結果;

Future提供了三種功能:

  1. 判斷任務是否完成;

  2. 能夠中斷任務;

  3. 能夠獲取任務執行結果

    @Async("taskExecutor")
     
       public Future<String> run() throws Exception {
     
           long sleep = random.nextInt(10000);
     
           log.info("開始任務,需耗時:" + sleep + "毫秒");
     
           Thread.sleep(sleep);
     
           log.info("完成任務");
     
           return new AsyncResult<>("test");
     
       }
    @Test
     
       public void test() throws Exception {
     
           Future<String> futureResult = task.run();
     
           String result = futureResult.get(5, TimeUnit.SECONDS);
     
           log.info(result);
     
       }
     
     
    Future它的介面定義如下:
     
     
     
    public interface Future<V> {
     
       boolean cancel(boolean mayInterruptIfRunning);
     
       boolean isCancelled();
     
       boolean isDone();
     
       V get() throws InterruptedException, ExecutionException;
     
       V get(long timeout, TimeUnit unit)
     
           throws InterruptedException, ExecutionException, TimeoutException;
     
    }

它的介面定義如下:

public interface Future<V> {

   boolean cancel(boolean mayInterruptIfRunning);

   boolean isCancelled();

   boolean isDone();

   V get() throws InterruptedException, ExecutionException;

   V get(long timeout, TimeUnit unit)

       throws InterruptedException, ExecutionException, TimeoutException;

}

它宣告這樣的五個方法:

  • cancel方法用來取消任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。引數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設定true,則表示可以取消正在執行過程中的任務。如果任務已經完成,則無論mayInterruptIfRunning為true還是false,此方法肯定返回false,即如果取消已經完成的任務會返回false;如果任務正在執行,若mayInterruptIfRunning設定為true,則返回true,若mayInterruptIfRunning設定為false,則返回false;如果任務還沒有執行,則無論mayInterruptIfRunning為true還是false,肯定返回true。

  • isCancelled方法表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。

  • isDone方法表示任務是否已經完成,若任務完成,則返回true;

  • get()方法用來獲取執行結果,這個方法會產生阻塞主執行緒,會一直等到任務執行完畢才返回;

  • get(long timeout, TimeUnit unit)用來獲取執行結果,如果在指定時間內,還沒獲取到結果,就直接返回null。

如果是用迴圈跑了多個任務,想要等待多個非同步任務都結束,主執行緒才結束,可以這樣寫

List<Future<String>> futures = new ArrayList<>();
for(int i=0;i<companyNamejsonArray.size();i++){
        Future<String> future = financialCaseService.addDomList(companyNamejsonArray.getString(i), automaticTask.getTaskName(), taskId,i);
//在addDom中用return new AsyncResult<>(phoneNum+"成功");返回資訊
        futures.add(future);
    }
 
for (Future future : futures) {
    String string = (String) future.get();
    System.out.println(string);
}