java 執行緒池初步涉足
先看以下程式碼,是我們建立執行緒池的一種方式:
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
可以進入Executors類看一下,java建立執行緒池的四種方式,分別有以下四個大類:
(1)newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒(空閒執行緒超過存活時間的可以回收),若無可回收,則新建執行緒。
(2)newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
(3)newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
(4)newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
當然在Executors中創先以上四種執行緒池的方式還有很多,一會有所涉及。
我們現在進入Executors類看看建立上述四種執行緒池有哪些方式:
(1)newFixedThreadPool
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed. At any point, * at most {@code nThreads} threads will be active processing * tasks. If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available. If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks. The threads in the pool will * exist until it is explicitly {@link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
這邊有兩種方式是用來建立newFixedThreadPool執行緒池的,一種是直接傳入執行緒的個數,另外一種是傳進執行緒的個數,還有執行緒工廠(ThreadFactory)。有些任務需求對於如何建立執行緒有自己安排,所以需要傳入自己實現ThreadFactory介面的執行緒工廠類。仔細看,其實這兩個方法都使用了類ThreadPoolExecutor。可以去類ThreadPoolExecutor中看看裡面建構函式中引數的意思,其建構函式程式碼如下:可以看到他呼叫了this(....)建構函式,我順便把他底層的建構函式都寫在後面,方便檢視。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } /** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
這裡面一共有六個引數,分別如下:
corePoolSize是執行緒池中所維持的執行緒池大小。這些執行緒即使是空閒狀態,也會存在於執行緒池中,不會被銷燬。這個執行緒池的核心執行緒數也可以設定為0,像newCachedThreadPool中就是將核心執行緒數設定為0的。
maxPoolSize是執行緒池的最大執行緒數。這個最大執行緒數是對於或等於核心執行緒數的,當你用newFixedThreadPool和newSingleThreadPool時,核心執行緒數和最大執行緒數是相等的,這一點你可以在這兩種執行緒池的建構函式中得到驗證。
keepAliveTime是用來約束某種執行緒的生存時間的,這個某種執行緒是什麼意思呢,當執行緒池所使用的執行緒數設為n,這個某種執行緒是一類執行緒,就是n-corePoolSize的執行緒,就是超過核心執行緒數的執行緒。當執行緒池所建立的執行緒數大於核心執行緒數時,此時會計算當前的這些執行緒中哪些執行緒是空閒的,當空閒時間超過這個設定的keepAliveTime時,該執行緒就會被回收。
unit就是時間單位
workQueue:工作佇列,該佇列是當核心執行緒數已經使用完了,之後提交的任務就將放入這個工作佇列,尤其需要注意的是,只有當你用execute()方法提交的任務才會放在這個工作佇列中。一般到佇列滿的時候,才會建立先的執行緒來執行任務,當然要保證執行緒池的所有執行緒數小於或等於最大執行緒數。
threadFactory:執行緒工廠用於建立執行緒的。有些需求可能需要自己建立的執行緒。
handler:是拒絕策略,也叫飽和策略,當執行緒池中的執行緒數達到maxPoolSize,並且workQueue(有界佇列)已經滿了,就需要採用飽和策略了,對於新提交的任務是直接拒絕還是怎麼樣,是在這邊設定的。
這邊瞭解了底層建立執行緒池的引數,回過頭來看看newFixedThreadPool是怎麼建立執行緒池的:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool是是有一個引數,就是執行緒數nThread。這個引數用於設定核心執行緒數和最大執行緒數,其中的工作佇列是LinkedBlockingQueue,這個佇列是連結串列實現的,也就是說該工作佇列不會滿,除非你限制他的長度。而且該執行緒池中的最大執行緒數就是核心執行緒數。
接著我們看看其他幾個執行緒池的構造方式。
(2)newSingleThreadExecutor
看看建構函式,核心執行緒數和最大執行緒數都是1,生存時間0,也是LinkedBlockingQueue
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
(3)newCachedThreadPool
這邊有兩個建構函式,就是傳入承諾書不一樣,一個啥也沒傳,一個穿了執行緒工廠。最大執行緒數是Integer的最大值,核心執行緒數為0,執行緒最大空閒時間為60秒。工作佇列是SynchronousQueue,這個queue是即是交付的,就是任務一到就建立執行緒去執行。整個工作機制如下:來一個任務就建立執行緒去執行,當然有空閒執行緒讓這些空閒執行緒去執行,空閒執行緒有一個60s生存時間。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
(4)newScheuledThreadPool
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
這個定時執行緒池底層使用了ScheuledThreadPoolExecytor類,那我們去看看這個類是怎麼構造的
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
該類繼承 extends ThreadPoolExecutor,所使用的引數就是執行緒池核心執行緒數,最大執行緒數是Integer的最大值,佇列用了延遲佇列,沒有生存時間這一說法。