Java執行緒池拒絕策略
現線上程池有一個任務佇列,用於快取所有待處理的任務,正在處理的任務將從任務佇列中移除。因此在任務佇列長度有限的情況下,再新增任務就會出現任務被拒絕加入到佇列處理的情況,需要有一種策略來處理應該加入任務佇列卻因為佇列已滿無法加入的情況。另外線上程池關閉的時候也需要對任務加入佇列操作進行額外的協調處理。
在JDK的RejectedExecutionHandler介面中,提供了四種方式來處理任務拒絕策略:
1、直接丟棄(DiscardPolicy)
程式碼
package com.courage; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /* 直接丟棄策略 */ public class DiscardPolicyDemo { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { //定義執行緒池核心數 int corePoolSize = 1; //定義執行緒池最大連線 int maximumPoolSize = 1; //阻塞佇列 BlockingQueue queue = new ArrayBlockingQueue<Runnable>(1); //建立執行緒池 ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue ) ; //設定執行緒池拒絕策略 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy ()); //不斷往執行緒池提交任務 for(int i=0;i<10;i++){ final int index = i; pool.submit(new Runnable(){ @Override public void run() { log(Thread.currentThread().getName()+"begin run task :"+index); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log(Thread.currentThread().getName()+" finish run task :"+index); } }); } log("main thread before sleep!!!"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } log("before shutdown()"); pool.shutdown(); log("after shutdown(),pool.isTerminated=" + pool.isTerminated()); try { pool.awaitTermination(1000L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } log("now,pool.isTerminated=" + pool.isTerminated()); } //重寫log方法 protected static void log(String string) { System.out.println(sdf.format(new Date())+" "+string); } }
執行結果
2020-12-24 16:23:45 main thread before sleep!!! 2020-12-24 16:23:45 pool-1-thread-2begin run task :2 2020-12-24 16:23:45 pool-1-thread-1begin run task :0 2020-12-24 16:23:45 pool-1-thread-3begin run task :3 2020-12-24 16:23:46 pool-1-thread-3 finish run task :3 2020-12-24 16:23:46 pool-1-thread-1 finish run task :0 2020-12-24 16:23:46 pool-1-thread-2 finish run task :2 2020-12-24 16:23:46 pool-1-thread-3begin run task :1 2020-12-24 16:23:47 pool-1-thread-3 finish run task :1 2020-12-24 16:23:49 before shutdown() 2020-12-24 16:23:49 after shutdown(),pool.isTerminated=false 2020-12-24 16:23:49 now,pool.isTerminated=true
只有task0、task1、task2、task3四個任務被執行了,其餘的六個任務被丟棄
為什麼只有四個任務被執行了呢?
過程是這樣的:由於我們的任務佇列的容量為3.當task0正在執行的時候,task1、task2、task3被提交到了佇列中但是還沒有執行,受佇列容量的限制,submit提交的task4~task9就都被直接拋棄了。因此就只有task0、task1、task2、task3被執行了。
2、丟棄佇列中最老的任務(DiscardOldestPolicy)
程式碼
package com.courage; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class DiscardOldestPolicyDemo { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { //定義執行緒池核心數 int corePoolSize = 1; //定義執行緒池最大連線 int maximumPoolSize = 1; //阻塞佇列 BlockingQueue queue = new ArrayBlockingQueue<Runnable>(1); //建立執行緒池 ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, queue ) ; //設定執行緒池拒絕策略 pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); //不斷往執行緒池提交任務 for(int i=0;i<10;i++){ final int index = i; pool.submit(new Runnable(){ @Override public void run() { log(Thread.currentThread().getName()+"begin run task :"+index); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log(Thread.currentThread().getName()+" finish run task :"+index); } }); } log("main thread before sleep!!!"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } log("before shutdown()"); pool.shutdown(); log("after shutdown(),pool.isTerminated=" + pool.isTerminated()); try { pool.awaitTermination(1000L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } log("now,pool.isTerminated=" + pool.isTerminated()); } //重寫log方法 protected static void log(String string) { System.out.println(sdf.format(new Date())+" "+string); } }
執行結果
2020-12-24 19:10:54 main thread before sleep!!!
2020-12-24 19:10:54 pool-1-thread-1begin run task :0
2020-12-24 19:10:55 pool-1-thread-1 finish run task :0
2020-12-24 19:10:55 pool-1-thread-1begin run task :9
2020-12-24 19:10:56 pool-1-thread-1 finish run task :9
2020-12-24 19:10:58 before shutdown()
2020-12-24 19:10:58 after shutdown(),pool.isTerminated=false
2020-12-24 19:10:58 now,pool.isTerminated=true
為什麼只有task0和task9執行了呢?
因為開始執行任務task0時,這時task1進入佇列中,task2進入佇列中會頂替掉task1,task3會頂替掉task2,直到task9頂替掉8,只有task9在任務佇列中,最後被執行。
3、拋異常(AbortPolicy)
程式碼同上
只修改策略為AbortPolicy
執行結果
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@433c675d[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@2e817b38[Wrapped task = com.courage.DiscardOldestPolicyDemo$1@c4437c4]] rejected from java.util.concurrent.ThreadPoolExecutor@3f91beef[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.courage.DiscardOldestPolicyDemo.main(DiscardOldestPolicyDemo.java:28)
2020-12-24 19:15:35 pool-1-thread-1begin run task :0
2020-12-24 19:15:36 pool-1-thread-1 finish run task :0
2020-12-24 19:15:36 pool-1-thread-1begin run task :1
2020-12-24 19:15:37 pool-1-thread-1 finish run task :1
當task0開始執行時,task1進入佇列,之後的任務丟擲異常。
4、分給呼叫執行緒來執行(CallerRunsPolicy)
程式碼同上
只修改策略為CallerRunsPolicy
執行結果
2020-12-24 19:19:33 pool-1-thread-1begin run task :0
2020-12-24 19:19:33 mainbegin run task :2
2020-12-24 19:19:34 main finish run task :2
2020-12-24 19:19:34 pool-1-thread-1 finish run task :0
2020-12-24 19:19:34 mainbegin run task :3
2020-12-24 19:19:34 pool-1-thread-1begin run task :1
2020-12-24 19:19:35 main finish run task :3
2020-12-24 19:19:35 pool-1-thread-1 finish run task :1
2020-12-24 19:19:35 pool-1-thread-1begin run task :4
2020-12-24 19:19:35 mainbegin run task :5
2020-12-24 19:19:36 main finish run task :5
2020-12-24 19:19:36 pool-1-thread-1 finish run task :4
2020-12-24 19:19:36 pool-1-thread-1begin run task :6
2020-12-24 19:19:36 mainbegin run task :8
2020-12-24 19:19:37 main finish run task :8
2020-12-24 19:19:37 pool-1-thread-1 finish run task :6
2020-12-24 19:19:37 pool-1-thread-1begin run task :7
2020-12-24 19:19:37 main thread before sleep!!!
2020-12-24 19:19:38 pool-1-thread-1 finish run task :7
2020-12-24 19:19:38 pool-1-thread-1begin run task :9
2020-12-24 19:19:39 pool-1-thread-1 finish run task :9
2020-12-24 19:19:41 before shutdown()
2020-12-24 19:19:41 after shutdown(),pool.isTerminated=false
2020-12-24 19:19:41 now,pool.isTerminated=true
從結果可以看出,沒有任務被拋棄,而是將由的任務分配到main執行緒中執行了。
小結
這四種策略是獨立無關的,是對任務拒絕處理的四中表現形式。最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入佇列的任務本身(DiscardPolicy)或者丟棄任務佇列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接丟擲一個異常(RejectedExecutionException),這是比較簡單的方式。丟擲異常的方式(AbortPolicy)儘管實現方式比較簡單,但是由於丟擲一個RuntimeException,因此會中斷呼叫者的處理過程。除了丟擲異常以外還可以不進入執行緒池執行,在這種方式(CallerRunsPolicy)中任務將有呼叫者執行緒去執行。