1. 程式人生 > 程式設計 >Java併發類庫的執行緒池

Java併發類庫的執行緒池

new Thread 弊端

  • 頻繁建立執行緒和銷燬執行緒需要時間,大大降低系統的效率
  • 缺乏統一管理,可能無限制的新建物件,相互競爭,造成系統資源佔用過多
  • 缺少更多執行,定期執行,執行緒中斷等更多功能

執行緒池的好處

  • 減少了建立和銷燬執行緒的次數,執行緒可被重複利用,可執行多個任務
  • 控制最大併發數,提高系統資源利用率
  • 避免過多資源競爭,避免阻塞
  • 提供定期執行,單執行緒,併發數控制等功能

執行緒池的生命週期

執行緒池生命週期

ThreadPoolExecutor 核心類


public class ThreadPoolExecutor extends AbstractExecutorService {

    // 包含 4 個構造方法。 其他 3 個通過呼叫該構造方法。
public ThreadPoolExecutor( int corePoolSize,// 核心執行緒數量 int maximumPoolSize,// 最大執行緒數 long keepAliveTime,// 執行緒沒有執行任務時最大保持多久終止 TimeUnit unit,// keepAliveTime的時間單位 BlockingQueue<Runnable> workQueue,// 阻塞佇列,儲存等待執行的任務 ThreadFactory threadFactory,// 執行緒工廠,用來建立執行緒 RejectedExecutionHandler handler // 當拒絕處理任務時的策略 )
{ } } 複製程式碼

構造方法核心引數

int corePoolSize

核心執行緒數量。

預設情況下,建立了執行緒池後,執行緒池中的執行緒數為0,當有任務來之後,就會建立一個執行緒去執行任務,當執行緒池中的執行緒數目達到corePoolSize後,就會把到達的任務放到快取佇列(workQueue)當中。

int maximumPoolSize

最大執行緒數。線上程池中最多能建立多少個執行緒。

int keepAliveTime

執行緒沒有執行任務時最大保持多久終止。

預設情況下,只有當執行緒池中的執行緒數大於corePoolSize時,keepAliveTime才會起作用。

呼叫了allowCoreThreadTimeOut(boolean)方法,線上程池中的執行緒數不大於corePoolSize時,keepAliveTime引數也會起作用,直到執行緒池中的執行緒數為0。

TimeUnit unit

引數 keepAliveTime 的時間單位

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
複製程式碼

BlockingQueue workQueue

阻塞佇列,儲存等待執行的任務,對執行緒池執行過程產生重大影響。

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • SynchronousQueue

ThreadFactory threadFactory

執行緒工廠,主要用來建立執行緒。

RejectedExecutionHandler handler

當拒絕處理任務時的策略。

// 丟棄任務,並丟擲 RejectedExecutionException
ThreadPoolExecutor.AbortPolicy
// 丟棄任務,不丟擲異常
ThreadPoolExecutor.DiscardPolicy
// 丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.DiscardOldestPolicy
// 呼叫執行緒處理該任務
ThreadPoolExecutor.CallerRunsPolicy
複製程式碼

執行緒池的互動和執行緒池的內部工作過程

執行緒池的互動和執行緒池的內部工作過程

ThreadPoolExecutor常用方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 提交任務,交給執行緒池執行
    public void execute(Runnable command) {}

    // 提交任務,能夠返回執行結果 (execute + Future)
    public Future<?> submit(Runnable task) {}
    public <T> Future<T> submit(Runnable task,T result) {}
    public <T> Future<T> submit(Callable<T> task) {}

    // 關閉執行緒池,等待任務執行完
    public void shutdown() {}
            
    // 立即關閉,不等待任務關閉        
    public void shutdownNow() {}

    // 獲得執行緒池中已執行和未執行的任務總數
    public long getTaskCount() {}

    // 獲得已完成任務數量
    public long getCompletedTaskCount() {}

    // 執行緒池當前的執行緒數量
    public int getPoolSize() {}

    // 獲得當前執行緒池中正在執行的執行緒數量
    public int getActiveCount() {
}
複製程式碼

內建執行緒池


  • newSingleThreadExecutor()
  • newFixedThreadPooll(int nThreads)
  • newCachedThreadPool()
  • newScheduledThreadPool(int corePoolSize) / newSingleThreadScheduledExecutor()
  • newWorkStealingPool(int parallelism)

利用Executors提供的通用執行緒池建立方法,建立不同配置的執行緒池,主要區別在於不同的ExecutorService型別或者不同的初始引數。

執行緒池類圖

newSingleThreadExecutor()

