1. 程式人生 > >ScheduleThreadPoolExecutor執行緒池分析

ScheduleThreadPoolExecutor執行緒池分析

ScheduleThreadPoolExecutor是官方推薦的取代Timer作定時任務的執行緒池,在研究ScheduleThreadPoolExecutor過程中發現此執行緒池無論什麼時候都只會有核心執行緒數coreSize個執行緒在工作

這樣就有個問題,如果任務較為繁重的情況下,或者單個任務執行時間較長的情況下,是否需要調整coreSize的大小。 先從ScheduleThreadPoolExecutor的構造函數出發: 只說最複雜構造函數了,其引數為corePoolSize核心執行緒數,threadFactory執行緒工廠,handler拒絕處理策略。建構函式中呼叫了父類ThreadPoolExecutor的建構函式,傳入的引數中第二個引數為執行緒池最大執行緒數Integer.MAX_VALUE,第三個引數執行緒保持時間為0,第五個引數為任務佇列DelayedWorkQueue,這幾個引數為預設傳入引數。

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

從以上建構函式看,ScheduleThreadPoolExecutor的核心執行緒數可由建構函式傳入,最大執行緒數為Integer的最大值,任務佇列為DelayedWorkQueue。

研究DelayedWorkQueue類原始碼可知它就是一個小根堆,排序規則按的是任務執行時間,堆頂為最快執行的任務,內部資料結構是一個數組,由於Java是靜態語言,陣列滿時擴容每次擴容50%。通過siftUp和siftDown方法維持小根堆特徵,時間複雜度為O(ln), 通過grow()進行擴容。

      /**
     * Sifts element added at bottom up to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            RunnableScheduledFuture<?> e = queue[parent];
            if (key.compareTo(e) >= 0)
                break;
            queue[k] = e;
            setIndex(e, k);
            k = parent;
        }
        queue[k] = key;
        setIndex(key, k);
    }

    /**
     * Sifts element added at top down to its heap-ordered spot.
     * Call only when holding lock.
     */
    private void siftDown(int k, RunnableScheduledFuture<?> key) {
        int half = size >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            RunnableScheduledFuture<?> c = queue[child];
            int right = child + 1;
            if (right < size && c.compareTo(queue[right]) > 0)
                c = queue[child = right];
            if (key.compareTo(c) <= 0)
                break;
            queue[k] = c;
            setIndex(c, k);
            k = child;
        }
        queue[k] = key;
        setIndex(key, k);
    }
    
	/**
     * Resizes the heap array.  Call only when holding lock.
     */
    private void grow() {
        int oldCapacity = queue.length;
        int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
        if (newCapacity < 0) // overflow
            newCapacity = Integer.MAX_VALUE;
        queue = Arrays.copyOf(queue, newCapacity);
    }

從這裡可以看出這是個無界佇列,根據執行緒池執行緒增長流程,如果執行緒數小於核心執行緒數,會建立執行緒執行任務,如果執行緒數等於核心執行緒數,並且佇列未滿的情況下,放到佇列中,並不會建立新執行緒,這樣ScheduleThreadPoolExecutor將永遠只會有核心執行緒數個執行緒。

下面為測試程式碼:

public class TestScheduleThreadPoolExecutor {

public static final ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(2);
public static int index = 0;

public static void main(String[] args) {

   Thread scheduleThread = new Thread(new Runnable() {

        @Override
        public void run() {

            while(true) {
              
                stpe.scheduleAtFixedRate(new Runnable() {
                    
                    @Override
                    public void run() {
    
                        System.out.println(index++);
                        try {
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }  
                }, 1, 10, TimeUnit.SECONDS);
                
                try {
                    Thread.sleep(200);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }  
   });
   
   scheduleThread.start();
   
   Thread recordThread = new Thread(new Runnable() {

       @Override
       public void run() {

           while(true) {
             
               int number = stpe.getActiveCount();
               System.out.println("Thread number: "+ number + ", TaskCount: " + stpe.getTaskCount() +", LargestPoolSize: " + stpe.getLargestPoolSize());
               try {
                   Thread.sleep(200);
               }
               catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       }  
  });  
   recordThread.start();
}
}

測試結果:

Thread number: 2, TaskCount: 180, LargestPoolSize: 2
51
Thread number: 2, TaskCount: 182, LargestPoolSize: 2
Thread number: 2, TaskCount: 183, LargestPoolSize: 2
Thread number: 2, TaskCount: 184, LargestPoolSize: 2
Thread number: 2, TaskCount: 185, LargestPoolSize: 2
52
Thread number: 2, TaskCount: 187, LargestPoolSize: 2
53
Thread number: 2, TaskCount: 189, LargestPoolSize: 2
Thread number: 2, TaskCount: 190, LargestPoolSize: 2
Thread number: 2, TaskCount: 191, LargestPoolSize: 2
Thread number: 2, TaskCount: 192, LargestPoolSize: 2
54
Thread number: 2, TaskCount: 194, LargestPoolSize: 2
55
Thread number: 2, TaskCount: 196, LargestPoolSize: 2
Thread number: 2, TaskCount: 197, LargestPoolSize: 2

在任務個數不斷增長的情況下,執行緒數一直保持兩個。