1. 程式人生 > >ThreadPoolExecutor幾點應用建議

ThreadPoolExecutor幾點應用建議

正文

先看一副圖,描述了ThreadPoolExecutor的工作機制: 

整個ThreadPoolExecutor的任務處理有4步操作:

  • 第一步,初始的poolSize < corePoolSize,提交的runnable任務,會直接做為new一個Thread的引數,立馬執行
  • 第二步,當提交的任務數超過了corePoolSize,就進入了第二步操作。會將當前的runable提交到一個block queue中
  • 第三步,如果block queue是個有界佇列,當佇列滿了之後就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執行對應的runnable任務
  • 第四步,如果第三步救急方案也無法處理了,就會走到第四步執行reject操作。
幾點說明:(相信這些網上一搜一大把,我這裡簡單介紹下,為後面做一下鋪墊)
  • block queue有以下幾種實現:
    1. ArrayBlockingQueue :  有界的陣列佇列
    2. LinkedBlockingQueue : 可支援有界/無界的佇列,使用連結串列實現
    3. PriorityBlockingQueue : 優先佇列,可以針對任務排序
    4. SynchronousQueue : 佇列長度為1的佇列,和Array有點區別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連線上來poll task。
  • RejectExecutionHandler是針對任務無法處理時的一些自保護處理:
    1. Reject 直接丟擲Reject exception
    2. Discard 直接忽略該runnable,不可取
    3. DiscardOldest 丟棄最早入佇列的的任務
    4. CallsRun 直接讓原先的client thread做為worker執行緒,進行執行
容易被人忽略的點: 1.  pool threads啟動後,以後的任務獲取都會通過block queue中,獲取堆積的runnable task. 所以建議: block size >= corePoolSize ,不然執行緒池就沒任何意義
2.  corePoolSize 和 maximumPoolSize的區別, 和大家正常理解的資料庫連線池不太一樣。   *  據dbcp pool為例,會有minIdle , maxActive配置。minIdle代表是常駐記憶體中的threads數量,maxActive代表是工作的最大執行緒數。   *  這裡的corePoolSize就是連線池的maxActive的概念,它沒有minIdle的概念(每個執行緒可以設定keepAliveTime,超過多少時間多有任務後銷燬執行緒,預設只會針對maximumPoolSize引數的執行緒生效,可以設定allowCoreThreadTimeOut=true,就可以對corePoolSize進行idle回收)。    * 這裡的maximumPoolSize,是一種救急措施的第一層。當threadPoolExecutor的工作threads存在滿負荷,並且block queue佇列也滿了,這時代表接近崩潰邊緣。這時允許臨時起一批threads,用來處理runnable,處理完後通過keepAliveTime進行排程回收。 所以建議:  maximumPoolSize >= corePoolSize =期望的最大執行緒數。 (我曾經配置了corePoolSize=1, maximumPoolSize=20, blockqueue為無界佇列,最後就成了單執行緒工作的pool。典型的配置錯誤) 3. 善用blockqueue和reject組合. 這裡要重點推薦下CallsRun的Rejected Handler,從字面意思就是讓呼叫者自己來執行。 我們經常會在線上使用一些執行緒池做非同步處理,比如我前面做的(業務層)非同步並行載入技術分析和設計, 將原本序列的請求都變為了並行操作,但過多的並行會增加系統的負載(比如軟中斷,上下文切換)。所以肯定需要對執行緒池做一個size限制。但是為了引入非同步操作後,避免因在block queue的等待時間過長,所以需要在佇列滿的時,執行一個callsRun的策略,並行的操作又轉為一個序列處理,這樣就可以保證儘量少的延遲影響。 所以建議:  RejectExecutionHandler = CallsRun ,  blockqueue size = 2 * poolSize (為啥是2倍poolSize,主要一個考慮就是瞬間高峰處理,允許一個thread等待一個runnable任務)

Btrace容量規劃

再提供一個btrace指令碼,分析線上的thread pool容量規劃是否合理,可以執行時輸出poolSize等一些資料。

import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;

import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;

import com.sun.btrace.BTraceUtils;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.aggregation.AggregationKey;
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnEvent;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.OnTimer;
import com.sun.btrace.annotations.Self;

/**
 * 並行載入監控
 * 
 * @author jianghang 2011-4-7 下午10:59:53
 */
@BTrace
public class AsyncLoadTracer {

