1. 程式人生 > 實用技巧 >SpringBoot中的非同步操作與執行緒池

SpringBoot中的非同步操作與執行緒池

SpringBoot中的非同步操作與執行緒池

執行緒池型別

Java通過 java.util.concurrent.Executors 的靜態方法提供五種執行緒池

  1. newCachedThreadPool 建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
  2. newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
  3. newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
  4. newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
  5. newWorkStealingPool 這是java8新增的執行緒池型別。建立一個含有足夠多執行緒的執行緒池,來維持相應的並行級別,它會通過工作竊取的方式,使得多核的CPU不會閒置,總會有活著的執行緒讓CPU去執行。

五種執行緒池的底層實現

  • ThreadPoolExecutor 是CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 這四種類型執行緒池的底層實現
  • ForkJoinPool (java7已有) 是WorkStealingPool執行緒池的底層實現

使用執行緒池的優點

  • 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
  • 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
  • 提供定時執行、定期執行、單執行緒、併發數控制等功能。

如何在SpringBoot中優雅的使用執行緒池

註冊執行緒池

在config目錄下建立 AsyncConfig 配置類,在配置類中定義執行緒池

package com.example.async_demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig implements SchedulingConfigurer {
    //第一種執行緒池定義方式,可代替CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種
    // Spring執行緒池
    @Lazy //執行緒池懶載入
    @Bean(name="threadPoolTaskExecutor",destroyMethod="shutdown") //name為執行緒池名稱,destroyMethod="shutdown"在spring bean回收後釋放資源
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        //封裝的是原生的ThreadPoolExecutor型別執行緒池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心執行緒數(獲取硬體):執行緒池建立時候初始化的執行緒數
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        System.out.println(corePoolSize);
        executor.setCorePoolSize(corePoolSize);
        //最大執行緒數+5:執行緒池最大的執行緒數,只有在緩衝佇列滿了之後才會申請超過核心執行緒數的執行緒
        executor.setMaxPoolSize(corePoolSize+5);
        //緩衝佇列500:用來緩衝執行任務的佇列
        executor.setQueueCapacity(500);
        //允許執行緒的空閒時間60秒:當超過了核心執行緒出之外的執行緒在空閒時間到達之後會被銷燬
        executor.setKeepAliveSeconds(60);
        //執行緒池名的字首:設定好了之後可以方便我們定位處理任務所在的執行緒池
        executor.setThreadNamePrefix("MyAsync-");
        executor.initialize();
        return executor;
    }

    //第二種執行緒池定義方式,使用的是WorkStealingPool
    //java8 搶佔式執行緒池
    @Lazy
    @Bean(name="workStealingPool",destroyMethod="shutdown")
    public ExecutorService workStealingPool(){
        ExecutorService executorService = Executors.newWorkStealingPool();
        return executorService;
    }

    //第三種執行緒池定義方式,為週期任務執行緒池
    //週期任務執行緒池
    @Lazy
    @Bean(name="scheduledThreadPool",destroyMethod="shutdown")
    public ExecutorService scheduledThreadPool() {
        return Executors.newScheduledThreadPool(3);
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(scheduledThreadPool());
    }
}

我在上述案例程式碼中定義了三種類型的執行緒池

  1. 第一種是ThreadPoolTaskExecutor執行緒池,他是Spring中的 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 執行緒池,
    底層是對 java.util.concurrent.ThreadPoolExecutor 的封裝,綜合了CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種執行緒池的優點;
  2. 第二種是java8新增的 workStealingPool 執行緒池。第一種和第二種使用時可以在配置類上使用@EnableAsync註解,這樣就能優雅的使用@Async註解方法來實現執行緒run邏輯了;
  3. 第三種是ScheduledThreadPool執行緒池,不過在Spring中使用需要配置類實現SchedulingConfigurer介面,重寫configureTasks方法。在配置類上使用
    @EnableScheduling註解,就可以優雅的使用@Scheduled註解方法來實現週期邏輯了

使用執行緒池

對第一種和第二種執行緒池在service中實現執行緒run的邏輯

package com.example.async_demo.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Service
public class AsyncService {
    
    //使用名為threadPoolTaskExecutor的執行緒池,返回Future
    @Async("threadPoolTaskExecutor")
    public Future<Double> service1(){
        double result = getRand(3000);
        return AsyncResult.forValue(result);
    }
    
    //使用名為threadPoolTaskExecutor的執行緒池,返回CompletableFuture
    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Double> service2(){
        double result = getRand(3000);
        return CompletableFuture.completedFuture(result);
    }
    
    //使用名為workStealingPool的執行緒池,返回CompletableFuture
    @Async("workStealingPool")
    public CompletableFuture<Double> service3(){
        double result = getRand(3000);
        return CompletableFuture.completedFuture(result);
    }

    private double getRand(long sleep){
        System.out.println(Thread.currentThread().getId()+"-start");
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        double result = Math.random();//方法返回的結果
        return result;
    }
}

測試第一種和第二種執行緒池

@SpringBootTest
class AsyncDemoApplicationTests {

    @Autowired
    private AsyncService asyncService;

    @Test
    void test1() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        Future<Double> result1 = asyncService.service1();
        Future<Double> result2 = asyncService.service1();
        Future<Double> result3 = asyncService.service1();
        
        //讓主執行緒等待子執行緒結束之後才能繼續執行
        while (!(result1.isDone()&&result2.isDone()&&result3.isDone())){
            Thread.sleep(500);
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }

    @Test
    void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> result1 = asyncService.service2();
        CompletableFuture<Double> result2 = asyncService.service2();
        CompletableFuture<Double> result3 = asyncService.service2();

        //join() 的作用:讓主執行緒等待子執行緒結束之後才能繼續執行
        CompletableFuture.allOf(result1,result2,result3).join();
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }

    @Test
    void test3() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> result1 = asyncService.service3();
        CompletableFuture<Double> result2 = asyncService.service3();
        CompletableFuture<Double> result3 = asyncService.service3();

        //join() 的作用:讓主執行緒等待子執行緒結束之後才能繼續執行
        CompletableFuture.allOf(result1,result2,result3).join();
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }
}

test1測試結果

test2測試結果

test3測試結果

通過測試發現Future返回型別不適合主線等待多個子執行緒全部完成的操作,
因為需要用到while迴圈去阻塞主執行緒,而CompletableFuture可以通過CompletableFuture.allOf(cf1,cf2,cf3).join()
去完成這個操作,所以推薦使用CompletableFuture作為返回型別

注意:@Async註解的方法不能在本類中被呼叫,只能在其他類中呼叫,如Controller類

對第三種執行緒池在service中實現執行緒的邏輯

package com.example.async_demo.service;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {
    
    //cron表示式 每5秒執行一次
    //@Scheduled(cron = "*/5 * * * * ?")
    @Scheduled(cron = "${cron.sec5}") //表示式寫在application.yml檔案中,則以這種方式取出。
    public void service4(){
        System.out.println("5s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
    }
    
    //cron表示式 每3秒執行一次
    @Scheduled(cron = "${cron.sec3}")
    public void service5(){
        System.out.println("3s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
    }

}

application.yml 檔案

cron:
  sec5: '*/5 * * * * ?'
  sec3: '*/3 * * * * ?'

週期任務測試結果(啟動Application類)

通過測試結果可發現兩個週期任務使用了三個執行緒,
執行緒id分別是20、21、25。兩個週期任務分別以3s和5s執行一次,
但不固定在某個執行緒中執行,而是哪個執行緒空閒則使用哪個執行緒

注意:若不為週期任務配置執行緒池,只使用@EnableScheduling和@Scheduled註解的話,
則所有周期任務共用一個子執行緒,若出現下一個週期開始上一個週期任務還沒結束的情況,
則執行緒阻塞,直到前一個任務完成

CRON表示式

  • cron表示式是定義任務週期的一種表示式
  • 這裡不多介紹,可以參考這篇部落格