1. 程式人生 > >執行緒池基本使用

執行緒池基本使用

jdk1.5引入Executor執行緒池框架,通過它把任務的提交和執行進行解耦,只需要定義好任務,然後提交給執行緒池,而不用關心該任務是如何執行、被哪個執行緒執行,以及什麼時候執行。

初始化執行緒池(4種)

簡介:

Java執行緒池的工廠類:Executors類,

初始化4種類型的執行緒池:

newCachedThreadPool()
說明:初始化一個可以快取執行緒的執行緒池,預設快取60s,執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞佇列;
特點:在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime,會自動釋放執行緒資源;當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷;
因此,使用時要注意控制併發的任務數,防止因建立大量的執行緒導致而降低效能。

newFixedThreadPool()
說明:初始化一個指定執行緒數的執行緒池,其中corePoolSize == maxiPoolSize,使用LinkedBlockingQuene作為阻塞佇列
特點:即使當執行緒池沒有可執行任務時,也不會釋放執行緒。
newSingleThreadExecutor()
說明:初始化只有一個執行緒的執行緒池,內部使用LinkedBlockingQueue作為阻塞佇列。
特點:保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。如果該執行緒異常結束,會重新建立一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行
newScheduledThreadPool()


特定:初始化的執行緒池可以在指定的時間內週期性的執行所提交的任務,在實際的業務場景中可以使用該執行緒池定期的同步資料。

總結:除了newScheduledThreadPool的內部實現特殊一點之外,其它執行緒池內部都是基於ThreadPoolExecutor類(Executor的子類)實現的。

newCachedThreadPool和newSingleThreadExecutor使用例子結果對比:

public class MyRunnable implements Runnable {
    private Integer index;

    public MyRunnable(Integer index) {
        
this.index = index; } @Override public void run() { System.out.println(index); } public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { executorService.execute(new MyRunnable(i)); } executorService.shutdown(); } }

結果:

public class MyRunnable implements Runnable {
    private Integer index;

    public MyRunnable(Integer index) {
        this.index = index;
    }

    @Override
    public void run() {
        System.out.println(index);
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            executorService.execute(new MyRunnable(i));
        }
        executorService.shutdown();
    }
}

結果:

newScheduleThreadPool使用例子:

建立一個定長的執行緒池,而且支援定時的以及週期性的任務執行,支援定時及週期性任務執行。

延遲3秒執行,延遲執行示例程式碼如下:

public class MyThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 3 seconds");
            }
        }, 3, TimeUnit.SECONDS);
        scheduledThreadPool.shutdown();
    }
}

表示延遲1秒後每3秒執行一次,定期執行示例程式碼如下:

public class MyThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 1 seconds, and excute every 3 seconds");
            }
        }, 1, 3, TimeUnit.SECONDS);
        try {
            Thread.sleep(10000);
            scheduledThreadPool.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

呼叫shutdown就會結束執行緒池退休迴圈

 

ThreadPoolExecutor內部具體實現:

ThreadPoolExecutor類構造器語法形式:

ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handle);   

方法引數:
   corePoolSize:核心執行緒數
   maxPoolSize:最大執行緒數
     keepAliveTime:執行緒存活時間(在corePore<*<maxPoolSize情況下有用)
     timeUnit:存活時間的時間單位
     workQueue:阻塞佇列(用來儲存等待被執行的任務)

注:關於workQueue引數的取值,JDK提供了4種阻塞佇列型別供選擇:
        ArrayBlockingQueue:基於陣列結構的有界阻塞佇列,按FIFO排序任務;
        inkedBlockingQuene:基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於  

            SynchronousQuene:一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於ArrayBlockingQuene;

        PriorityBlockingQuene:具有優先順序的無界阻塞佇列

     threadFactory:執行緒工廠,主要用來建立執行緒;

     handler:表示當拒絕處理任務時的策略,有以下四種取值

 注: 當執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了4種策略:

    ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。

    ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。

    ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)

    ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務

    當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

執行緒池的狀態(5種)

其中AtomicInteger變數ctl的功能非常強大:利用低29位表示執行緒池中執行緒數,通過高3位表示執行緒池的執行狀態:
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;
3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;
4、TIDYING : 2 << COUNT_BITS,即高3位為010,該狀態表示執行緒池對執行緒進行整理優化;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011,該狀態表示執行緒池停止工作;


 向執行緒池提交任務(2種)

有兩種方式:

      Executor.execute(Runnable command);

      ExecutorService.submit(Callable<T> task);

execute()內部實現

1.首次通過workCountof()獲知當前執行緒池中的執行緒數,

  如果小於corePoolSize, 就通過addWorker()建立執行緒並執行該任務;

 否則,將該任務放入阻塞佇列;

2. 如果能成功將任務放入阻塞佇列中,  

如果當前執行緒池是非RUNNING狀態,則將該任務從阻塞佇列中移除,然後執行reject()處理該任務;

如果當前執行緒池處於RUNNING狀態,則需要再次檢查執行緒池(因為可能在上次檢查後,有執行緒資源被釋放),是否有空閒的執行緒;如果有則執行該任務;

3、如果不能將任務放入阻塞佇列中,說明阻塞佇列已滿;那麼將通過addWoker()嘗試建立一個新的執行緒去執行這個任務;如果addWoker()執行失敗,說明執行緒池中執行緒數達到maxPoolSize,則執行reject()處理任務;

 sumbit()內部實現

會將提交的Callable任務會被封裝成了一個FutureTask物件

FutureTask類實現了Runnable介面,這樣就可以通過Executor.execute()提交FutureTask到執行緒池中等待被執行,最終執行的是FutureTask的run方法; 

比較:

 兩個方法都可以向執行緒池提交任務,execute()方法的返回型別是void,它定義在Executor介面中, 而submit()方法可以返回持有計算結果的Future物件,它定義在ExecutorService介面中,它擴充套件了Executor介面,其它執行緒池類像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有這些方法。 


執行緒池的關閉(2種)

  ThreadPoolExecutor提供了兩個方法,用於執行緒池的關閉,分別是shutdown()和shutdownNow(),其中:

    shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完後才終止,但再也不會接受新的任務

    shutdownNow():立即終止執行緒池,並嘗試打斷正在執行的任務,並且清空任務快取佇列,返回尚未執行的任務


 執行緒池容量的動態調整

  ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),