    private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
    private static Aggregation   histogram    = newAggregation(AggregationFunction.QUANTIZE);
    private static Aggregation   average      = newAggregation(AggregationFunction.AVERAGE);
    private static Aggregation   max          = newAggregation(AggregationFunction.MAXIMUM);
    private static Aggregation   min          = newAggregation(AggregationFunction.MINIMUM);
    private static Aggregation   sum          = newAggregation(AggregationFunction.SUM);
    private static Aggregation   count        = newAggregation(AggregationFunction.COUNT);

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
    public static void executeMonitor(@Self Object self) {
        Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
        Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
        Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");

        Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
        int poolSize = (Integer) get(poolSizeField, self);
        int largestPoolSize = (Integer) get(largestPoolSizeField, self);
        int queueSize = (Integer) get(countField, get(workQueueField, self));

        println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
                                     str(largestPoolSize)), " queueSize : "), str(queueSize)));
    }

    @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
    public static void rejectMonitor(@Self Object self) {
        String name = str(self);
        if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
            BTraceUtils.incrementAndGet(rejecctCount);
        }
    }

    @OnTimer(1000)
    public static void rejectPrintln() {
        int reject = BTraceUtils.getAndSet(rejecctCount, 0);
        println(strcat("reject count in 1000 msec: ", str(reject)));
        AggregationKey key = newAggregationKey("rejectCount");
        addToAggregation(histogram, key, reject);
        addToAggregation(average, key, reject);
        addToAggregation(max, key, reject);
        addToAggregation(min, key, reject);
        addToAggregation(sum, key, reject);
        addToAggregation(count, key, reject);
    }

    @OnEvent
    public static void onEvent() {
        BTraceUtils.truncateAggregation(histogram, 10);
        println("---------------------------------------------");
        printAggregation("Count", count);
        printAggregation("Min", min);
        printAggregation("Max", max);
        printAggregation("Average", average);
        printAggregation("Sum", sum);
        printAggregation("Histogram", histogram);
        println("---------------------------------------------");
    }
}

執行結果:

poolSize : 1 , largestPoolSize = 10 , queueSize = 10
reject count in 1000 msec: 0

說明:

1. poolSize 代表為當前的執行緒數

2. largestPoolSize 代表為歷史最大的執行緒數

3. queueSize 代表blockqueue的當前堆積的size

4. reject count 代表在1000ms內的被reject的數量

ExecutorService 建立多執行緒的步驟:

1。定義執行緒類 class Handler implements Runnable{
}
2。建立ExecutorService執行緒池 ExecutorService executorService = Executors.newCachedThreadPool();

或者

int cpuNums = Runtime.getRuntime().availableProcessors();
                //獲取當前系統的CPU 數目
ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);
                //ExecutorService通常根據系統資源情況靈活定義執行緒池大小
3。呼叫執行緒池操作 迴圈操作,成為daemon,把新例項放入Executor池中
      while(true){
        executorService.execute(new Handler(socket)); 
           // class Handler implements Runnable{
        或者
        executorService.execute(createTask(i));
            //private static Runnable createTask(final int taskID)
      }

execute(Runnable物件)方法
其實就是對Runnable物件呼叫start()方法
(當然還有一些其他後臺動作,比如佇列,優先順序,IDLE timeout,active啟用等)


幾種不同的ExecutorService執行緒池物件
1.newCachedThreadPool()  -快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就reuse.如果沒有,就建一個新的執行緒加入池中
-快取型池子通常用於執行一些生存期很短的非同步型任務
 因此在一些面向連線的daemon型SERVER中用得不多。
-能reuse的執行緒,必須是timeout IDLE內的池中執行緒,預設timeout是60s,超過這個IDLE時長,執行緒例項將被終止及移出池。
  注意,放入CachedThreadPool的執行緒不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。
2. newFixedThreadPool -newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒
-其獨特之處:任意時間點,最多隻能有固定數目的活動執行緒存在,此時如果有新的執行緒要建立,只能放在另外的佇列中等待直到當前的執行緒中某個執行緒終止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文件沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),所以FixedThreadPool多數針對一些很穩定很固定的正規併發執行緒,多用於伺服器
-從方法的原始碼看,cache池和fixed 池呼叫的是同一個底層池,只不過引數不同:
fixed池執行緒數固定,並且是0秒IDLE(無IDLE)
cache池執行緒數支援0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE  
3.ScheduledThreadPool -排程型執行緒池
-這個池子裡的執行緒可以按schedule依次delay執行,或週期執行
4.SingleThreadExecutor -單例執行緒,任意時間池中只能有一個執行緒
-用的是和cache池和fixed池相同的底層池,但執行緒數目是1-1,0秒IDLE(無IDLE)


上面四種執行緒池,都使用Executor的預設執行緒工廠建立執行緒,也可單獨定義自己的執行緒工廠
下面是預設執行緒工廠程式碼:
    static class DefaultThreadFactory implements ThreadFactory {
        static final AtomicInteger poolNumber = new AtomicInteger(1);
        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null)? s.getThreadGroup() :Thread.currentThread().getThreadGroup();
          
            namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
也可自己定義ThreadFactory,加入建立池的引數中
 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {


Executor的execute()方法
execute() 方法將Runnable例項加入pool中,並進行一些pool size計算和優先順序處理
execute() 方法本身在Executor介面中定義,有多個實現類都定義了不同的execute()方法
如ThreadPoolExecutor類(cache,fiexed,single三種池子都是呼叫它)的execute方法如下:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }