1. 程式人生 > >線程池 概念理解

線程池 概念理解

data clas rejected 更新 dal amp 定時 btn 服務

轉 Java線程池解析

技術分享 作者 whthomas 關註 2016.04.06 20:07* 字數 1427 閱讀 340評論 0

Java的一大優勢是能完成多線程任務,對線程的封裝和調度非常好,那麽它又是如何實現的呢?

jdk的包下和線程相關類的類圖。

技術分享
屏幕快照 2016-04-06 下午2.01.16.png

從上面可以看出Java的線程池主的實現類主要有兩個類ThreadPoolExecutorForkJoinPool

ForkJoinPoolFork/Join框架下使用的一個線程池,一般情況下,我們使用的比較多的就是ThreadPoolExecutor。我們大多數時候創建線程池是通過Executors類的幾個方法實現的:

  • newFixedThreadPool():創建一個固定線程數的線程池,可控制線程最大並發數,適用需要限制線程池數量的應用場景。
public static ExecutorService newFixedThreadPool
(int nThreads)
{ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
{ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
  • newSingleThreadExecutor():創建一個單線程的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO)執行適用於那種需要按照線程數量執行的場景。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
  • newCachedThreadPool():創建一個可以根據需要創建新線程的線程池,它是沒有線程數量限制的,適用於短期異步任務的操作,或者是負載比較輕的服務器。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
  • newScheduledThreadPool():創建一個固定線程數的線程池,支持定時及周期性執行後臺任務。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

這裏看到基本上這幾個方法都是返回了ThreadPoolExecutor這個對象。以下是ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

幾個參數的含義:

  1. corePoolSize(線程池基本大小):當提交一個新任務到線程池,線程會創建一個新的線程來執行任務,無論其他的基本線程是否是空閑的狀態。這種情況會持續到當需要執行的任務數量大於線程池基本線程數量大小時就不再創建了。
  2. maximumPoolSize(線程池最大數量):線程池允許創建的最大線程數量,如果隊列滿了,並且已經創建的線程數小於最大線程數量,這個線程池會再創建新的線程執行任務。
  3. KeepAliveTime(線程保持活動的時間):線程空閑之後保持存活的時間。
  4. TimeUnit(線程保持活動時間的單位):可以使用TimeUnit時間單位來設置。
  5. runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列,可以選擇以下幾個:
    • ArrayBlockingQueue:基於數組的阻塞隊列,按照FIFO原則進行排序
    • LinkedBlockingQueue:基於鏈表的阻塞隊列,按照FIFO原則對元素進行排序,吞吐量高於ArrayBlockingQueue。Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不儲存元素的阻塞隊列,每一個插入操作必須等到另外一個線程調用移除操作,否則插入操作一直處於阻塞狀態。吞吐量高於LinkedBlockingQueue,Executors.newCachedThreadPool使用了這個隊列。
    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列
  6. RejectedExecutionHandler(飽和策略):這個本身是Java的一個接口,當隊列和線程池都滿了,需要一種策略處理新的任務,在這個類的最下部提供了四種內置的實現類:
    • AbortPolicy:直接拋出異常。
    • CallerRunsPolicy:只用調用者所在的線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前的任務。
    • DiscardPolicy:不處理,直接丟棄。
    • 自定義策略:實現RejectedExecutionHandler接口,自定義策略。
  7. ThreadFactory(線程工廠):用於設置創建新的線程的工廠。可以使用guava的ThreadFactoryBuilder來創建一個ThreadFactory。

線程池用的最多的是`execute()方法,它的執行實際上分了三步:

  1. 當少量的線程在運行,線程的數量還沒有達到corePoolSize,那麽啟用新的線程來執行新任務。
  2. 如果線程數量已經達到了corePoolSize,那麽嘗試把任務緩存起來,然後二次檢查線程池的狀態,看這個時候是否能添加一個額外的線程,來執行這個任務。如果這個檢查到線程池關閉了,就拒絕任務。
  3. 如果我們沒法緩存這個任務,那麽我們就嘗試去添加線程去執行這個任務,如果失敗,可能任務已被取消或者任務隊列已經飽和,就拒絕掉這個任務。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

線程池把每個線程都封裝成一個對象Worker,以上有一個關鍵的函數addWorker():

addWorker(Runnable firstTask, boolean core)

firstTask代表這個線程池首先要執行的任務,core代表是否使用corePoolSize來做為線程池線程的最大標記。

以上就是對線程池的一個基本解析。

Java並發 © 著作權歸作者所有 舉報文章

線程池 概念理解