1. 程式人生 > 其它 >java執行緒池優化

java執行緒池優化

ThreadPoolExecutor機制
一、概述
1、ThreadPoolExecutor作為java.util.concurrent包對外提供基礎實現,以內部執行緒池的形式對外提供管理任務執行,執行緒排程,執行緒池管理等等服務;
2、Executors方法提供的執行緒服務,都是通過引數設定來實現不同的執行緒池機制。
3、先來了解其執行緒池管理的機制,有助於正確使用,避免錯誤使用導致嚴重故障。同時可以根據自己的需求實現自己的執行緒池

二、核心構造方法講解
下面是ThreadPoolExecutor最核心的構造方法
Java程式碼
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

構造方法引數講解
引數名 作用
corePoolSize 核心執行緒池大小
maximumPoolSize 最大執行緒池大小
keepAliveTime 執行緒池中超過corePoolSize數目的空閒執行緒最大存活時間;可以allowCoreThreadTimeOut(true)使得核心執行緒有效時間
TimeUnit keepAliveTime時間單位
workQueue 阻塞任務佇列
threadFactory 新建執行緒工廠
RejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,任務會交給RejectedExecutionHandler來處理

一、ThreadPoolExecutor的重要引數

corePoolSize:核心執行緒數
核心執行緒會一直存活,及時沒有任務需要執行
當執行緒數小於核心執行緒數時,即使有執行緒空閒,執行緒池也會優先建立新執行緒處理
設定allowCoreThreadTimeout=true(預設false)時,核心執行緒會超時關閉
queueCapacity:任務佇列容量(阻塞佇列)
當核心執行緒數達到最大時,新任務會放在佇列中排隊等待執行
maxPoolSize:最大執行緒數
當執行緒數>=corePoolSize,且任務佇列已滿時。執行緒池會建立新執行緒來處理任務
當執行緒數=maxPoolSize,且任務佇列已滿時,執行緒池會拒絕處理任務而丟擲異常
keepAliveTime:執行緒空閒時間
當執行緒空閒時間達到keepAliveTime時,執行緒會退出,直到執行緒數量=corePoolSize
如果allowCoreThreadTimeout=true,則會直到執行緒數量=0
allowCoreThreadTimeout:允許核心執行緒超時
rejectedExecutionHandler:任務拒絕處理器
兩種情況會拒絕處理任務:
當執行緒數已經達到maxPoolSize,切佇列已滿,會拒絕新任務
當執行緒池被呼叫shutdown()後,會等待執行緒池裡的任務執行完畢,再shutdown。如果在呼叫shutdown()和執行緒池真正shutdown之間提交任務,會拒絕新任務
執行緒池會呼叫rejectedExecutionHandler來處理這個任務。如果沒有設定預設是AbortPolicy,會丟擲異常
ThreadPoolExecutor類有幾個內部實現類來處理這類情況:
AbortPolicy 丟棄任務,拋執行時異常
CallerRunsPolicy 執行任務
DiscardPolicy 忽視,什麼都不會發生
DiscardOldestPolicy 從佇列中踢出最先進入佇列(最後一個執行)的任務
實現RejectedExecutionHandler介面,可自定義處理器

二、ThreadPoolExecutor執行順序:
執行緒池按以下行為執行任務

當執行緒數小於核心執行緒數時,建立執行緒。
當執行緒數大於等於核心執行緒數,且任務佇列未滿時,將任務放入任務佇列。
當執行緒數大於等於核心執行緒數,且任務佇列已滿
若執行緒數小於最大執行緒數,建立執行緒
若執行緒數等於最大執行緒數,丟擲異常,拒絕任務

三、如何設定引數

預設值
corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何來設定
需要根據幾個值來決定
tasks :每秒的任務數,假設為500~1000
taskcost:每個任務花費時間,假設為0.1s
responsetime:系統允許容忍的最大響應時間,假設為1s
做幾個計算
corePoolSize = 每秒需要多少個執行緒處理?
threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個執行緒。corePoolSize設定應該大於50
根據8020原則,如果80%的每秒任務數小於800,那麼corePoolSize設定為80即可
queueCapacity = (coreSizePool/taskcost)*responsetime
計算可得 queueCapacity = 80/0.1*1 = 80。意思是佇列裡的執行緒可以等待1s,超過了的需要新開執行緒來執行
切記不能設定為Integer.MAX_VALUE,這樣佇列會很大,執行緒數只會保持在corePoolSize大小,當任務陡增時,不能新開執行緒來執行,響應時間會隨之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
計算可得 maxPoolSize = (1000-80)/10 = 92
(最大任務數-佇列容量)/每個執行緒每秒處理能力 = 最大執行緒數
rejectedExecutionHandler:根據具體情況來決定,任務不重要可丟棄,任務重要則要利用一些緩衝機制來處理
keepAliveTime和allowCoreThreadTimeout採用預設通常能滿足
以上都是理想值,實際情況下要根據機器效能來決定。如果在未達到最大執行緒數的情況機器cpu load已經滿了,則需要通過升級硬體(呵呵)和優化程式碼,降低taskcost來處理。
————————————————

重點講解:
其中比較容易讓人誤解的是:corePoolSize,maximumPoolSize,workQueue之間關係。

1.當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。
2.當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理
5.當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,關閉空閒執行緒
6.當設定allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉

執行緒管理機制圖示:


三、Executors提供的執行緒池配置方案

1、構造一個固定執行緒數目的執行緒池,配置的corePoolSize與maximumPoolSize大小相同,同時使用了一個無界LinkedBlockingQueue存放阻塞任務,因此多餘的任務將存在再阻塞佇列,不會由RejectedExecutionHandler處理
Java程式碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

2、構造一個緩衝功能的執行緒池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一個無容量的阻塞佇列 SynchronousQueue,因此任務提交之後,將會建立新的執行緒執行;執行緒空閒超過60s將會銷燬
Java程式碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

3、構造一個只支援一個執行緒的執行緒池,配置corePoolSize=maximumPoolSize=1,無界阻塞佇列LinkedBlockingQueue;保證任務由一個執行緒序列執行
Java程式碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

4、構造有定時功能的執行緒池,配置corePoolSize,無界延遲阻塞佇列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由於DelayedWorkQueue是無界佇列,所以這個值是沒有意義的
Java程式碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

————————————————

總結:
1、用ThreadPoolExecutor自定義執行緒池,看執行緒是的用途,如果任務量不大,可以用無界佇列,如果任務量非常大,要用有界佇列,防止OOM
2、如果任務量很大,還要求每個任務都處理成功,要對提交的任務進行阻塞提交,重寫拒絕機制,改為阻塞提交。保證不拋棄一個任務
3、最大執行緒數一般設為2N+1最好,N是CPU核數
4、核心執行緒數,看應用,如果是任務,一天跑一次,設定為0,合適,因為跑完就停掉了,如果是常用執行緒池,看任務量,是保留一個核心還是幾個核心執行緒數
5、如果要獲取任務執行結果,用CompletionService,但是注意,獲取任務的結果的要重新開一個執行緒獲取,如果在主執行緒獲取,就要等任務都提交後才獲取,就會阻塞大量任務結果,佇列過大OOM,所以最好非同步開個執行緒獲取結果