1. 程式人生 > >java併發程式設計--Executor框架

java併發程式設計--Executor框架

摘要:

Eexecutor作為靈活且強大的非同步執行框架,其支援多種不同型別的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的執行緒相當於生產者,執行任務的執行緒相當於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命週期的支援,以及統計資訊收集,應用程式管理機制和效能監視等機制。

Exexctor簡介

Executor的UML圖:(常用的幾個介面和子類)
Executor的UML圖

  • Executor:一個介面,其定義了一個接收Runnable物件的方法executor,其方法簽名為executor(Runnable command),
  • ExecutorService:是一個比Executor使用更廣泛的子類介面,其提供了生命週期管理的方法,以及可跟蹤一個或多個非同步任務執行狀況返回Future的方法
  • AbstractExecutorService:ExecutorService執行方法的預設實現
  • ScheduledExecutorService:一個可定時排程任務的介面
  • ScheduledThreadPoolExecutor:ScheduledExecutorService的實現,一個可定時排程任務的執行緒池
  • ThreadPoolExecutor:執行緒池,可以通過呼叫Executors以下靜態工廠方法來建立執行緒池並返回一個ExecutorService物件

ThreadPoolExecutor建構函式的各個引數說明

ThreadPoolExecutor方法簽名:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //後兩個引數為可選引數

引數說明:

  • corePoolSize:核心執行緒數,如果執行的執行緒少於corePoolSize,則建立新執行緒來執行新任務,即使執行緒池中的其他執行緒是空閒的
  • maximumPoolSize:最大執行緒數,可允許建立的執行緒數,corePoolSize和maximumPoolSize設定的邊界自動調整池大小:
    • corePoolSize <執行的執行緒數< maximumPoolSize:僅當佇列滿時才建立新執行緒
    • corePoolSize=執行的執行緒數= maximumPoolSize:建立固定大小的執行緒池
  • keepAliveTime:如果執行緒數多於corePoolSize,則這些多餘的執行緒的空閒時間超過keepAliveTime時將被終止
  • unit:keepAliveTime引數的時間單位
  • workQueue:儲存任務的阻塞佇列,與執行緒池的大小有關:
    當執行的執行緒數少於corePoolSize時,在有新任務時直接建立新執行緒來執行任務而無需再進佇列
    當執行的執行緒數等於或多於corePoolSize,在有新任務新增時則選加入佇列,不直接建立執行緒
    當佇列滿時,在有新任務時就建立新執行緒
  • threadFactory:使用ThreadFactory建立新執行緒,預設使用defaultThreadFactory建立執行緒
  • handle:定義處理被拒絕任務的策略,預設使用ThreadPoolExecutor.AbortPolicy,任務被拒絕時將丟擲RejectExecutorException

Executors:提供了一系列靜態工廠方法用於建立各種執行緒池

  • newFixedThreadPool:建立可重用且固定執行緒數的執行緒池,如果執行緒池中的所有執行緒都處於活動狀態,此時再提交任務就在佇列中等待,直到有可用執行緒;如果執行緒池中的某個執行緒由於異常而結束時,執行緒池就會再補充一條新執行緒。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  //使用一個基於FIFO排序的阻塞佇列,在所有corePoolSize執行緒都忙時新任務將在佇列中等待
                                  new LinkedBlockingQueue<Runnable>());
}
  • newSingleThreadExecutor:建立一個單執行緒的Executor,如果該執行緒因為異常而結束就新建一條執行緒來繼續執行後續的任務
public static ExecutorService newSingleThreadExecutor() {
   return new FinalizableDelegatedExecutorService
                     //corePoolSize和maximumPoolSize都等於1,表示固定執行緒池大小為1
                        (new ThreadPoolExecutor(1, 1,
                                                0L, TimeUnit.MILLISECONDS,
                                                new LinkedBlockingQueue<Runnable>()));
}
  • newScheduledThreadPool:建立一個可延遲執行或定期執行的執行緒池
    這裡寫圖片描述
    例1:(使用newScheduledThreadPool來模擬心跳機制)
