池化技術——自定義執行緒池
目錄
- 池化技術——自定義執行緒池
- 1、為什麼要使用執行緒池?
- 1.1、池化技術的特點:
- 1.2、執行緒池的好處:
- 1.3、如何自定義一個執行緒池
- 2、三大方法
- 2.1、單個執行緒的執行緒池方法
- 2.2、固定的執行緒池的大小的方法
- 2.3、可伸縮的執行緒池的方法
- 2.4、完整的測試程式碼為:
- 3、為什麼要自定義執行緒池?三大方法建立執行緒池的弊端分析
- 4、七大引數
- 5、如何手動的去建立一個執行緒池
- 6、四種拒絕策略
- 6.1、會丟擲異常的拒絕策略
- 6.2、哪來的去哪裡拒絕策略
- 6.3、丟掉任務拒絕策略
- 6.4、嘗試競爭拒絕策略
- 7、關於最大執行緒數應該如何確定
- 7.1、CPU密集型
- 7.2、IO密集型
- 7.3、公式總結
- 1、為什麼要使用執行緒池?
池化技術——自定義執行緒池
1、為什麼要使用執行緒池?
池化技術
1.1、池化技術的特點:
程式的執行,本質:佔用系統的資源! 優化資源的使用!=>池化技術
執行緒池、連線池、記憶體池、物件池///..... 建立、銷燬。十分浪費資源
池化技術:事先準備好一些資源,有人要用,就來我這裡拿,用完之後還給我。
1.2、執行緒池的好處:
- 降低資源的消耗
- 降低資源的消耗
- 方便管理。
核心:==執行緒複用、可以控制最大併發數、管理執行緒==
1.3、如何自定義一個執行緒池
牢記:==三大方法、7大引數、4種拒絕策略==
2、三大方法
三大方法
在java的JDK中提夠了Executors開啟JDK預設執行緒池的類,其中有三個方法可以用來開啟執行緒池。
2.1、單個執行緒的執行緒池方法
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個執行緒的執行緒池
該方法開啟的執行緒池,故名思義該池中只有一個執行緒。
2.2、固定的執行緒池的大小的方法
ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定的執行緒池的大小
其中方法中傳遞的int型別的引數,就是池中的執行緒數量
2.3、可伸縮的執行緒池的方法
ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸縮
該方法建立的執行緒池是不固定大小的,可以根據需求動態的在池子裡建立執行緒,遇強則強。
2.4、完整的測試程式碼為:
package com.xgp.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 工具類,三大方法
*/
public class Demo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor(); //單個執行緒的執行緒池
// ExecutorService threadPool = Executors.newFixedThreadPool(5); //固定的執行緒池的大小
// ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸縮
try{
for(int i = 0;i < 10;i++) {
//使用執行緒池去建立
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//關閉執行緒池
threadPool.shutdown();
}
}
}
將三行註釋部分依次開啟的執行結果為:
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
上述執行結果為單個執行緒的執行緒池的結果:可以看出的確只有一條執行緒在執行。
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
上述執行結果為固定執行緒的執行緒池的結果:因為固定的大小為5,可以看出的確有5條執行緒在執行。
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
pool-1-thread-7 OK
pool-1-thread-9 OK
pool-1-thread-10 OK
pool-1-thread-8 OK
pool-1-thread-6 OK
上述執行結果為彈性執行緒的執行緒池的結果:可以看出的確有10條執行緒在執行。
3、為什麼要自定義執行緒池?三大方法建立執行緒池的弊端分析
在單個執行緒池和固定大小的執行緒池中,因為處理的執行緒有限,當大量的請求進來時,都會在阻塞佇列中等候,而允許請求的佇列長度為Integet.MAX_VALUE,整數的最大值約為21億,會導致JVM記憶體溢位。
在彈性伸縮的執行緒池中,允許建立的執行緒數量為Integet.MAX_VALUE,可能會建立大量的執行緒,使得Jvm記憶體溢位。
==對於上述的兩點,其數值會在後面分析原始碼的環節看到,關於這一點,在阿里巴巴開發手冊中有著詳細的說明,並極力推薦採用自定義執行緒池,而不使用這三大方法。==
4、七大引數
七大引數
原始碼分析:我們要指定義自己的執行緒池,先從原始碼的角度看一看JDK現有的三個執行緒池是如何編寫的。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
從三大方法的原始碼中可以看出,三種執行緒池都是new 了一個 ThreadPoolExecutor 物件,點選原始碼中看看。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
這裡呼叫了一個 this() 方法,在點選進去一看。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
到這裡就可以很明顯的看出本節想要講述的7大引數,是哪7大了吧。根據英文意思,可以很容易的說明這七大引數的意思了。
public ThreadPoolExecutor(int corePoolSize, //核心執行緒數
int maximumPoolSize, //最大執行緒數
long keepAliveTime, //超時等待
TimeUnit unit, //超時等待的單位
BlockingQueue<Runnable> workQueue, //阻塞佇列
ThreadFactory threadFactory, //執行緒池工廠
RejectedExecutionHandler handler) { //拒絕策略
在阿里巴巴開發手冊中,推薦的也是使用 ThreadPoolExecutor 來進行建立執行緒池的。
這裡可以用一張在銀行辦理業務的圖來生動的說明這七大引數。
這裡,解釋下這張圖對應的含義:
銀行在人很少的時候也只開放兩個視窗,並且什麼時候都不會進行關閉。——核心執行緒數
當人數大於2又小於5人時,後來的三人就在候客區等候辦理業務。——阻塞佇列
當人數大於5人又小於8人時,另外三個正在關閉的視窗需要開放進行辦理業務,於是乎就有了5個視窗在進行辦理業務。——最大執行緒數
將開啟其他三個視窗,需要領導將這三個視窗的員工叫回。——執行緒池工廠
當人數實在太多時,銀行都擠不下了,此時就會把門關了,不接受新的服務了。——拒絕策略
當銀行人數又變少時,另外的三個非核心視窗太久沒又生意,為了節約成本,則又會進行關閉。——超時等待
通過對上述7大引數的分析,同學們也能夠更加理解JDK自帶的三大方法的弊端,以及為什麼是整數的最大值的這個數值。
5、如何手動的去建立一個執行緒池
手動建立連線池
七大引數的說明也都講了,於是乎我們可以仿照這七大引數,來定義一個自己的執行緒池。對於其中的執行緒工廠,我們也一般採用預設的工廠,而其中的拒絕策略我們可以通過原始碼分析,先使用三大方法使用的拒絕策略。
點選進入 defaultHandler 的原始碼中可以看到。
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
其中的 new AbortPolicy(); 就是三大方法使用的拒絕策略,我們先仿照銀行的例子,自定義一個執行緒池。程式碼如下:
package com.xgp.pool;
import java.util.concurrent.*;
/**
* 自定義執行緒池
*/
public class Demo02 {
/* public ThreadPoolExecutor(int corePoolSize, //核心執行緒數
int maximumPoolSize, //最大執行緒數
long keepAliveTime, //超時等待
TimeUnit unit, //超時等待的單位
BlockingQueue<Runnable> workQueue, //阻塞佇列
ThreadFactory threadFactory, //執行緒池工廠
RejectedExecutionHandler handler) {*/ //拒絕策略
public static void main(String[] args) {
ExecutorService pool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() //會丟擲異常的實現類
// new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪裡
// new ThreadPoolExecutor.DiscardOldestPolicy() //不會丟擲異常,會丟掉任務
// new ThreadPoolExecutor.AbortPolicy() //嘗試會和第一個競爭
);
try{
for(int i = 0;i < 8;i++) {
//使用執行緒池去建立
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " OK");
});
}
}catch (Exception e) {
e.printStackTrace();
}finally {
//關閉執行緒池
pool.shutdown();
}
}
}
於是乎,我們完成了一個自定義的執行緒池,核心執行緒數為2,最大執行緒數為5,超時等待的秒數為3s,阻塞佇列的長度為3。
6、四種拒絕策略
四種拒絕策略
通過分析原始碼,可以知道三大方法預設的拒絕策略在ThreadPoolExecutor這個類中,由於該類較為複雜,尋找起來不方便,於是我們可以採用IDEA的程式碼提示功能,非常明顯的提示出了四種拒絕策略,也就是上面自定義執行緒池中的被註釋部分。
將上面自定義執行緒池的程式碼註釋一一開啟,我們來進行測試:
6.1、會丟擲異常的拒絕策略
new ThreadPoolExecutor.AbortPolicy() //會丟擲異常的實現類
該拒絕策略執行的結果為:
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-5 OK
java.util.concurrent.RejectedExecutionException: Task com.xgp.pool.Demo02$$Lambda$1/2093631819@378bf509 rejected from java.util.concurrent.ThreadPoolExecutor@5fd0d5ae[Running, pool size = 5, active threads = 4, queued tasks = 0, completed tasks = 4]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.xgp.pool.Demo02.main(Demo02.java:40)
該策略就是當最大執行緒數+阻塞佇列數都不滿足請求數時,系統將丟擲異常來進行解決。
6.2、哪來的去哪裡拒絕策略
new ThreadPoolExecutor.CallerRunsPolicy() //哪來的去哪裡
該拒絕策略執行的結果為:
pool-1-thread-1 OK
main OK
main OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
可以看出,該拒絕策略當執行緒池的執行緒數不能夠滿足需求時,會將不能服務的任務打道回府,即交給main執行緒來解決,該拒絕策略適用於原來的執行緒能夠解決問題的情況。
6.3、丟掉任務拒絕策略
new ThreadPoolExecutor.DiscardOldestPolicy() //不會丟擲異常,會丟掉任務
該拒絕策略執行的結果為:
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
數一數,一共是10個任務,但根據執行的情況只處理了8個任務,該拒絕策略將不能夠分配執行緒執行的任務全部丟棄了,會造成資料的丟失。
6.4、嘗試競爭拒絕策略
new ThreadPoolExecutor.DiscardPolicy() //嘗試會和第一個競爭
該拒絕策略執行的結果為:
pool-1-thread-1 OK
pool-1-thread-2 OK
pool-1-thread-3 OK
pool-1-thread-1 OK
pool-1-thread-3 OK
pool-1-thread-2 OK
pool-1-thread-1 OK
pool-1-thread-4 OK
pool-1-thread-5 OK
數一數,一共是10個任務,但根據執行的情況只處理了9個任務,其中競爭成功了一個,失敗了一個。該策略將會於最早進來的執行緒進行競爭,類似於作業系統中的搶佔式短作業優先演算法,該拒絕策略同樣會造成資料的丟失。
7、關於最大執行緒數應該如何確定
7.1、CPU密集型
CPU密集型
對於有多核Cpu的電腦,應該讓cpu充分忙碌起來,不要低於Cpu的核數,並且不應該在程式碼中寫死,而是應該能夠自動的獲取當前機器的Cpu核數,獲取的程式碼如下:
System.out.println(Runtime.getRuntime().availableProcessors());
7.2、IO密集型
IO密集型
對於系統中有大量IO任務時,應該要預留出足夠的執行緒來處理IO任務,因為IO任務極度耗時。如果判斷出系統中的IO密集的任務有10個,則定義的執行緒數量需要大於10。
7.3、公式總結
最大執行緒數 = 機器核素*2 + IO密集型任務數
對於上述該公式,只是網上的一種總結,作者也沒有進行深入的測試於瞭解,讀者應該根據自己的業務需要進行合理的調