Java併發程式設計(7)-ThreadPoolExecutor解讀(2)
文章目錄
更多關於Java併發程式設計的文章請點選這裡:Java併發程式設計實踐(0)-目錄頁
本文將介紹ThreadPoolExecutor的飽和策略以及ThreadPoolExecutor的擴充套件方法;當執行緒充滿了ThreadPool的有界佇列時,飽和策略開始起作用。飽和策略可以理解為佇列飽和後,處理後續無法入隊的任務的策略。在JDK中預先提供了幾種飽和策略,ThreadPoolExecutor可以通過呼叫setRejectedExecutionHandler來修改飽和策略。ThreadPoolExecutor在設計的時候就支援擴充套件其方法,主要為前處理beforeExecute、後處理afterExecute`以及執行完畢處理terminated。
一、ThreadPoolExecutor的飽和策略
1.1、什麼是飽和策略
當執行緒充滿了ThreadPool的有界佇列時,飽和策略開始起作用。**飽和策略可以理解為佇列飽和後,處理後續無法入隊的任務的策略。**在JDK中預先提供了幾種飽和策略,ThreadPoolExecutor可以通過呼叫setRejectedExecutionHandler
來修改飽和策略。
1.2、Abort策略
ThreadPoolExecutor的 預設策略,新任務提交時直接丟擲未檢查的異常RejectedExecutionException,該異常可由呼叫者捕獲。
@Test
public void testPolicy() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
5,
0,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<Runnable>(5));
//設定飽和策略為AbortPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor. AbortPolicy());
//測試
for (int i = 1; i <= 11; i++) {
int count = i;
executor.execute(new Runnable() {
@Override
public void run() {
//模擬耗時操作
for (int j = 0; j < 100000; j++) {
//...
}
System.out.println("這是第" + count + "個執行緒在執行任務----" + Thread.currentThread().getName());
}
});
}
}
執行後,由於設定了核心池執行緒數為5,有界佇列能儲存的執行緒數為5,故一共只有10個任務能被執行,多餘的那1個任務將被以異常的形式丟擲。
1.3、CallerRuns策略
既不拋棄任務也不丟擲異常,而是將某些任務回退到呼叫者,讓呼叫者去執行它。
@Test
public void testPolicy() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
5,
0,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<Runnable>(5));
//設定飽和策略為AbortPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//測試
for (int i = 1; i <= 11; i++) {
int count = i;
executor.execute(new Runnable() {
@Override
public void run() {
//模擬耗時操作
for (int j = 0; j < 100000; j++) {
//...
}
System.out.println("這是第" + count + "個執行緒在執行任務----" + Thread.currentThread().getName());
}
});
}
}
從執行結果中可以看到,多出來的那11個任務被executor的呼叫者main主執行緒
執行。
1.4、Discard策略
多餘的任務被拋棄,不執行。
@Test
public void testPolicy() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
5,
0,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<Runnable>(5));
//設定飽和策略為DiscardPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
//測試
for (int i = 1; i <= 11; i++) {
int count = i;
executor.execute(new Runnable() {
@Override
public void run() {
//模擬耗時操作
for (int j = 0; j < 100000; j++) {
//...
}
System.out.println("這是第" + count + "個執行緒在執行任務----" + Thread.currentThread().getName());
}
});
}
}
1.5、DiscardOldest策略
拋棄工作佇列中的舊任務,然後嘗試提交新任務執行。
@Test
public void testPolicy() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,
5,
0,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<Runnable>(5));
//設定飽和策略為DiscardOldestPolicy
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
//測試
for (int i = 1; i <= 11; i++) {
int count = i;
executor.execute(new Runnable() {
@Override
public void run() {
//模擬耗時操作
for (int j = 0; j < 100000; j++) {
//...
}
System.out.println("這是第" + count + "個執行緒在執行任務----" + Thread.currentThread().getName());
}
});
}
}
從執行結果可以看出,第11個任務被執行了,說明工作佇列中的最舊任務被拋棄,執行了這個新的任務。
二、ThreadPoolExecutor的擴充套件方法
ThreadPoolExecutor在設計的時候就支援擴充套件其方法,主要為前處理beforeExecute
、後處理afterExecute
以及執行完畢處理terminated
。
自定義的ThreadPoolExecutor:
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
//before execute....
System.out.println("before execute....");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//after execute....
System.out.println("after execute....");
}
@Override
protected void terminated() {
super.terminated();
//terminated
System.out.println("terminated");
}
}