ThreadPoolExecutor幾點應用建議
阿新 • • 發佈:2019-02-15
正文
先看一副圖,描述了ThreadPoolExecutor的工作機制:
整個ThreadPoolExecutor的任務處理有4步操作:
- 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的引數,立馬執行
-
第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
- 第三步,如果block queue是個有界佇列,當佇列滿了之後就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
- 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
-
block queue有以下幾種實現:
1. ArrayBlockingQueue : 有界的陣列佇列
2. LinkedBlockingQueue : 可支援有界/無界的佇列,使用連結串列實現
3. PriorityBlockingQueue : 優先佇列,可以針對任務排序
4. SynchronousQueue : 佇列長度為1的佇列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連線上來poll task。 -
RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
1. Reject 直接丟擲Reject exception
2. Discard 直接忽略該runnable,不可取
3. DiscardOldest 丟棄最早入佇列的的任務
4. CallsRun 直接讓原先的client thread做為worker執行緒,進行執行
Btrace容量規劃
再提供一個btrace指令碼,分析線上的thread pool容量規劃是否合理,可以執行時輸出poolSize等一些資料。
import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import com.sun.btrace.BTraceUtils;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.aggregation.AggregationKey;
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnEvent;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.OnTimer;
import com.sun.btrace.annotations.Self;
/**
* 並行載入監控
*
* @author jianghang 2011-4-7 下午10:59:53
*/
@BTrace
public class AsyncLoadTracer {
private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE);
private static Aggregation average = newAggregation(AggregationFunction.AVERAGE);
private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM);
private static Aggregation min = newAggregation(AggregationFunction.MINIMUM);
private static Aggregation sum = newAggregation(AggregationFunction.SUM);
private static Aggregation count = newAggregation(AggregationFunction.COUNT);
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
public static void executeMonitor(@Self Object self) {
Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");
Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
int poolSize = (Integer) get(poolSizeField, self);
int largestPoolSize = (Integer) get(largestPoolSizeField, self);
int queueSize = (Integer) get(countField, get(workQueueField, self));
println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
str(largestPoolSize)), " queueSize : "), str(queueSize)));
}
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
public static void rejectMonitor(@Self Object self) {
String name = str(self);
if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
BTraceUtils.incrementAndGet(rejecctCount);
}
}
@OnTimer(1000)
public static void rejectPrintln() {
int reject = BTraceUtils.getAndSet(rejecctCount, 0);
println(strcat("reject count in 1000 msec: ", str(reject)));
AggregationKey key = newAggregationKey("rejectCount");
addToAggregation(histogram, key, reject);
addToAggregation(average, key, reject);
addToAggregation(max, key, reject);
addToAggregation(min, key, reject);
addToAggregation(sum, key, reject);
addToAggregation(count, key, reject);
}
@OnEvent
public static void onEvent() {
BTraceUtils.truncateAggregation(histogram, 10);
println("---------------------------------------------");
printAggregation("Count", count);
printAggregation("Min", min);
printAggregation("Max", max);
printAggregation("Average", average);
printAggregation("Sum", sum);
printAggregation("Histogram", histogram);
println("---------------------------------------------");
}
}
執行結果:
poolSize : 1 , largestPoolSize = 10 , queueSize = 10
reject count in 1000 msec: 0
說明:
1. poolSize 代表為當前的執行緒數
2. largestPoolSize 代表為歷史最大的執行緒數
3. queueSize 代表blockqueue的當前堆積的size
4. reject count 代表在1000ms內的被reject的數量
ExecutorService 建立多執行緒的步驟:
1。定義執行緒類 |
class Handler implements Runnable{ } |
2。建立ExecutorService執行緒池 |
ExecutorService executorService = Executors.newCachedThreadPool(); 或者 int cpuNums = Runtime.getRuntime().availableProcessors(); //獲取當前系統的CPU 數目 ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE); //ExecutorService通常根據系統資源情況靈活定義執行緒池大小 |
3。呼叫執行緒池操作 |
迴圈操作,成為daemon,把新例項放入Executor池中 while(true){ executorService.execute(new Handler(socket)); // class Handler implements Runnable{ 或者 executorService.execute(createTask(i)); //private static Runnable createTask(final int taskID) } execute(Runnable物件)方法 其實就是對Runnable物件呼叫start()方法 (當然還有一些其他後臺動作,比如佇列,優先順序,IDLE timeout,active啟用等) |
幾種不同的ExecutorService執行緒池物件
1.newCachedThreadPool() |
-快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就reuse.如果沒有,就建一個新的執行緒加入池中 -快取型池子通常用於執行一些生存期很短的非同步型任務 因此在一些面向連線的daemon型SERVER中用得不多。 -能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設timeout是60s,超過這個IDLE時長,執行緒例項將被終止及移出池。 注意,放入CachedThreadPool的執行緒不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。 |
2. newFixedThreadPool |
-newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒 -其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待,直到當前的執行緒中某個執行緒終止直接被移出池子 -和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器 -從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層池,只不過引數不同: fixed池執行緒數固定,並且是0秒IDLE(無IDLE) cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE |
3.ScheduledThreadPool |
-排程型執行緒池 -這個池子裡的執行緒可以按schedule依次delay執行,或週期執行 |
4.SingleThreadExecutor |
-單例執行緒,任意時間池中只能有一個執行緒 -用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE) |
上面四種執行緒池,都使用Executor的預設執行緒工廠建立執行緒,也可單獨定義自己的執行緒工廠
下面是預設執行緒工廠程式碼:
static class DefaultThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() :Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { |
Executor的execute()方法
execute() 方法將Runnable例項加入pool中,並進行一些pool size計算和優先順序處理
execute() 方法本身在Executor介面中定義,有多個實現類都定義了不同的execute()方法
如ThreadPoolExecutor類(cache,fiexed,single三種池子都是呼叫它)的execute方法如下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } |