ThreadPoolExecutor自定義執行緒池 IO密集型的場景,CPU計算密集型的場景
阿新 • • 發佈:2021-01-21
自定義執行緒池配置元件類封裝
pom.xml 配置
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.11.5</version>
</dependency>
<dependency >
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
整合ThreadPoolExecutor自定義 IO,CPU密集型和單機
package com.tuan.threadpool;
import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok. Data;
import lombok.NoArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
/**
* 自定義執行緒池
* <p>
* 1,maximunPoolSize設定大小依據:
* 有業務型別類配置,分為以下兩種型別,由Runtime.getRuntime().availableProcessors()
* 來判斷伺服器可使用的cpu核數在根據以下的兩種型別來判斷。
* - CPU密集型
* 該任務需要大量的運算,而且沒有阻塞,需要CPU一直全速執行,
* CPU密集任務只有在真正的多核CPU上才可能得到加速。
* 一般計算公式:CPU核數 + 1個執行緒的執行緒池
* - IO密集型
* 即該任務需要大量的IO,即大量的阻塞,這種型別分以下兩種情況設定
* 1,如果IO密集型任務執行緒並非一直在執行任務,則應配置儘可能多的執行緒,如CPU核數 * 2
* 2,參考公式:CPU核數 /1 - 阻塞係數 阻塞係數在0.8~0.9之間
* 比如:8核CPU:8/1 - 0.9 = 80個執行緒數
*
* 2,阻塞佇列種類:
* - ArrayBlockingQueue 由陣列結構組成的有界阻塞佇列
* - LinkedBlockingQueue 由連結串列結構組成的有界(但大小預設值為Integer.MAX_VALUE)阻塞佇列
* - PriorityBlockingQueue 支援優先順序排序的無界阻塞佇列
* - DelayQueue 使用優先順序佇列實現的延遲無界阻塞佇列
* - SynchronousQueue 不儲存元素的阻塞佇列,也即單個元素的阻塞佇列
* - LinkedTransferQueue 由連結串列結構組成的無界阻塞佇列
* - LinkedBlockingDeque 由連結串列組成的雙向阻塞佇列
*
* 3,拒絕策略
* - AbortPolicy 直接拋異常阻止系統正常執行
* - CallerRunsPolicy 由呼叫執行緒處理該任務
* - DiscardOldestPolicy 丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
* - DiscardPolicy 也是丟棄任務,但是不丟擲異常。
* </p>
*/
@Data
@NoArgsConstructor
public class ThreadPoolComponent {
//核心執行緒數
private volatile Integer corePoolSize;
//最大執行緒數
private volatile Integer maximumPoolSize;
//除核心執行緒外的執行緒最大空閒時間
private volatile Long keepAliveTime;
//空閒時間單位
private volatile TimeUnit unit;
//阻塞佇列
private BlockingQueue<Runnable> workQueue;
//拒絕策略
private RejectedExecutionHandler handler;
//伺服器的cpu核數
private static final Integer CPUS = Runtime.getRuntime().availableProcessors();
//預設直接拋異常
private static final RejectedExecutionHandler defaultHandler = new ThreadPoolExecutor.AbortPolicy();
//自定義執行緒的引數
public ThreadPoolComponent(Integer corePoolSize,
Integer maximumPoolSize,
Long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
this.unit = unit;
this.workQueue = workQueue;
this.handler = handler;
}
/**
* 獲取只有單個執行緒的執行緒池
* @return ExecutorService
*/
public ExecutorService getSingleExecutorService() {
ExecutorService threadPool = new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(),
defaultHandler);
return TtlExecutors.getTtlExecutorService(threadPool);
}
/**
* corePoolSize 保留在池中的執行緒數
* maximumPoolSize 最大執行緒數
* keepAliveTime 當執行緒數大於核心數時,這是多餘的空閒執行緒將在終止之前等待新任務的最長時間。
* unit 時間單位
* threadFactory 新執行緒時使用的工廠模式
* workQueue 等待執行緒佇列的大小
* handler 由於達到執行緒邊界被阻止時使用的處理程式模式
*
* 實際情況下具體流程如下:
*
* 1)當池子大小小於corePoolSize就新建執行緒,並處理請求
*
* 2)當池子大小等於corePoolSize,把請求放入workQueue中,池子裡的空閒執行緒就去從workQueue中取任務並處理
*
* 3)當workQueue放不下新入的任務時,新建執行緒入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
*
* 4)另外,當池子的執行緒數大於corePoolSize的時候,多餘的執行緒會等待keepAliveTime長的時間,如果無請求可處理就自行銷燬
*/
/**
* 得到執行緒執行物件ExecutorService
* 固定拒絕策略為:AbortPolicy
* @return ExecutorService
*/
public ExecutorService getExecutorService(Integer corePoolSize,Integer maximumPoolSize,
Long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
ExecutorService threadPool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
return TtlExecutors.getTtlExecutorService(threadPool);
}
/**
* 當前業務為較少IO密集型的場景
* 獲取初始化好maximunPoolSize的執行緒池
*/
public ExecutorService getMirrorIOExecutorService(Integer corePoolSize,
Long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
ExecutorService threadPool = new ThreadPoolExecutor(
corePoolSize,
initMirrorIOMaxPoolSize(),
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
return TtlExecutors.getTtlExecutorService(threadPool);
}
/**
* 當前業務為較多IO密集型的場景
* 獲取初始化好maximunPoolSize的執行緒池
* @return ExecutorService
*/
public ExecutorService getFullIOExecutorService(Integer corePoolSize,
Long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
ExecutorService threadPool = new ThreadPoolExecutor(
corePoolSize,
initFullIOMaxPoolSize(),
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
return TtlExecutors.getTtlExecutorService(threadPool);
}
/**
* 當前業務為CPU計算密集型的場景
* 獲取初始化好maximunPoolSize的執行緒池
* @return ExecutorService
*/
public ExecutorService getCPUExecutorService(Integer corePoolSize,
Long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
ExecutorService threadPool = new ThreadPoolExecutor(
corePoolSize,
initCPUMaxPoolSize(),
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
return TtlExecutors.getTtlExecutorService(threadPool);
}
/**
* 初始化maximunPoolSize----IO密集型
* @return Integer
*/
public Integer initMirrorIOMaxPoolSize() {
maximumPoolSize = CPUS * 2;
return maximumPoolSize;
}
public Integer initFullIOMaxPoolSize() {
maximumPoolSize = (int) (CPUS/(1-0.9));
return maximumPoolSize;
}
/**
* 初始化maximunPoolSize----CPU密集型
* @return Integer
*/
public Integer initCPUMaxPoolSize() {
maximumPoolSize = CPUS + 1;
return maximumPoolSize;
}
}
其他服務配置注入執行緒池
package com.tuan.config;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.tuan.threadpool.ThreadPoolComponent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 測試服務配置類
* @author tuan
* @date 2021/01/20 17:30
*/
@Configuration
public class TestConfiguration {
/**
* 適合處理IO密集型任務
* @author tuan
* @date 2021/01/20 17:29
* @return java.util.concurrent.ExecutorService
*/
@Bean(name = "fullIOThreadPool")
public ExecutorService getFullIOExecutorService() {
return new ThreadPoolComponent().getFullIOExecutorService(5, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(80), new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 適合CPU計算密集型任務
* @author tuan
* @date 2021/01/20 17:29
* @return java.util.concurrent.ExecutorService
*/
@Bean(name = "fullCPUEThreadPool")
public ExecutorService getCPUExecutorService() {
return new ThreadPoolComponent().getCPUExecutorService(10, 180L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(160), new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 單執行緒池
* @author tuan
* @date 2021/01/20 17:29
* @return java.util.concurrent.ExecutorService
*/
@Bean(name = "SingleThreadPool")
public ExecutorService getSingleExecutorService() {
return new ThreadPoolComponent().getSingleExecutorService();
}
}
執行緒池呼叫測試
package com.tuan;
import com.tuan.config.TestConfiguration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLongArray;
/**
* 測試
* @author tuan
* @date 2021/01/20 17:18
**/
@SpringBootTest(classes = Application.class)
@RunWith(SpringRunner.class)
public class multithreadedTest {
@Autowired
private TestConfiguration SingleThreadPool;
@Autowired
private TestConfiguration fullCPUEThreadPool;
@Autowired
private TestConfiguration fullIOThreadPool;
@Test
public void SingleThreadPool(){
ExecutorService singleExecutorService = SingleThreadPool.getFullIOExecutorService();
//往執行緒池中迴圈提交執行緒
for (int i = 0; i < 10; i++) {
//開啟執行緒
singleExecutorService.execute(()->{
System.out.println("getId"+Thread.currentThread().getId());
});
}
}
@Test
public void ListSplitTest(){
AtomicLongArray array =new AtomicLongArray(1000000);
ExecutorService cpuExecutorService = fullCPUEThreadPool.getCPUExecutorService();
for (int j=0;j<100;j++){
cpuExecutorService.execute(()->{
for (int i=0; i<10000;i++){
array.incrementAndGet(i);
}
});
}
System.out.println(array);
}
}