1. 程式人生 > >Executor相關原始碼分析

Executor相關原始碼分析

Executor是一個用來執行提交的任務(Runnable)的物件。這個介面提供了一種將任務的提交和任務如何去執行解耦機制

Executor詳解

先來檢視Executor的介面定義:

public interface Executor {
    //在未來某個時間點執行給定的command任務,任務可能在一個新的執行緒、執行緒池、呼叫者執行緒中執行,這些取決與具體的Executor實現
    void execute(Runnable command);
}

Executor是更常用的方式,而不是使用傳統類似new Thread(new(RunnableTask())).start()來建立執行緒,使用Executor我們可以使用以下的方式來建立執行緒:

Executor executor = <em>anExecutor</em>;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...

當然,Executor介面並沒有很嚴格地要求任務的執行是非同步的,在一些簡單的場景中,能夠直接在呼叫的執行緒中執行提交的任務:

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
      r.run();
   }
 }

但是,我們通常會使用另外一個額外的執行緒來非同步執行這些任務:

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
      new Thread(r).start();
    }
  }

這裡也可以通過一個任務的執行去觸發下一個任務的執行:

 class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final
Executor executor; Runnable active; SerialExecutor(Executor executor) { this.executor = executor; } public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }

ExecutorService介面詳解

ExecutorService是Executor介面的擴充套件,提供一系列的方法來管理任務的結束狀態和產生一個Future來追蹤一個或多個非同步任務的執行過程

介面定義:

public interface ExecutorService extends Executor{}

一個ExecutorService能夠被關閉,之後會導致無法繼續新增新的任務。這裡提供了兩種關閉的方法,一種是shutdown允許繼續執行之前已經提交的相關任務,全部執行完之後才會關閉;另一種是shutdownNow會阻止準備被接受的任務,同時會停止當前正在執行的任務。因此不用的ExecutorService應該關閉掉以有效的回收資源。

對於ExecutorService中的submit方法是基於對Executor中的execute方法的一個擴充套件,能夠建立一個Future用來取消任務的執行或者是等待任務的執行結束。另外對於很多個任務的執行invokeAny和invokeAll是也很常用的方法,前者用來等待至少一個任務執行完成,後者等待所有的任務執行完成。

常用的方法:

Future<?> submit(Runnable task);

Executors詳解

Executors提供了針對 Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable這些類的工廠和輔助方法

核心的方法:

//建立一個執行緒池來執行佇列中的任務,並用給定的執行緒工廠來建立必要的執行緒
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

對應的底層ThreadPoolExecutor實現程式碼:

//以下引數部分是可以預設
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
//corePoolSize:池中的核心執行緒數,即使空閒也不會釋放(除非設定allowCoreThreadTimeOut)
//maximumPoolSize:池中能夠擁有的最大執行緒數
//keepAliveTime:當超過核心執行緒數的執行緒空閒該時長後會被釋放
//unit:上述時長的單位
//workQueue:佇列持有將會被執行的任務,這些任務(Runnable)是通過execute方法提交到佇列中
//threadFactory:用來建立池中執行緒的工廠
//單執行緒執行提交的任務,因為只有一個執行緒,所以不會在同一時刻執行超過一個任務數
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
//執行緒資源被建立好之後能夠快取60秒,用來重複使用來執行後續的任務,使用於大量短時的非同步任務,但是執行緒數是沒有限定的,因此需要謹慎使用
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
//執行緒池用來定期執行佇列中的任務
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

//底層實現與之前不同的在於佇列的實現:
public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
//DelayedWorkQueue為延時佇列,也是實現了BlockingQueue