Executor相關原始碼分析
Executor是一個用來執行提交的任務(Runnable)的物件。這個介面提供了一種將任務的提交和任務如何去執行解耦機制
Executor詳解
先來檢視Executor的介面定義:
public interface Executor {
//在未來某個時間點執行給定的command任務,任務可能在一個新的執行緒、執行緒池、呼叫者執行緒中執行,這些取決與具體的Executor實現
void execute(Runnable command);
}
Executor是更常用的方式,而不是使用傳統類似new Thread(new(RunnableTask())).start()來建立執行緒,使用Executor我們可以使用以下的方式來建立執行緒:
Executor executor = <em>anExecutor</em>;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
當然,Executor介面並沒有很嚴格地要求任務的執行是非同步的,在一些簡單的場景中,能夠直接在呼叫的執行緒中執行提交的任務:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
但是,我們通常會使用另外一個額外的執行緒來非同步執行這些任務:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
這裡也可以通過一個任務的執行去觸發下一個任務的執行:
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
ExecutorService介面詳解
ExecutorService是Executor介面的擴充套件,提供一系列的方法來管理任務的結束狀態和產生一個Future來追蹤一個或多個非同步任務的執行過程
介面定義:
public interface ExecutorService extends Executor{}
一個ExecutorService能夠被關閉,之後會導致無法繼續新增新的任務。這裡提供了兩種關閉的方法,一種是shutdown允許繼續執行之前已經提交的相關任務,全部執行完之後才會關閉;另一種是shutdownNow會阻止準備被接受的任務,同時會停止當前正在執行的任務。因此不用的ExecutorService應該關閉掉以有效的回收資源。
對於ExecutorService中的submit方法是基於對Executor中的execute方法的一個擴充套件,能夠建立一個Future用來取消任務的執行或者是等待任務的執行結束。另外對於很多個任務的執行invokeAny和invokeAll是也很常用的方法,前者用來等待至少一個任務執行完成,後者等待所有的任務執行完成。
常用的方法:
Future<?> submit(Runnable task);
Executors詳解
Executors提供了針對 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable這些類的工廠和輔助方法
核心的方法:
//建立一個執行緒池來執行佇列中的任務,並用給定的執行緒工廠來建立必要的執行緒
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
對應的底層ThreadPoolExecutor實現程式碼:
//以下引數部分是可以預設
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//corePoolSize:池中的核心執行緒數,即使空閒也不會釋放(除非設定allowCoreThreadTimeOut)
//maximumPoolSize:池中能夠擁有的最大執行緒數
//keepAliveTime:當超過核心執行緒數的執行緒空閒該時長後會被釋放
//unit:上述時長的單位
//workQueue:佇列持有將會被執行的任務,這些任務(Runnable)是通過execute方法提交到佇列中
//threadFactory:用來建立池中執行緒的工廠
//單執行緒執行提交的任務,因為只有一個執行緒,所以不會在同一時刻執行超過一個任務數
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
//執行緒資源被建立好之後能夠快取60秒,用來重複使用來執行後續的任務,使用於大量短時的非同步任務,但是執行緒數是沒有限定的,因此需要謹慎使用
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//執行緒池用來定期執行佇列中的任務
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
//底層實現與之前不同的在於佇列的實現:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//DelayedWorkQueue為延時佇列,也是實現了BlockingQueue