1. 程式人生 > >ThreadPoolExecutor裡面4種拒絕策略(詳細)

ThreadPoolExecutor裡面4種拒絕策略(詳細)

ThreadPoolExecutor類實現了ExecutorService介面和Executor介面,可以設定執行緒池corePoolSize,最大執行緒池大小,AliveTime,拒絕策略等。常用構造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit,

BlockingQueue<Runnable> workQueue,

RejectedExecutionHandler handler)

corePoolSize: 執行緒池維護執行緒的最少數量

maximumPoolSize:執行緒池維護執行緒的最大數量

keepAliveTime: 執行緒池維護執行緒所允許的空閒時間

unit: 執行緒池維護執行緒所允許的空閒時間的單位

workQueue: 執行緒池所使用的緩衝佇列

handler: 執行緒池對拒絕任務的處理策略

當一個任務通過execute(Runnable)方法欲新增到執行緒池時:

l  如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。

2  如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。

3  如果此時執行緒池中的數量大於corePoolSize,緩衝佇列

workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。

4  如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。也就是:處理任務的優先順序為:核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。  

當執行緒池中的執行緒數量大於 corePoolSize時,如果某執行緒空閒時間超過keepAliveTime

,執行緒將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。

handler有四個選擇:

ThreadPoolExecutor.AbortPolicy()

丟擲java.util.concurrent.RejectedExecutionException異常 ,示例如下:

privatestaticclass Worker implements Runnable {

publicvoid run() {

System.out.println(Thread.currentThread().getName() + " is running");

}

}

publicstaticvoid main(String[] args) {

int corePoolSize = 5;

int maxPoolSize = 10;

long keepAliveTime = 5;

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10);

//拒絕策略1:將丟擲 RejectedExecutionException.

RejectedExecutionHandler handler = 

new ThreadPoolExecutor.AbortPolicy();

ThreadPoolExecutor executor = new ThreadPoolExecutor

(corePoolSize, maxPoolSize, 

keepAliveTime, TimeUnit.SECONDS

queue, handler);

for(int i=0; i<100; i++) {

executor.execute(new Worker());

}

executor.shutdown();

}

執行結果如下:

pool-1-thread-2 is running

pool-1-thread-3 is running

Exception in thread "main" java.util.concurrent.RejectedExecutionException

pool-1-thread-1 is running

pool-1-thread-7 is running

pool-1-thread-6 is running

pool-1-thread-4 is running

pool-1-thread-9 is running

pool-1-thread-8 is running

pool-1-thread-5 is running

at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1760)

at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)

at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)

at concurrent.ThreadPoolDemo.main(ThreadPoolDemo.java:33)

pool-1-thread-10 is running

處理原始碼如下:

publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor e) {

thrownewRejectedExecutionException();

        }

策略2ThreadPoolExecutor.CallerRunsPolicy
用於被拒絕任務的處理程式,它直接在 execute 方法的呼叫執行緒中執行被拒絕的任務;如果執行程式已關閉,則會丟棄該任務。如下:

RejectedExecutionHandler handler = 

new ThreadPoolExecutor.CallerRunsPolicy();

執行如下:

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-2 is running

pool-1-thread-3 is running

pool-1-thread-1 is running

pool-1-thread-8 is running

main is running

main is running

main is running

pool-1-thread-4 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

pool-1-thread-7 is running

處理原始碼如下:

publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

                r.run();

            }

        }

策略3

RejectedExecutionHandler handler = 

new ThreadPoolExecutor.DiscardOldestPolicy();

這樣執行結果就不會有100個執行緒全部被執行。處理原始碼如下:

publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor e) {

if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

執行結果也不會全部執行100個執行緒。

原始碼如下,實際就是對執行緒不執行操作:

publicstaticclass DiscardPolicy implements RejectedExecutionHandler {

/**

*Createsa<tt>DiscardPolicy</tt>.

*/

public DiscardPolicy() { }

/**

*Doesnothing,whichhastheeffectofdiscardingtaskr.

*@paramrtherunnabletaskrequestedtobeexecuted

*@parametheexecutorattemptingtoexecutethistask

*/

publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }

    }

這四種策略是獨立無關的,是對任務拒絕處理的四中表現形式。最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入佇列的任務本身(DiscardPolicy)或者丟棄任務佇列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接丟擲一個異常(RejectedExecutionException),這是比較簡單的方式。丟擲異常的方式(AbortPolicy)儘管實現方式比較簡單,但是由於丟擲一個RuntimeException,因此會中斷呼叫者的處理過程。除了丟擲異常以外還可以不進入執行緒池執行,在這種方式(CallerRunsPolicy)中任務將有呼叫者執行緒去執行。 

使用者自定義拒絕策略

實現RejectedExecutionHandler,並自己定義策略模式


再次需要注意的是,ThreadPoolExecutor.submit() 函式,此方法內部呼叫的execute方法,並把execute執行完後的結果給返回,但如果任務並沒有執行的話(被拒絕了),則submit返回的future.get()會一直等到。

future 內部其實還是一個runnable,並把command給封裝了下,當command執行完後,future會返回一個值。