1. 程式人生 > >執行緒池在execute新增執行緒的策略

執行緒池在execute新增執行緒的策略

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
/*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated
* and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 1. if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // 2. int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 3. reject(command); }

1.如果執行的執行緒少於corePoolSize的執行緒,那麼嘗試使用addWorker()方法,加入新的執行緒,在addWorker裡新增失敗會返回false,會自動檢查執行緒執行狀態runState和 工作執行緒數量workerCount,通過返回false來防止錯誤警報,從而在不應該新增 執行緒的時候新增執行緒。

2.否則新增這個任務到等待佇列workQueue,如果一個任務可以成功地進入等待佇列,仍然需要 檢查是否應該新增一個執行緒 (因為自上次檢查以來已有的執行緒已經死亡)或者自進入該方法後池關閉。重新檢查狀態,如果停止,則回滾佇列,如果沒有,則啟動一個新的執行緒。

3.如果不能對任務進行排隊(即等待佇列workQueue塞滿),則嘗試新增一個新的執行緒。如果失敗了(已經執行中的執行緒數量大於在例項化時指定的 maximumPoolSize 會返回false,執行緒池被關閉或者符合停止的條件也會返回false),我們就知道執行緒池被關閉或飽和了,所以拒絕這個任務執行reject()方法,reject方法會執行在初始化執行緒池 傳入的處理策略,預設採用AbortPolicy,丟擲RejectedExecutionException策略拒絕執行。

阿里java規範手冊裡規定,執行緒資源必須通過執行緒池提供。不允許在應用中自行顯式建立執行緒不允許直接使用執行緒池內建的new方法來建立執行緒池。


Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService 介面

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);//請求佇列長度為 Integer.MAX_VALUE
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,    //允許的建立執行緒數量為 Integer.MAX_VALUE 
60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>()); 
}