Java執行緒池實現原理
引數配置
核心池大小、最大池大小
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
ThreadPoolExecutor根據這兩個引數自動分配池大小。當執行緒數>corePoolSize 時,多於corePoolSize 的執行緒在超過keepAliveTime時間後,會終止執行。
另外如果設定了allowCoreThreadTimeOut為true,核心執行緒在空閒時間超過keepAliveTime 後終止執行。
執行緒分配流程:
- If fewer than corePoolSize threads are running, the Executor
- always prefers adding a new thread
- rather than queuing.
- If corePoolSize or more threads are running, the Executor
- always prefers queuing a request rather than adding a new
- thread.
- If a request cannot be queued, a new thread is created *unless this would exceed maximumPoolSize, in which case, the *task will be
- rejected.
如果有新請求提交到執行緒池,且執行中的執行緒數<corePoolSize,新建一個執行緒處理請求(即使有工作執行緒處於空閒狀態)。
如果corePoolSize<=執行執行緒數,請求被放入佇列。
如果corePoolSize<執行執行緒<maximumPoolSize,除非佇列已滿,才會新建一個執行緒處理請求。
如果執行執行緒達到maximumPoolSize且佇列已滿,將會執行rejectedExecution方法。
執行過程示意圖
佇列實現策略
SynchronousQueue:如果執行緒池中沒有可用執行緒,將會建立一個新的執行緒。maximumPoolSize無限大,以防止提交的任務達到maximumPoolSize大小後被拒絕執行。
LinkedBlockingQueue:基於連結串列結構的阻塞佇列,如果corePoolSize全部執行緒用完,在佇列中等待,不會建立新執行緒,適合彼此獨立的任務。
- ArrayBlockingQueue:配合有限的maximumPoolSizes,防止資源用盡,但難以協調控制。佇列的大小和maximumPoolSizes需要權衡。
執行緒池的飽和策略
1.AbortPolicy
預設實現,直接丟擲異常
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
2.CallerRunsPolicy
executor呼叫執行緒直接執行被拒絕的任務(Runnable),除非呼叫執行緒executor被關閉,此時任務被丟棄。
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
3.DiscardPolicy
直接丟棄被拒絕的任務,不做任何處理.
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
4.DiscardOldestPolicy
從佇列中丟棄被拒絕的請求,executor呼叫執行緒執行被拒絕的任務(Runnable),除非呼叫執行緒executor被關閉,此時任務被丟棄。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}