資料庫調優-03 (索引)
阿新 • • 發佈:2021-10-03
建立執行緒的四種方式
- 實現Runnable介面,重寫run()方法(避免多繼承侷限)
- 繼承Thread類,重寫run()方法(本質:Thread類也是實現Runnable介面)
- 實現Callable介面,重寫call()方法,有返回值
- 使用執行緒池(使用原因:不推薦手動建立執行緒,不方便管理,易造成較大開銷或浪費)
初識執行緒池
在Java中,我們可以利用多執行緒來最大化地壓榨CPU多核計算的能力。但是,執行緒本身是把雙刃劍,我們需要知道它的利弊,才能在實際系統中游刃有餘地運用。
執行緒池,本質上是一種物件池,用於管理執行緒資源。 在任務執行前,需要從執行緒池中拿出執行緒來執行。在任務執行完成之後,需要把執行緒放回執行緒池。通過執行緒的這種反覆利用機制,可以有效地避免直接建立執行緒所帶來的壞處。
不使用執行緒池的壞處:
- 頻繁的執行緒建立和銷燬會佔用更多的CPU和記憶體;
- 頻繁的執行緒建立和銷燬會對GC產生比較大的壓力;
- 執行緒太多,執行緒切換帶來的開銷將不可忽視;
- 執行緒太少,多核CPU得不到充分利用,是一種浪費。
執行緒池的好處:
- 降低資源的消耗。執行緒本身是一種資源,建立和銷燬執行緒會有CPU開銷;建立的執行緒也會佔用一定的記憶體;
- 提高任務執行的響應速度。任務執行時,可以不必等到執行緒建立完之後再執行;
- 提高執行緒的可管理性。執行緒不能無限制地建立,需要進行統一的分配、調優和監控。
因此,我們有必要對執行緒池進行比較完整地說明,以便能對執行緒池進行正確地治理。
執行緒池實現原理
執行緒池主要處理流程
通過上圖,我們看到了執行緒池的主要處理流程。我們的關注點在於,任務提交之後是怎麼執行的。大致如下:
- 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務;
- 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中;
- 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務;
- 如果執行緒池也滿了,則按照拒絕策略對任務進行處理。
在jdk裡面,我們可以將處理流程描述得更清楚一點。來看看ThreadPoolExecutor
的處理流程。
ThreadPoolExecutor的處理流程:
corePool
-> 核心執行緒池maximumPool
BlockQueue
-> 佇列RejectedExecutionHandler
-> 拒絕策略
程式碼示例
Executors(不推薦使用)
Executors類建立執行緒池的方法歸根結底都是呼叫ThreadPoolExecutor類,只不過對每個方法賦值不同的引數去構造ThreadPoolExecutor物件。
- newCachedThreadPool:建立一個可快取的執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
- newFixedThreadPool:建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
- newScheduledThreadPool:建立一個定長執行緒池,支援定時及週期性任務執行。
- newSingleThreadExecutor:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
注意:執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式建立。
原因:上述四個方法的建立的佇列大小預設都是Integer.MAX_VALUE,堆積過多的任務請求會可能導致OOM。
public class ThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 定時排程,每個排程任務會至少等待`period`的時間,
// 如果任務執行的時間超過`period`,則等待的時間為任務執行的時間
executor.scheduleAtFixedRate(() -> {
try {
Thread.sleep(10000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 2, TimeUnit.SECONDS);
// 定時排程,延遲`delay`後執行,且只執行一次
executor.schedule(() -> System.out.println("5 秒之後執行 schedule"), 5, TimeUnit.SECONDS);
}
}
ThreadPoolExecutor
- corePoolSize: 常駐核心執行緒數,如果大於0,即使本地任務執行完也不會被銷燬
- maximumPoolSize: 執行緒池能夠容納可同時執行的最大執行緒數
- keepAliveTime: 執行緒池中執行緒空閒的時間,當空閒時間達到該值時,執行緒會被銷燬, 只剩下 corePoolSize 個執行緒數量。
- unit: 空閒時間的單位。一般以TimeUnit類定義時分秒。
- workQueue: 當請求的執行緒數大於 corePoolSize 時,執行緒進入該阻塞佇列。
- LinkedBlockingQueue:無界佇列,當不指定佇列大小時,將會預設為Integer.MAX_VALUE大小的佇列,因此大量的任務將會堆積在佇列中,最終可能觸發OOM。
- ArrayBlockingQueue:有界佇列,基於陣列的先進先出佇列,此佇列建立時必須指定大小。
- PriorityBlockingQueue:有界佇列,基於優先順序任務的,它是通過Comparator決定的。
- SynchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務
- threadFactory: 執行緒工廠,用來生產一組相同任務的執行緒,同時也可以通過它增加字首名,虛擬機器棧分析時更清晰
- handler: 執行拒絕策略,當 workQueue 已滿,且超過maximumPoolSize 最大值,就要通過這個來處理,比如拒絕,丟棄等,這是一種限流的保護措施。
- AbortPolicy:預設的拒絕策略,拋RejectedExecutionException異常
- DiscardPolicy:相當大膽的策略,直接丟棄任務,沒有任何異常丟擲
- DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作佇列的任務丟棄,然後把新任務加入到工作佇列
- CallerRunsPolicy:提交任務的執行緒自己去執行該任務
- 執行緒池關閉
- shutdown() : 不會立刻終止執行緒,等所有快取佇列中的任務都執行完畢後才會終止。
- shutdownNow() : 立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務
- 執行緒池監控
- long getTaskCount():獲取已經執行或正在執行的任務數
- long getCompletedTaskCount():獲取已經執行的任務數
- int getLargestPoolSize():獲取執行緒池曾經建立過的最大執行緒數,根據這個引數,我們可以知道執行緒池是否滿過
- int getPoolSize():獲取執行緒池執行緒數
- int getActiveCount():獲取活躍執行緒數(正在執行任務的執行緒數)
public class ThreadPool {
public static void main( String[] args ){
// maximumPoolSize設定為2 ,拒絕策略為AbortPolic策略,直接丟擲異常
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i=0;i<3;i++) {
pool.execute(new ThreadTask());
}
}
}
class ThreadTask implements Runnable{
public ThreadTask() {
}
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
ThreadFactory(阿里Java開發手冊推薦使用)
// 使用工廠類可以設定執行緒名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
注意需要引入guava包,否則ThreadFactoryBuilder會報錯
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.1-jre</version>
</dependency>