Java Executor併發框架(二)RejectedExecutionHandler介紹
一、介紹
當Executor已經關閉(即執行了executorService.shutdown()方法後),並且Executor將有限邊界用於最大執行緒數量和工作佇列容量,且已經飽和時,在方法execute()中提交的新任務將被拒絕。
在以上述情況下,execute 方法將呼叫其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法。
首先我們來看下RejectedExecutionHandler 的定義:
下面提供了四種預定義的處理程式策略:
(1) 預設的ThreadPoolExecutor.AbortPolicy 處理程式遭到拒絕將丟擲執行時RejectedExecutionException;
(2) ThreadPoolExecutor.CallerRunsPolicy 執行緒呼叫執行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度
(3) ThreadPoolExecutor.DiscardPolicy 不能執行的任務將被刪除;
(4) ThreadPoolExecutor.DiscardOldestPolicy 如果執行程式尚未關閉,則位於工作佇列頭部的任務將被刪除,然後重試執行程式(如果再次失敗,則重複此過程)。
再看看具體的AbortPolicy:
看一下其他拒絕策略的具體實現:
CallerRunsPolicy實現類:
DiscardPolicy實現類:
DiscardOldestPolicy實現類:
通過以上分析,我們都知道:這四個RejectedExecutionHandler的實現類都是ThreadPoolExecutor的靜態內部類。
二、測試案例
寫一個task:package com.npf.thread.test; public class Task implements Runnable { protected String name; public Task(String name) { super(); this.name = name; } @Override public void run() { try { System.out.println(this.name + " is running."); Thread.sleep(500); } catch (Exception e) { } } }
1. AbortPolicy 示例
package com.npf.thread.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AbortPolicyDemo {
public static void main(String[] args) {
// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
// 設定執行緒池的拒絕策略為AbortPolicy
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
try {
// 新建10個任務,並將它們新增到執行緒池中
for (int i = 0; i < 10; i++) {
Runnable myTask = new Task("task-"+i);
pool.execute(myTask);
}
} catch (RejectedExecutionException e) {
e.printStackTrace();
// 關閉執行緒池
pool.shutdown();
}
}
}
某一次執行結果,直接已異常的形式丟擲:
2. CallerRunsPolicy 示例
package com.npf.thread.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CallerRunsPolicyDemo {
public static void main(String[] args) {
// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
// 設定執行緒池的拒絕策略為CallerRunsPolicy
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 新建10個任務,並將它們新增到執行緒池中
for (int i = 0; i < 10; i++) {
Runnable myTask = new Task("task-"+i);
pool.execute(myTask);
}
// 關閉執行緒池
pool.shutdown();
}
}
某一次執行結果,當有新任務新增到執行緒池被拒絕時,執行緒池會將被拒絕的任務新增到"執行緒池正在執行的執行緒"中去執行:
3. DiscardPolicy 示例
package com.npf.thread.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DiscardPolicyDemo {
public static void main(String[] args) {
// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
// 設定執行緒池的拒絕策略為DiscardPolicy
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 新建10個任務,並將它們新增到執行緒池中
for (int i = 0; i < 10; i++) {
Runnable myTask = new Task("task-"+i);
pool.execute(myTask);
}
// 關閉執行緒池
pool.shutdown();
}
}
某一次執行結果,執行緒池pool的"最大池大小"和"核心池大小"都為1,這意味著"執行緒池能同時執行的任務數量最大隻能是1"。執行緒池pool的阻塞佇列是ArrayBlockingQueue,ArrayBlockingQueue是一個有界的阻塞佇列,ArrayBlockingQueue的容量為1。這也意味著執行緒池的阻塞佇列只能有一個執行緒池阻塞等待。由此可知,執行緒池中共運行了2個任務。第1個任務直接放到Worker中,通過執行緒去執行;第2個任務放到阻塞佇列中等待。其他的任務都被丟棄了。
4.
DiscardOldestPolicy 示例
package com.npf.thread.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DiscardOldestPolicyDemo {
public static void main(String[] args) {
// 建立執行緒池。執行緒池的"最大池大小"和"核心池大小"都為1,"執行緒池"的阻塞佇列容量為1。
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
// 設定執行緒池的拒絕策略為DiscardOldestPolicy
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 新建10個任務,並將它們新增到執行緒池中
for (int i = 0; i < 10; i++) {
Runnable myTask = new Task("task-"+i);
pool.execute(myTask);
}
// 關閉執行緒池
pool.shutdown();
}
}
某一次執行結果,當有新任務新增到執行緒池被拒絕時,執行緒池會丟棄阻塞佇列中末尾的任務,然後將被拒絕的任務新增到末尾。