public class HeartBeat {
    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        Runnable task = new Runnable() {
            public void run() {
                System.out.println("HeartBeat.........................");
            }
        };
        executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS);   //5秒後第一次執行,之後每隔3秒執行一次
    }
}

輸出:

HeartBeat....................... //5秒後第一次輸出
HeartBeat....................... //每隔3秒輸出一個
  • newCachedThreadPool:建立可快取的執行緒池,如果執行緒池中的執行緒在60秒未被使用就將被移除,在執行新的任務時,當執行緒池中有之前建立的可用執行緒就重 用可用執行緒,否則就新建一條執行緒
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  //使用同步佇列,將任務直接提交給執行緒
                                  new SynchronousQueue<Runnable>());
}

例2:

public class ThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
     ExecutorService threadPool = Executors.newCachedThreadPool();//執行緒池裡面的執行緒數會動態變化,並可線上程線被移除前重用
        for (int i = 1; i <= 3; i ++) {
            final  int task = i;   //10個任務
            //TimeUnit.SECONDS.sleep(1);
            threadPool.execute(new Runnable() {    //接受一個Runnable例項
                public void run() {
                        System.out.println("執行緒名字: " + Thread.currentThread().getName() +  "  任務名為: "+task);
                }
            });
        }
    }
}

輸出:(為每個任務新建一條執行緒,共建立了3條執行緒)

執行緒名字: pool-1-thread-1 任務名為: 1
執行緒名字: pool-1-thread-2 任務名為: 2
執行緒名字: pool-1-thread-3 任務名為: 3

去掉第6行的註釋其輸出如下:(始終重複利用一條執行緒,因為newCachedThreadPool能重用可用執行緒)

執行緒名字: pool-1-thread-1 任務名為: 1
執行緒名字: pool-1-thread-1 任務名為: 2
執行緒名字: pool-1-thread-1 任務名為: 3

通過使用Executor可以很輕易的實現各種調優、管理、監視、記錄日誌和錯誤報告等待。

Executor的生命週期

ExecutorService提供了管理Eecutor生命週期的方法,ExecutorService的生命週期包括了:執行、關閉和終止三種狀態。

  • ExecutorService在初始化建立時處於執行狀態
  • shutdown方法等待提交的任務執行完成並不再接受新任務,在完成全部提交的任務後關閉
  • shutdownNow方法將強制終止所有執行中的任務並不再允許提交新任務

可以將一個Runnable(如例2)或Callable(如例3)提交給ExecutorService的submit方法執行,最終返回一上Future用來獲得任務的執行結果或取消任務
例3:(任務執行完成後並返回執行結果)

public class CallableAndFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new Callable<String>() {   //接受一上callable例項
            public String call() throws Exception {
                return "MOBIN";
            }
        });
        System.out.println("任務的執行結果:"+future.get());
    }
}

輸出:

任務的執行結果:MOBIN

ExecutorCompletionService

實現了CompletionService,將執行完成的任務放到阻塞佇列中,通過take或poll方法來獲得執行結果
例4:(啟動10條執行緒,誰先執行完成就返回誰)

public class CompletionServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);        //建立含10.條執行緒的執行緒池
        CompletionService completionService = new ExecutorCompletionService(executor);
        for (int i =1; i <=10; i ++) {
            final  int result = i;
            completionService.submit(new Callable() {
                public Object call() throws Exception {
                    Thread.sleep(new Random().nextInt(5000));   //讓當前執行緒隨機休眠一段時間
                    return result;
                }
            });
        }
        System.out.println(completionService.take().get());   //獲取執行結果
    }
}

輸出結果可能每次都不同(在1到10之間)

3

通過Executor來設計應用程式可以簡化開發過程,提高開發效率,並有助於實現併發,在開發中如果需要建立執行緒可優先考慮使用Executor。