1. 程式人生 > >2,Executor線程池

2,Executor線程池

submit order span 分享圖片 one scheduled cond tar target

一,Executor框架簡介

在Java 5之後,並發編程引入了一堆新的啟動、調度和管理線程的API。Executor框架便是Java 5中引入的,其內部使用了線程池機制,它在java.util.cocurrent 包下,通過該框架來控制線程的啟動、執行和關閉,可以簡化並發編程的操作。

技術分享圖片

Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService(異步任務),Future,Callable等。

二,Executor接口

Executor接口是Executor框架中最基礎的部分,定義了一個用於執行Runnable的execute方法,它沒有實現類只有另一個重要的子接口ExecutorService。

public interface Executor {
    void execute(Runnable command);
}

三,ExecutorService接口

ExecutorService接口繼承自Executor接口,定義了終止、提交,執行任務、跟蹤任務返回結果等方法。

public interface ExecutorService extends Executor {
    //關閉線程池,當此方法被調用時,ExecutorService停止接收新的任務並且等待已經提交的任務(包含提交正在執行和提交未執行)執行完成。
    //當所有提交任務執行完畢,線程池即被關閉。
void shutdown(); //立即停止,將暫停所有等待處理的任務並返回這些任務的列表 List<Runnable> shutdownNow(); //判斷執行器是否已經關閉 boolean isShutdown(); //判斷關閉後所有任務是否都已完成 boolean isTerminated(); //接收timeout和TimeUnit(日期工具類)兩個參數,用於設定超時時間及單位。 //當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則返回true,否則返回false。
//一般情況下會和shutdown方法組合使用。 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //提交一個Callable任務 <T> Future<T> submit(Callable<T> task); //提交一個Runable任務,result是要返回的結果 <T> Future<T> submit(Runnable task, T result); //提交一個任務 Future<?> submit(Runnable task); //執行所有給定的任務,當所有任務完成,返回保持任務狀態和結果的Future列表 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //執行給定的任務,當所有任務完成或超時期滿時(無論哪個首先發生),返回保持任務狀態和結果的 Future 列表。 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; //執行給定的任務,如果某個任務已成功完成(也就是未拋出異常),則返回其結果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; //執行給定的任務,如果在給定的超時期滿前某個任務已成功完成(也就是未拋出異常),則返回其結果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

四,Executors

可以Executors類來創建的線程池的類型,可以創建5種類型的線程池。

NewFixedThreadPool(numberOfThreads:int)(固定線程池)

ExecutorService 創建一個固定線程數量的線程池,並行執行的線程數量不變,線程當前任務完成後,可以被重用執行另一個任務。

NewCachedThreadPool():(可緩存線程池)

ExecutorService 創建一個線程池,按需創建新線程,就是有任務時才創建,空閑線程保存60s,當前面創建的線程可用時,則重用它們。

newSingleThreadExecutor();(單線程執行器)

線程池中只有一個線程,依次執行任務。這個線程池可以在線程死後(或發生異常時)重新啟動一個線程來替代原來的線程繼續執行下去。

newScheduledThreadPool():

線程池按時間計劃來執行任務,允許用戶設定執行任務的時間來執行。

newSingleThreadExecutor();

線程池中只有一個線程,它按規定順序(FIFO, LIFO, 優先級)來執行任務。

4.1,newFixedThreadPool類型

創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 12; i++) {
        int index = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(index);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    //關閉線程池
    threadPool.shutdown();
}

因為線程池大小為3,每個任務輸出後間隔2秒,所以每兩秒打印3個數字。

技術分享圖片

4.2,newCachedThreadPool類型

創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newCachedThreadPool();
    for (int i = 0; i < 4; i++) {
        int index = i;
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("threadName="+ Thread.currentThread().getName() + " index=" + index);
            }
        });
    }
    //關閉線程池
    threadPool.shutdown();
}

線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

技術分享圖片

4.3,newSingleThreadExecutor類型

創建一個線程池(這個線程池只有一個線程),這個線程池可以在線程死後(或發生異常時)重新啟動一個線程來替代原來的線程繼續執行下去。

