1. 程式人生 > 其它 >Executors原始碼之執行緒池

Executors原始碼之執行緒池

首先,看下四種執行緒池的八種構造方法:

/**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {
@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * *
@param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a work-stealing thread pool using all * {@link Runtime#availableProcessors available processors} * as its target parallelism level. * @return the newly created thread pool * @see #newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed. At any point, * at most {@code nThreads} threads will be active processing * tasks. If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available. If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks. The threads in the pool will * exist until it is explicitly {@link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent {@code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }

newSingleThreadExecutor建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

newFixedThreadPool建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

newScheduledThreadPool建立一個可定期或者延時執行任務的定長執行緒池,支援定時及週期性任務執行。

newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。

四種執行緒池都是建立了ThreadPollExecutor物件,只是傳遞的引數不一樣而已,觀察傳入的workQueue 都是預設,即最大可新增Integer.MAX_VALUE個任務,這也就是阿里巴巴java開發規範禁止直接使用java提供的預設執行緒池的原因了

ThreadPoolExecutor構造引數

int corePoolSize, 核心執行緒大小

int maximumPoolSize,最大執行緒大小

long keepAliveTime, 超過corePoolSize的執行緒多久不活動被銷燬時間

TimeUnit unit,時間單位

BlockingQueue<Runnable>workQueue 任務佇列

ThreadFactory threadFactory 執行緒池工廠

RejectedExecutionHandler handler 拒絕策略

其中任務佇列:

ArrayBlockingQueue :一個由陣列結構組成的有界阻塞佇列

LinkedBlockingQueue :一個由連結串列結構組成的有界阻塞佇列(常用)

PriorityBlockingQueue :一個支援優先順序排序的無界阻塞佇列

DelayQueue: 一個使用優先順序佇列實現的無界阻塞佇列

SynchronousQueue: 一個不儲存元素的阻塞佇列(常用)

LinkedTransferQueue: 一個由連結串列結構組成的無界阻塞佇列

LinkedBlockingDeque: 一個由連結串列結構組成的雙向阻塞佇列

執行緒池I的執行流程

當執行緒池小於corePoolSize時,新提交任務將建立一個新執行緒執行任務,即使此時執行緒池中存在空閒執行緒。

當執行緒池達到corePoolSize時,新提交任務將被放入workQueue中,等待執行緒池中任務排程執行

當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新執行緒執行任務

當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理

當執行緒池中超過corePoolSize執行緒,空閒時間達到keepAliveTime時,釋放空閒執行緒

當設定allowCoreThreadTimeOut(true)時,該引數預設false,執行緒池中corePoolSize執行緒空閒時間達到keepAliveTime也將關閉

不同的使用場景

newFixedThreadPool:

  • 底層:返回ThreadPoolExecutor例項,接收引數為所設定執行緒數量n,corePoolSize和maximumPoolSize均為n;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;WorkQueue為:new LinkedBlockingQueue<Runnable>()無界阻塞佇列
  • 通俗:建立可容納固定數量執行緒的池子,每個執行緒的存活時間是無限的,當池子滿了就不再新增執行緒了;如果池中的所有執行緒均在繁忙狀態,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
  • 適用:執行長期任務

newSingleThreadExecutor:

  • 底層:FinalizableDelegatedExecutorService包裝的ThreadPoolExecutor例項,corePoolSize為1;maximumPoolSize為1;keepAliveTime為0L;時間單位TimeUnit.MILLISECONDS;workQueue為:new LinkedBlockingQueue<Runnable>()無解阻塞佇列
  • 通俗:建立只有一個執行緒的執行緒池,當該執行緒正繁忙時,對於新任務會進入阻塞佇列中(無界的阻塞佇列)
  • 適用:按順序執行任務的場景

newCachedThreadPool:

  • 底層:返回ThreadPoolExecutor例項,corePoolSize為0;maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為60L;時間單位TimeUnit.SECONDS;workQueue為SynchronousQueue(同步佇列)
  • 通俗:當有新任務到來,則插入到SynchronousQueue中,由於SynchronousQueue是同步佇列,因此會在池中尋找可用執行緒來執行,若有可以執行緒則執行,若沒有可用執行緒則建立一個執行緒來執行該任務;若池中執行緒空閒時間超過指定時間,則該執行緒會被銷燬。
  • 適用:執行很多短期的非同步任務

NewScheduledThreadPool:

  • 底層:建立ScheduledThreadPoolExecutor例項,該物件繼承了ThreadPoolExecutor,corePoolSize為傳遞來的引數,maximumPoolSize為Integer.MAX_VALUE;keepAliveTime為0;時間單位TimeUnit.NANOSECONDS;workQueue為:new DelayedWorkQueue()一個按超時時間升序排序的佇列
  • 通俗:建立一個固定大小的執行緒池,執行緒池內執行緒存活時間無限制,執行緒池可以支援定時及週期性任務執行,如果所有執行緒均處於繁忙狀態,對於新任務會進入DelayedWorkQueue佇列中,這是一種按照超時時間排序的佇列結構
  • 適用:執行週期性任務

注:

  • 一般如果執行緒池任務佇列採用LinkedBlockingQueue佇列的話,那麼不會拒絕任何任務(因為其大小為Integer.MAX_VALUE),這種情況下,ThreadPoolExecutor最多僅會按照最小執行緒數corePoolSize來建立執行緒,也就是說執行緒池大小被忽略了。
  • 如果執行緒池任務佇列採用ArrayBlockingQueue佇列,初始化設定了最大佇列數。那麼ThreadPoolExecutor的maximumPoolSize才會生效,那麼ThreadPoolExecutor的maximumPoolSize才會生效會採用新的演算法處理任務,
  • 例如假定執行緒池的最小執行緒數為4,最大為8,ArrayBlockingQueue最大為10。隨著任務到達並被放到佇列中,執行緒池中最多執行4個執行緒(即核心執行緒數)直到佇列完全填滿,也就是說等待狀態的任務小於等於10,ThreadPoolExecutor也只會利用4個核心執行緒執行緒處理任務。
  • 如果佇列已滿,而又有新任務進來,此時才會啟動一個新執行緒,這裡不會因為佇列已滿而拒接該任務,相反會啟動一個新執行緒。新執行緒會執行佇列中的第一個任務,為新來的任務騰出空間。如果執行緒數已經等於最大執行緒數,任務佇列也已經滿了,則執行緒池會拒絕這個任務,預設拒絕策略是丟擲異常。
  • 這個演算法背的理念是:該池大部分時間僅使用核心執行緒(4個),即使有適量的任務在佇列中等待執行。這時執行緒池就可以用作節流閥。如果擠壓的請求變得非常多,這時該池就會嘗試執行更多的執行緒來清理;這時第二個節流閥—最大執行緒數就起作用了。

返回類ExecutorService extends Executor

new類ThreadPoolExecutor extends AbstractExecutorServiceimplements ExecutorServiceextends Executor