1. 程式人生 > 其它 >ThreadPoolExecutor自定義執行緒池 IO密集型的場景,CPU計算密集型的場景

ThreadPoolExecutor自定義執行緒池 IO密集型的場景,CPU計算密集型的場景

技術標籤:Java多執行緒java併發程式設計

自定義執行緒池配置元件類封裝
pom.xml 配置

		<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.11.5</version>
        </dependency>

        <dependency
>
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency>

整合ThreadPoolExecutor自定義 IO,CPU密集型和單機

package com.tuan.threadpool;

import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok.
Data; import lombok.NoArgsConstructor; import org.springframework.stereotype.Component; import java.util.concurrent.*; /** * 自定義執行緒池 * <p> * 1,maximunPoolSize設定大小依據: * 有業務型別類配置,分為以下兩種型別,由Runtime.getRuntime().availableProcessors() * 來判斷伺服器可使用的cpu核數在根據以下的兩種型別來判斷。 * - CPU密集型 * 該任務需要大量的運算,而且沒有阻塞,需要CPU一直全速執行, * CPU密集任務只有在真正的多核CPU上才可能得到加速。 * 一般計算公式:CPU核數 + 1個執行緒的執行緒池 * - IO密集型 * 即該任務需要大量的IO,即大量的阻塞,這種型別分以下兩種情況設定 * 1,如果IO密集型任務執行緒並非一直在執行任務,則應配置儘可能多的執行緒,如CPU核數 * 2 * 2,參考公式:CPU核數 /1 - 阻塞係數 阻塞係數在0.8~0.9之間 * 比如:8核CPU:8/1 - 0.9 = 80個執行緒數 * * 2,阻塞佇列種類: * - ArrayBlockingQueue 由陣列結構組成的有界阻塞佇列 * - LinkedBlockingQueue 由連結串列結構組成的有界(但大小預設值為Integer.MAX_VALUE)阻塞佇列 * - PriorityBlockingQueue 支援優先順序排序的無界阻塞佇列 * - DelayQueue 使用優先順序佇列實現的延遲無界阻塞佇列 * - SynchronousQueue 不儲存元素的阻塞佇列,也即單個元素的阻塞佇列 * - LinkedTransferQueue 由連結串列結構組成的無界阻塞佇列 * - LinkedBlockingDeque 由連結串列組成的雙向阻塞佇列 * * 3,拒絕策略 * - AbortPolicy 直接拋異常阻止系統正常執行 * - CallerRunsPolicy 由呼叫執行緒處理該任務 * - DiscardOldestPolicy 丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程) * - DiscardPolicy 也是丟棄任務,但是不丟擲異常。 * </p> */
@Data @NoArgsConstructor public class ThreadPoolComponent { //核心執行緒數 private volatile Integer corePoolSize; //最大執行緒數 private volatile Integer maximumPoolSize; //除核心執行緒外的執行緒最大空閒時間 private volatile Long keepAliveTime; //空閒時間單位 private volatile TimeUnit unit; //阻塞佇列 private BlockingQueue<Runnable> workQueue; //拒絕策略 private RejectedExecutionHandler handler; //伺服器的cpu核數 private static final Integer CPUS = Runtime.getRuntime().availableProcessors(); //預設直接拋異常 private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy(); //自定義執行緒的引數 public ThreadPoolComponent(Integer corePoolSize, Integer maximumPoolSize, Long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.keepAliveTime = keepAliveTime; this.unit = unit; this.workQueue = workQueue; this.handler = handler; } /** * 獲取只有單個執行緒的執行緒池 * @return ExecutorService */ public ExecutorService getSingleExecutorService() { ExecutorService threadPool = new ThreadPoolExecutor( 1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Executors.defaultThreadFactory(), defaultHandler); return TtlExecutors.getTtlExecutorService(threadPool); } /** * corePoolSize 保留在池中的執行緒數 * maximumPoolSize 最大執行緒數 * keepAliveTime 當執行緒數大於核心數時,這是多餘的空閒執行緒將在終止之前等待新任務的最長時間。 * unit 時間單位 * threadFactory 新執行緒時使用的工廠模式 * workQueue 等待執行緒佇列的大小 * handler 由於達到執行緒邊界被阻止時使用的處理程式模式 * * 實際情況下具體流程如下: * * 1)當池子大小小於corePoolSize就新建執行緒,並處理請求 * * 2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去從workQueue中取任務並處理 * * 3)當workQueue放不下新入的任務時,新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理 * * 4)另外,當池子的執行緒數大於corePoolSize的時候,多餘的執行緒會等待keepAliveTime長的時間,如果無請求可處理就自行銷燬 */ /** * 得到執行緒執行物件ExecutorService * 固定拒絕策略為:AbortPolicy * @return ExecutorService */ public ExecutorService getExecutorService(Integer corePoolSize,Integer maximumPoolSize, Long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { ExecutorService threadPool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); return TtlExecutors.getTtlExecutorService(threadPool); } /** * 當前業務為較少IO密集型的場景 * 獲取初始化好maximunPoolSize的執行緒池 */ public ExecutorService getMirrorIOExecutorService(Integer corePoolSize, Long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { ExecutorService threadPool = new ThreadPoolExecutor( corePoolSize, initMirrorIOMaxPoolSize(), keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); return TtlExecutors.getTtlExecutorService(threadPool); } /** * 當前業務為較多IO密集型的場景 * 獲取初始化好maximunPoolSize的執行緒池 * @return ExecutorService */ public ExecutorService getFullIOExecutorService(Integer corePoolSize, Long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { ExecutorService threadPool = new ThreadPoolExecutor( corePoolSize, initFullIOMaxPoolSize(), keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); return TtlExecutors.getTtlExecutorService(threadPool); } /** * 當前業務為CPU計算密集型的場景 * 獲取初始化好maximunPoolSize的執行緒池 * @return ExecutorService */ public ExecutorService getCPUExecutorService(Integer corePoolSize, Long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { ExecutorService threadPool = new ThreadPoolExecutor( corePoolSize, initCPUMaxPoolSize(), keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); return TtlExecutors.getTtlExecutorService(threadPool); } /** * 初始化maximunPoolSize----IO密集型 * @return Integer */ public Integer initMirrorIOMaxPoolSize() { maximumPoolSize = CPUS * 2; return maximumPoolSize; } public Integer initFullIOMaxPoolSize() { maximumPoolSize = (int) (CPUS/(1-0.9)); return maximumPoolSize; } /** * 初始化maximunPoolSize----CPU密集型 * @return Integer */ public Integer initCPUMaxPoolSize() { maximumPoolSize = CPUS + 1; return maximumPoolSize; } }

其他服務配置注入執行緒池

package com.tuan.config;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.tuan.threadpool.ThreadPoolComponent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 *  測試服務配置類
 * @author tuan
 * @date 2021/01/20 17:30
 */
@Configuration
public class TestConfiguration {