  • 建立一個單執行緒的執行緒池
  • 只有一個執行緒在工作,保證所有任務的執行順序按照任務的提交順序執行
  • 現行大多數GUI程式都是單執行緒的
  • Android中單執行緒可用於資料庫操作,檔案操作,應用批量安裝,應用批量刪除等不適合併發但可能IO阻塞性及影響UI執行緒響應的操作
public static void main(String[] args) {

    ExecutorService pool = Executors.newSingleThreadExecutor();

    // 執行過程將會順序輸出 0 --> 9
    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(() -> {
            System.out.println(index);
        })
    }
}
複製程式碼
// corePoolSize = 1
// maximumPoolSIze = 1
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
複製程式碼

newFixedThreadPooll(int nThreads)

  • 建立固定大小的執行緒池,每次提交一個任務就建立一個執行緒,直到達到nThreads
  • 執行緒池的大小一旦達到 nThreads 就保持不變,如果某個執行緒因為執行異常而結束,執行緒池會補充一個新執行緒
  • 可控制執行緒最大併發數,超出的執行緒會在佇列中等待
public static void main(String[] args) {
    // 執行緒池大小為3
    ExecutorService executorService = Executors.newFixedThreadPool(3);

    // 每隔秒列印三個數字
    for (int i = 0; i < 50; i++) {
        final int index = i;
        executorService.execute(() -> {
            System.out.println(index);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
    executorService.shutdown();
}
複製程式碼
// corePoolSize = nThreads
// maximumPoolSIze = nThreads
// keyAliveTime = 0L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,nThreads,new LinkedBlockingQueue<Runnable>());
}
複製程式碼

newCachedThreadPool()

  • 建立一個可快取的執行緒池
  • 執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60s)的執行緒
  • 當任務數增加時,此執行緒池可智慧的新增新執行緒來處理任務
  • 此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小
public static void main(String[] args) {
    ExecutorService executorService = Executors.newCachedThreadPool();

    for (int i = 0; i < 10; i++) {
        final int index = i;
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(index);
            }
        });
    }

    executorService.shuntdown();
}
複製程式碼
// corePoolSize = 0
// maximumPoolSIze = Integer.MAX_VALUE
// keyAliveTime = 60L
// unit = TimeUnit.MILLISECONDS
// workQueue = new LinkedBlockingQueue<Runnable>()
// threadFactory = Executors.defaultThreadFactory()
// handler = defaultHandler = new AbortPolicy()
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
複製程式碼

newScheduledThreadPool(int corePoolSize) / newSingleThreadScheduledExecutor()

  • 建立一個大小無限的執行緒池
  • 支援定時以及週期性執行任務的需求
public static void main(String[] args) {
    // new ScheduledThreadPoolExecutor(corePoolSize)
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

    // executorService 執行具有排程含義
    // delay: 3  SECONDS 後執行
	executorService.schedule(() -> System.out.println("schedule running"),2,TimeUnit.SECONDS);

    executorService.shutdown();
}
複製程式碼
public static void main(String[] args) {
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
    
    // executorService 執行具有排程含義
    // scheduleAtFixedRate 以指定的速率執行 每隔一段時候就觸發
    // 1: initalDelay 延遲1秒
    // 3: period 每格3秒
    executorService.scheduleAtFixedRate(() -> System.out.println(System.nanoTime()),3,TimeUnit.SECONDS);

    // 不適用關閉執行緒池
    // 若需要關閉執行緒池,可通過提供關閉資訊,再呼叫該方法
    // executorService.shutdown();
}
複製程式碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

// --- 

// DEFAULT_KEEPALIVE_MILLIS = 10L
// MILLISECONDS = TimeUnit.MILLI_SCALE
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,DEFAULT_KEEPALIVE_MILLIS,MILLISECONDS,new DelayedWorkQueue());
}
複製程式碼

newWorkStealingPool(int parallelism)

Java 8才加入這個建立方法,其內部會構建ForkJoinPool,利用Work-Stealing演演算法,並行地處 理任務(預設為主機CPU的可用核心數),不保證處理順序

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),ForkJoinPool.defaultForkJoinWorkerThreadFactory,null,true);
}

// --- 

public ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,boolean asyncMode) {

    this(checkParallelism(parallelism),checkFactory(factory),handler,asyncMode ? FIFO_QUEUE : LIFO_QUEUE,"ForkJoinPool-" + nextPoolId() + "-worker-");

    checkPermission();
}
複製程式碼

執行緒池的配置


需要更具實際情況而定。

  • CPU 密集型任務

需要儘可能壓榨 CPU。可參考 N / N+1 (N:CPU數量)

  • IO 密集型

可參考 執行緒數 = CPU核數 × (1 + 平均等待時間/平均工作時間)

參考閱讀


www.cnblogs.com/kuoAT/p/671…

www.cnblogs.com/dolphin0520…