ScheduleThreadPoolExecutor執行緒池分析
ScheduleThreadPoolExecutor是官方推薦的取代Timer作定時任務的執行緒池,在研究ScheduleThreadPoolExecutor過程中發現此執行緒池無論什麼時候都只會有核心執行緒數coreSize個執行緒在工作。
這樣就有個問題,如果任務較為繁重的情況下,或者單個任務執行時間較長的情況下,是否需要調整coreSize的大小。 先從ScheduleThreadPoolExecutor的構造函數出發: 只說最複雜構造函數了,其引數為corePoolSize核心執行緒數,threadFactory執行緒工廠,handler拒絕處理策略。建構函式中呼叫了父類ThreadPoolExecutor的建構函式,傳入的引數中第二個引數為執行緒池最大執行緒數Integer.MAX_VALUE,第三個引數執行緒保持時間為0,第五個引數為任務佇列DelayedWorkQueue,這幾個引數為預設傳入引數。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
從以上建構函式看,ScheduleThreadPoolExecutor的核心執行緒數可由建構函式傳入,最大執行緒數為Integer的最大值,任務佇列為DelayedWorkQueue。
研究DelayedWorkQueue類原始碼可知它就是一個小根堆,排序規則按的是任務執行時間,堆頂為最快執行的任務,內部資料結構是一個數組,由於Java是靜態語言,陣列滿時擴容每次擴容50%。通過siftUp和siftDown方法維持小根堆特徵,時間複雜度為O(ln), 通過grow()進行擴容。
/** * Sifts element added at bottom up to its heap-ordered spot. * Call only when holding lock. */ private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); } /** * Sifts element added at top down to its heap-ordered spot. * Call only when holding lock. */ private void siftDown(int k, RunnableScheduledFuture<?> key) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; RunnableScheduledFuture<?> c = queue[child]; int right = child + 1; if (right < size && c.compareTo(queue[right]) > 0) c = queue[child = right]; if (key.compareTo(c) <= 0) break; queue[k] = c; setIndex(c, k); k = child; } queue[k] = key; setIndex(key, k); } /** * Resizes the heap array. Call only when holding lock. */ private void grow() { int oldCapacity = queue.length; int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50% if (newCapacity < 0) // overflow newCapacity = Integer.MAX_VALUE; queue = Arrays.copyOf(queue, newCapacity); }
從這裡可以看出這是個無界佇列,根據執行緒池執行緒增長流程,如果執行緒數小於核心執行緒數,會建立執行緒執行任務,如果執行緒數等於核心執行緒數,並且佇列未滿的情況下,放到佇列中,並不會建立新執行緒,這樣ScheduleThreadPoolExecutor將永遠只會有核心執行緒數個執行緒。
下面為測試程式碼:
public class TestScheduleThreadPoolExecutor {
public static final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(2);
public static int index = 0;
public static void main(String[] args) {
Thread scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
stpe.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(index++);
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 1, 10, TimeUnit.SECONDS);
try {
Thread.sleep(200);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
scheduleThread.start();
Thread recordThread = new Thread(new Runnable() {
@Override
public void run() {
while(true) {
int number = stpe.getActiveCount();
System.out.println("Thread number: "+ number + ", TaskCount: " + stpe.getTaskCount() +", LargestPoolSize: " + stpe.getLargestPoolSize());
try {
Thread.sleep(200);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
recordThread.start();
}
}
測試結果:
Thread number: 2, TaskCount: 180, LargestPoolSize: 2
51
Thread number: 2, TaskCount: 182, LargestPoolSize: 2
Thread number: 2, TaskCount: 183, LargestPoolSize: 2
Thread number: 2, TaskCount: 184, LargestPoolSize: 2
Thread number: 2, TaskCount: 185, LargestPoolSize: 2
52
Thread number: 2, TaskCount: 187, LargestPoolSize: 2
53
Thread number: 2, TaskCount: 189, LargestPoolSize: 2
Thread number: 2, TaskCount: 190, LargestPoolSize: 2
Thread number: 2, TaskCount: 191, LargestPoolSize: 2
Thread number: 2, TaskCount: 192, LargestPoolSize: 2
54
Thread number: 2, TaskCount: 194, LargestPoolSize: 2
55
Thread number: 2, TaskCount: 196, LargestPoolSize: 2
Thread number: 2, TaskCount: 197, LargestPoolSize: 2
在任務個數不斷增長的情況下,執行緒數一直保持兩個。