    /**
     * 適合處理IO密集型任務
     * @author tuan
     * @date 2021/01/20 17:29
     * @return java.util.concurrent.ExecutorService
     */
    @Bean(name = "fullIOThreadPool")
    public ExecutorService getFullIOExecutorService() {
        return new ThreadPoolComponent().getFullIOExecutorService(5, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(80), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * 適合CPU計算密集型任務
     * @author tuan
     * @date 2021/01/20 17:29
     * @return java.util.concurrent.ExecutorService
     */
    @Bean(name = "fullCPUEThreadPool")
    public ExecutorService getCPUExecutorService() {
        return new ThreadPoolComponent().getCPUExecutorService(10, 180L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(160), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /**
     * 單執行緒池
     * @author tuan
     * @date 2021/01/20 17:29
     * @return java.util.concurrent.ExecutorService
     */
    @Bean(name = "SingleThreadPool")
    public ExecutorService getSingleExecutorService() {
        return new ThreadPoolComponent().getSingleExecutorService();
    }

}

執行緒池呼叫測試

package com.tuan;

import com.tuan.config.TestConfiguration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLongArray;

/**
 * 測試
 * @author tuan
 * @date 2021/01/20 17:18
 **/
@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class multithreadedTest {

    @Autowired
    private TestConfiguration SingleThreadPool;

    @Autowired
    private TestConfiguration fullCPUEThreadPool;

    @Autowired
    private TestConfiguration fullIOThreadPool;



    @Test
    public void SingleThreadPool(){
        ExecutorService singleExecutorService = SingleThreadPool.getFullIOExecutorService();
        //往執行緒池中迴圈提交執行緒
        for (int i = 0; i < 10; i++) {
            //開啟執行緒
            singleExecutorService.execute(()->{
                System.out.println("getId"+Thread.currentThread().getId());
            });
        }
    }

    @Test
    public void ListSplitTest(){
        AtomicLongArray array =new AtomicLongArray(1000000);
        ExecutorService cpuExecutorService = fullCPUEThreadPool.getCPUExecutorService();
        for (int j=0;j<100;j++){
            cpuExecutorService.execute(()->{
                for (int i=0; i<10000;i++){
                    array.incrementAndGet(i);
                }
            });
        }
        System.out.println(array);
    }
}