自定義執行緒池ThreadPoolExecutor
阿新 • • 發佈:2020-09-09
使用自定義的方式建立執行緒池
Java本身提供的獲取執行緒池的方式
使用Executors直接獲取執行緒池,注意,前四個方式的底層都是通過new ThreadPoolExecutor()的方式建立的執行緒池,只是引數不一樣而已,我們也正是利用了這點特性來實現自己的執行緒池
1. newCachedThreadPool
建立一個可快取無限制數量的執行緒池, 如果執行緒池中沒有空閒的執行緒的話,再來任務會新建執行緒, 執行緒60s內沒被使用,則銷燬。 簡單的說,忙不過來的時候就新建執行緒 Executors.newCachedThreadPool() 底層實現 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 常駐核心執行緒數為0 執行緒池最大執行緒數Integer.MAX_VALUE 執行緒空閒60S回收 使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。 **風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險
2. newFixedThreadPool
建立一直指定大小的執行緒池,如果執行緒池滿了,後面的任務會在佇列中等待,等拿到空閒的執行緒才能執行 Executors.newFixedThreadPool(nThreads) 底層實現 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 常駐核心執行緒數自定義 執行緒池最大執行緒數等於常駐核心執行緒數 因為最大執行緒數等於常駐核心執行緒,而常駐核心執行緒不會被回收,所以時間引數為0 使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。 **風險:**由於佇列沒有指明長度,預設為Integer.MAX_VALUE,所以有OOM的風險
3. newSingleThreadExecutor
建立一個大小為1的執行緒池,用唯一的執行緒來執行任務,保證任務有序進行 Executors.newSingleThreadExecutor() 底層實現 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 常駐核心執行緒數為1 執行緒池最大執行緒數等於常駐核心執行緒數1 執行緒池裡一共就一個常駐核心執行緒,所以不會被回收,所以時間引數為0 使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。 **風險:**由於佇列沒有指明長度,預設為Integer.MAX_VALUE,所以有OOM的風險
4. newScheduledThreadPool
建立指定大小的執行緒池,支援定時及週期性的執行任務
Executors.newScheduledThreadPool(corePoolSize)
底層實現
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,一下為ScheduledThreadPoolExecutor的構造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
常駐核心執行緒數自定義
執行緒池最大執行緒數等於Integer.MAX_VALUE
不回收執行緒
使用工作佇列來存放任務,這個佇列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險
5. newWorkStealingPool
JDK1.8 引入
建立持有足夠執行緒的執行緒池來支援給定的並行級別,
並通過使用多個佇列減少競爭,並行級別的引數,如果不傳,預設為cpu的數量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool
Executors.newWorkStealingPool()
底層實現為**ForkJoinPool**,和上面的四個不同
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
正在研究ing
在阿里巴巴Java開發手冊中明確指出,不允許使用jdk自帶的方式獲取執行緒池。就是上面的前四個方法,所以,我們自己建立即可
建立自定義的執行緒工廠
public class ThreadFactoryImpl implements ThreadFactory {
/**
* 執行緒池號
*/
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/**
* 執行緒字首名稱
*/
private final String namePrefix;
/**
* 建立初始值為1且執行緒安全的執行緒號
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
public ThreadFactoryImpl(String whatFeatureOfGroup) {
namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
}
@Override
public Thread newThread(Runnable r) {
int threadNextId = threadNumber.getAndIncrement();
String name = namePrefix + threadNextId;
Thread thread = new Thread(null, r, name, 0);
System.out.println("建立的第"+threadNextId+"個執行緒");
return thread;
}
}
AtomicInteger 實現了原子性,保證了高併發下的執行緒安全,該系類還有很多。
我們可以在自定義的執行緒工廠裡面新增我們需要的內容
不指定的話,會是預設的執行緒工廠
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
建立自定義執行緒池拒絕策略
public class ThreadPoolRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task rejected. "+executor.toString());
}
}
在ThreadPoolExecutor中提供了四個公開的內部靜態類
1. AbortPolicy(預設):丟棄任務並丟擲RejectedExecutionException異常
2. DiscardPloicy:丟棄任務,不丟擲異常(不推薦使用)
3. DiscardOldestPolicy:丟棄佇列中等待最久的任務,把當前任務加入到佇列中
4. CallerRunsPolicy:繞過執行緒池,直接呼叫任務的run()方法。
**根據需求選中合適的策略才是正確的**
實現我們的執行緒池
public class ThreadPoolUtil {
/**
* @param corePoolSize 常駐核心執行緒,執行緒池初始化的時候池裡是沒有執行緒的,前面 corePoolSize 個任務是會建立執行緒,
* 當前執行緒池中的數量大於常駐核心執行緒數的時候,如果有空閒的執行緒則使用,沒有的話就把任務放到
* 工作佇列中
* @param maximumPoolSize 執行緒池允許建立的最大執行緒數,如果佇列滿了,且執行緒數小於最大執行緒數,則新建臨時執行緒(空閒超過時間會被銷燬的),
* 如果佇列為無界佇列,則該引數無用
* @param workQueueSize 工作佇列,請求執行緒數大於常駐核心執行緒數的時候,將多餘的任務放到工作佇列
* @param threadName 執行緒名稱
* @param handler 執行緒池拒絕策略,當執行緒池和佇列都滿了,則呼叫該策略,執行具體的邏輯
* @author: taoym
* @date: 2020/9/9 11:35
* @desc: 自定義執行緒池的實現 總體邏輯就是 前corePoolSize個任務時,來一個任務就建立一個執行緒
* 如果當前執行緒池的執行緒數大於了corePoolSize那麼接下來再來的任務就會放入到我們上面設定的workQueue佇列中
* 如果此時workQueue也滿了,那麼再來任務時,就會新建臨時執行緒,那麼此時如果我們設定了keepAliveTime或者設定了allowCoreThreadTimeOut,那麼系統就會進行執行緒的活性檢查,一旦超時便銷燬執行緒
* 如果此時執行緒池中的當前執行緒大於了maximumPoolSize最大執行緒數,那麼就會執行我們剛才設定的handler拒絕策略
*/
public static ExecutorService createThreadPool(int corePoolSize,
int maximumPoolSize,
int workQueueSize,
String threadName,
RejectedExecutionHandler handler) {
BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
// 提前建立好核心執行緒
//threadPoolExecutor.prestartAllCoreThreads();
// 常駐核心執行緒的空閒時間超過 keepAliveTime 的時候要被回收
//threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
註釋寫的很明白了,desc下的註釋來自springForAll的文章。別的都是自己找的加上自己所理解的編寫而成。
文章是剛研究了《碼出高效》的執行緒池篇加上對原始碼文件的理解,趁熱打鐵寫出來的,寫的不好,多多見諒。