public class Thread4 extends Thread{
    @Override
    public void run() {
        int temp = 0;  
        int i = 0;  
        Random random =new Random();  
        while(true){  
            int j =random.nextInt(100);
            System.err.println("threadName="+ Thread.currentThread().getName() + "  temp="+ temp +"  i="+ (i++) + "  j=" + j);  
            try{  
                if(temp == 0 && j > 60 ) {
                    temp = 7/0;
                }  
                Thread.sleep(100);
            }catch(Exception e){  
                e.printStackTrace();  
                temp = 1;  
            }  
        }
    }
    
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        Thread4 thread4 = new Thread4();
        threadPool.execute(thread4);
    }
}

技術分享圖片

4.4,newScheduledThreadPool類型

創建一個定長線程池,支持定時及周期性任務執行。

延遲3秒執行:

public static void main(String[] args) {
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
    threadPool.schedule(new Runnable() {
        @Override
        public void run() {
            System.out.println("delay 3 seconds");
        }
    }, 3, TimeUnit.SECONDS);
    //關閉線程池
    threadPool.shutdown();
}

技術分享圖片

延遲1秒後每3秒執行一次:

public static void main(String[] args) {
    ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(5);
    threadPool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("delay 3 seconds");
        }
    },1 ,3 , TimeUnit.SECONDS);
}

技術分享圖片

4.5,newSingleThreadExecutor類型

public static void main(String[] args) {
    ExecutorService threadPool = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(index);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

結果依次輸出,相當於順序執行各個任務。

技術分享圖片

五,Callable接口和 Future接口

5.1,Callable接口

與Runnable接口的區別在於它接收泛型,同時它執行任務後帶有返回內容

//Callable同樣是任務,與Runnable接口的區別在於它接收泛型,同時它執行任務後帶有返回內容  
public interface Callable<V> {  
     //call方法相當於Runnable的run方法,多了一個返回值而已
     V call() throws Exception;  
} 

Runnable對象可以轉換成Callable對象,需要用到Executors類,具體方法如下。

把一個Runnable對象轉換成Callable對象:

public static Callable<Object> callable(Runnbale task); 

把一個Runnable和一個待返回的結果包裝成一個Callable:

public static<T> Callable<T> callable(Runnbale task,T result);

5.2,Future接口

public interface Future<V> {
    
    //嘗試取消一個任務,如果取消任務成功則返回true,如果取消任務失敗則返回false。
    //參數mayInterruptIfRunning表示是否允許取消正在執行卻沒有執行完畢的任務,如果設置true,則表示可以取消正在執行過程中的任務。
    boolean cancel(boolean mayInterruptIfRunning);
    
    //檢測任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
    boolean isCancelled();
    
    //檢測任務是否被取消成功,如果在任務正常完成前被取消成功,則返回 true。
    boolean isDone();
    
    //獲取異步任務的執行結果(如果任務沒執行完將等待) 
    V get() throws InterruptedException, ExecutionException;
    
    //獲取異步任務的執行結果(有最常等待時間的限制)如果在指定時間內,還沒獲取到結果,就直接返回null。
    //timeout表示等待的時間,unit是它時間單位 
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

5.3,如何使用

1. 創建 Callable 接口的實現類,並實現 call() 方法,該 call() 方法將作為線程執行體,並且有返回值。

2. 創建 Callable 實現類的實例,使用 FutureTask 類來包裝 Callable 對象,該 FutureTask 對象封裝了該 Callable 對象的 call() 方法的返回值。

3. 使用 FutureTask 對象作為 Thread 對象的 target 創建並啟動新線程。

4. 調用 FutureTask 對象的 get() 方法來獲得子線程執行結束後的返回值。

/**
 * 
 * @類名稱:Thread3
 * @類描述:通過 Callable 和 Future 創建線程.
 * @創建人:Zender
 */
public class Thread3 implements Callable<Integer>{
    //該方法將作為線程執行體,並且有返回值。
    @Override
    public Integer call() throws Exception {
        int i = 100;
        System.err.println("通過 Callable 和 Future 創建線程.i=" + i);
        return i;
    }
}

TestMain.java

/**
 * 
 * @類名稱:TestMain
 * @類描述:測試
 * @創建人:Zender
 */
public class TestMain {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Thread3 thread3 = new Thread3();
        FutureTask<Integer> ft = new FutureTask<Integer>(thread3);
        new Thread(ft).start();
        System.out.println("線程的返回值:" + ft.get());  
    }
}

技術分享圖片

2,Executor線程池