1. 程式人生 > 其它 >資料庫調優-03 (索引)

資料庫調優-03 (索引)

建立執行緒的四種方式

  • 實現Runnable介面,重寫run()方法(避免多繼承侷限)
  • 繼承Thread類,重寫run()方法(本質:Thread類也是實現Runnable介面)
  • 實現Callable介面,重寫call()方法,有返回值
  • 使用執行緒池(使用原因:不推薦手動建立執行緒,不方便管理,易造成較大開銷或浪費)

初識執行緒池

在Java中,我們可以利用多執行緒來最大化地壓榨CPU多核計算的能力。但是,執行緒本身是把雙刃劍,我們需要知道它的利弊,才能在實際系統中游刃有餘地運用。

執行緒池,本質上是一種物件池,用於管理執行緒資源。 在任務執行前,需要從執行緒池中拿出執行緒來執行。在任務執行完成之後,需要把執行緒放回執行緒池。通過執行緒的這種反覆利用機制,可以有效地避免直接建立執行緒所帶來的壞處。

不使用執行緒池的壞處:

  1. 頻繁的執行緒建立和銷燬會佔用更多的CPU和記憶體;
  2. 頻繁的執行緒建立和銷燬會對GC產生比較大的壓力;
  3. 執行緒太多,執行緒切換帶來的開銷將不可忽視;
  4. 執行緒太少,多核CPU得不到充分利用,是一種浪費。

執行緒池的好處:

  1. 降低資源的消耗。執行緒本身是一種資源,建立和銷燬執行緒會有CPU開銷;建立的執行緒也會佔用一定的記憶體;
  2. 提高任務執行的響應速度。任務執行時,可以不必等到執行緒建立完之後再執行;
  3. 提高執行緒的可管理性。執行緒不能無限制地建立,需要進行統一的分配、調優和監控。

因此,我們有必要對執行緒池進行比較完整地說明,以便能對執行緒池進行正確地治理。

執行緒池實現原理

執行緒池主要處理流程

通過上圖,我們看到了執行緒池的主要處理流程。我們的關注點在於,任務提交之後是怎麼執行的。大致如下:

  1. 判斷核心執行緒池是否已滿,如果不是,則建立執行緒執行任務;
  2. 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中;
  3. 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,建立執行緒執行任務;
  4. 如果執行緒池也滿了,則按照拒絕策略對任務進行處理。

在jdk裡面,我們可以將處理流程描述得更清楚一點。來看看ThreadPoolExecutor的處理流程。

ThreadPoolExecutor的處理流程:

  1. corePool -> 核心執行緒池
  2. maximumPool
    -> 執行緒池
  3. BlockQueue -> 佇列
  4. RejectedExecutionHandler -> 拒絕策略

程式碼示例

Executors(不推薦使用)

Executors類建立執行緒池的方法歸根結底都是呼叫ThreadPoolExecutor類,只不過對每個方法賦值不同的引數去構造ThreadPoolExecutor物件。

  1. newCachedThreadPool:建立一個可快取的執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
  2. newFixedThreadPool:建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待
  3. newScheduledThreadPool:建立一個定長執行緒池,支援定時及週期性任務執行。
  4. newSingleThreadExecutor:建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

注意:執行緒池不允許使用Executors去建立,而是通過ThreadPoolExecutor的方式建立。

原因:上述四個方法的建立的佇列大小預設都是Integer.MAX_VALUE,堆積過多的任務請求會可能導致OOM。

public class ThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        
        // 定時排程,每個排程任務會至少等待`period`的時間,
        // 如果任務執行的時間超過`period`,則等待的時間為任務執行的時間
        executor.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(10000);
                System.out.println(System.currentTimeMillis() / 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, 0, 2, TimeUnit.SECONDS);
        
        // 定時排程,延遲`delay`後執行,且只執行一次
        executor.schedule(() -> System.out.println("5 秒之後執行 schedule"), 5, TimeUnit.SECONDS);
    }
}

ThreadPoolExecutor

  • corePoolSize: 常駐核心執行緒數,如果大於0,即使本地任務執行完也不會被銷燬
  • maximumPoolSize: 執行緒池能夠容納可同時執行的最大執行緒數
  • keepAliveTime: 執行緒池中執行緒空閒的時間,當空閒時間達到該值時,執行緒會被銷燬, 只剩下 corePoolSize 個執行緒數量。
  • unit: 空閒時間的單位。一般以TimeUnit類定義時分秒。
  • workQueue: 當請求的執行緒數大於 corePoolSize 時,執行緒進入該阻塞佇列。
    • LinkedBlockingQueue:無界佇列,當不指定佇列大小時,將會預設為Integer.MAX_VALUE大小的佇列,因此大量的任務將會堆積在佇列中,最終可能觸發OOM。
    • ArrayBlockingQueue:有界佇列,基於陣列的先進先出佇列,此佇列建立時必須指定大小。
    • PriorityBlockingQueue:有界佇列,基於優先順序任務的,它是通過Comparator決定的。
    • SynchronousQueue:這個佇列比較特殊,它不會儲存提交的任務,而是將直接新建一個執行緒來執行新來的任務
  • threadFactory: 執行緒工廠,用來生產一組相同任務的執行緒,同時也可以通過它增加字首名,虛擬機器棧分析時更清晰
  • handler: 執行拒絕策略,當 workQueue 已滿,且超過maximumPoolSize 最大值,就要通過這個來處理,比如拒絕,丟棄等,這是一種限流的保護措施。
    • AbortPolicy:預設的拒絕策略,拋RejectedExecutionException異常
    • DiscardPolicy:相當大膽的策略,直接丟棄任務,沒有任何異常丟擲
    • DiscardOldestPolicy:丟棄最老的任務,其實就是把最早進入工作佇列的任務丟棄,然後把新任務加入到工作佇列
    • CallerRunsPolicy:提交任務的執行緒自己去執行該任務
  • 執行緒池關閉
    • shutdown() : 不會立刻終止執行緒,等所有快取佇列中的任務都執行完畢後才會終止。
    • shutdownNow() : 立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務
  • 執行緒池監控
    • long getTaskCount():獲取已經執行或正在執行的任務數
    • long getCompletedTaskCount():獲取已經執行的任務數
    • int getLargestPoolSize():獲取執行緒池曾經建立過的最大執行緒數,根據這個引數,我們可以知道執行緒池是否滿過
    • int getPoolSize():獲取執行緒池執行緒數
    • int getActiveCount():獲取活躍執行緒數(正在執行任務的執行緒數)
public class ThreadPool {
    public static void main( String[] args ){
        // maximumPoolSize設定為2 ,拒絕策略為AbortPolic策略,直接丟擲異常
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for(int i=0;i<3;i++) {
            pool.execute(new ThreadTask());
        }   
    }
}

class ThreadTask implements Runnable{
    
    public ThreadTask() {
    }
    
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }
}

ThreadFactory(阿里Java開發手冊推薦使用)

// 使用工廠類可以設定執行緒名字
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

注意需要引入guava包,否則ThreadFactoryBuilder會報錯

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>28.1-jre</version>
</dependency>