1. 程式人生 > >java執行緒池框架Executor

java執行緒池框架Executor

在面向物件程式設計中,建立和銷燬物件是很浪費時間的,因為建立一個物件要獲取記憶體資源或者更多資源。在java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。

所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用已有物件來服務就是一個需要解決的關鍵問題,其實這就是一些池化資源技術產生的原因

執行緒池的優點:

1.重用執行緒池中的執行緒,減少因物件建立,銷燬所帶來的效能開銷

2.能有效控制執行緒的最大併發數,提高系統資源利用率,同時避免過多的資源競爭,避免堵塞

3.能夠對多執行緒進行簡單有效的管理,使執行緒的使用簡單、高效

若採用"為每個任務分配一個執行緒"的方式會存在一些缺陷,尤其是當需要建立大量執行緒時:

執行緒生命週期的開銷非常高、資源消耗、穩定性

任務是一組邏輯工作單元,執行緒則是使任務非同步執行的機制。當存在大量併發任務時,建立、銷燬執行緒需要很大的開銷,運用執行緒池可以大大減小開銷

執行緒池框架Executor

Java中的執行緒池是通過Executor框架實現的,Executor框架包括類:

Executor、Executors、ExecutorService、ThreadPoolExecutor、Callable、Future和FutureTask

關係如下:

Executor所有執行緒池的介面,只有一個方法,該介面定義執行Runnable任務的方式

Public interface Executor{

Void execute(Runnable command);

}

ExecutorService 增加Executor的行為,是Executor實現類的最直接的介面,該介面定義提供對Executor的服務

Executors 執行緒池工廠類,提供了一系列工廠方法用於建立執行緒池,返回的執行緒池都實現了ExecutorService介面

ScheduledExecutorService 定時排程介面。

AbstractExecutorService 執行框架抽象類。

ThreadPoolExecutor   JDK中執行緒池的具體實現,一般用的各種執行緒池都是基於這個類實現的

ThreadPoolExecutor 執行緒池類

執行緒池是一個複雜的任務排程工具,它涉及到任務、執行緒池等的生命週期問題。要配置一個執行緒池是比較複雜的,尤其是對於執行緒池的原理不是很清楚的情況下,很有可能配置的執行緒池不是較優的。

JDK中的執行緒池均由ThreadPoolExecutor類實現。其構造方法如下:

Public ThreadPoolExecutor(int  corePoolSIze,

Int  maximumPoolSize,

long  keepAliveTime,

TimeUnit  unit,

BlockingQueue<Runnable>workQueue){

This(corePoolSize,maxmumPoolSize,keepAliveTime,unit,workQueue,            Executors.defaultThreaFactory(),defaultHandler);

}

引數說明:

corePoolSize:核心執行緒數,執行緒池中執行的執行緒數永遠不會超過corePoolSize個,預設情況下一直存活,可以設定allowCoreThreadTimeOut為true,此時核心執行緒數就是0,此時keepAliveTime控制所有執行緒的超時時間

maximumPoolSize:執行緒池允許的最大大執行緒數

keepAliveTime:空閒執行緒結束的超時時間也叫執行緒存活時間,當執行緒數大於core數,那麼超過該時間的執行緒將會被終結

Unit:是一個列舉,表示keepAliveTime的單位

keepAliveTime的單位, java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS

workQueue: 阻塞佇列是java.util.concurrent下的主要用來控制執行緒同步的工具,如果BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態,直到BlockingQueue進了東西才會被喚醒。同樣,如果BlockingQueue是滿的,任何試圖往裡面存東西的操作也會被阻斷進入等待狀態,直到BlockingQueue裡有空間才會被喚醒繼續操作。

阻塞佇列常用於生產者和消費者的場景,生產者是往佇列裡新增元素的執行緒,消費者是從佇列裡拿元素的執行緒,阻塞佇列就是生產者存放元素的容器,而消費者也只從容器裡拿元素。具體實現類有:LinkedBlickingQueue,ArrayBlockingQueued等。一般其內部的都是通過Lock和Condition(顯示鎖(Lock)及Condition的學習與使用)來實現阻塞和喚醒

ThreadPoolExecutor的使用需要注意以下概念:

若執行緒池中的執行緒數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。

若執行緒池中的執行緒數量等於corePoolSize且緩衝佇列workQueue未滿,則任務被放入緩衝佇列。

若執行緒池中執行緒的數量大於corePoolSize且緩衝佇列workQueue滿,且執行緒池中的數量小於maximumPoolSize,則建新的執行緒來處理被新增的任務。

若執行緒池中執行緒的數量大於corePoolSize且緩衝佇列workQueue滿,且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。

當執行緒池中的執行緒數量大於corePoolSize時,如果某執行緒空閒時間超過keepAliveTime,執行緒將被終止

執行緒池的工作過程:

1.執行緒池剛建立時,裡面沒有一個執行緒。任務佇列是作為引數傳進來的。不過,就算佇列裡面有任務,執行緒池也不會馬上執行他們

2.當呼叫executor()方法新增一個任務時,執行緒池會做如下判斷:

 A.如果正在執行的執行緒數量小於corePoolSize,那麼馬上建立執行緒執行這個任務

 B.如果正在執行的執行緒數量大於或小於或等於corePoolSize,那麼將這個任務放入佇列

 C.如果這時候佇列滿了,而且正在執行的數量小於maximumPoolSize,那麼還是要建立非核心線

 程立刻執行這個任務

 D.如果佇列滿了,而且正在執行的執行緒數量大於或等於maximumPoolSize,那麼執行緒池會丟擲異

 常RejectExecutionException

 E.當一個執行緒完成任務時,他會從佇列中取下一個任務來執行

 F.當一個執行緒無事可做,超過一定的時間(keepAliveTime)時,執行緒會判斷,如果當前執行的執行緒數

 大於corePoolSize,那麼這個執行緒就被停掉,所以執行緒池的所有任務完成後,它最終會收縮到

 corePoolSize大小

Executors 工廠方法

JDK內部提供了五種最常見的執行緒池。由Executors類的五個靜態工廠方法建立。

newFixedThreadPool(...)

newSingleThreadExecutor(...)

newCachedThreadPool(...)

newScheduledThreadPool(...)

newSingleThreadScheduledExecutor()

單執行緒的執行緒池newSingleThreadExecutor:單個後臺執行緒(其緩衝佇列是無界的)

這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務

Public static ExecutorService newSingleThreadExecutor(){

Return new FinalizableDelegatedExecutorService(

new ThreadPoolExecutor(1,1,0L,TimeOut.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

返回單執行緒的Executor,將多個任務交給此Exector時,這個執行緒處理完一個任務後接著處理下一個任務,若該執行緒出現異常,將會有一個新的執行緒來替代。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

建立固定大小的執行緒池newFixedThreadPool:只有核心執行緒的執行緒池,大小固定(其緩衝佇列是無界的)

Public static ExecutorService newFixedThreadPool(){

Return newThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,

newLinkedBlockingQueue<>(Runnable));

}

每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

返回一個包含指定數目執行緒的執行緒池,如果任務數量多於執行緒數目,那麼沒有執行的任務必須等待,直到有任務完成為止。

可快取的執行緒池newCachedThreadPool:無界執行緒池,可以進行自動執行緒回收

Public static ExecutorService new CachedThreadPool(){

Return newThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,

newSyschronousQueue<Runnable>() );

}

如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。

SynchronousQueue是一個緩衝去為1的阻塞佇列。

newCachedThreadPool方法建立的執行緒池可以自動的擴充套件執行緒池的容量。核心執行緒數量為0。

SynchronousQueue是個特殊的佇列。SynchronousQueue佇列的容量為0。當試圖為SynchronousQueue新增Runnable,則執行會失敗。只有當一邊從SynchronousQueue取資料,一邊向SynchronousQueue新增資料才可以成功。SynchronousQueue僅僅起到資料交換的作用,並不儲存執行緒。但newCachedThreadPool()方法沒有執行緒上限。Runable新增到SynchronousQueue會被立刻取出。

根據使用者的任務數建立相應的執行緒來處理,該執行緒池不會對執行緒數目加以限制,完全依賴於JVM能建立執行緒的數量,可能引起記憶體不足。

定時任務排程的執行緒池newScheduledThreadPool:

建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。

例:

public class ScheduledThreadPoolTest {

    public static voidmain(String[] args) {

       ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);

       ses.scheduleWithFixedDelay(new Runnable() {

            @Override

            publicvoid run() {

                try {

                   Thread.sleep(1000);

                }catch (InterruptedException e) {

                    e.printStackTrace();

                }

               System.out.println(new Date());

            }

        }, 1000, 2000,TimeUnit.MILLISECONDS);

    }

}

單執行緒的定時任務排程執行緒池newSingleThreadScheduledExecutor

此執行緒池支援定時以及週期性執行任務的需求

執行緒池最常用的提交任務的方法有兩種:

executor:

ExecutorService.executor(Runnable runnable);

submit:

FutureTask task=ExecutorService.submit(Runnable runnable);

FutureTask<T>task=ExecutorService.submit(Runnablerunnable);

FuturnTask<T>task=ExecutorService.submit(Callablecallable);

Submit實現:

public<T>Future<T>submit(Callable<T>task){

If(task==null)thow newNullPointerException(){

FutureTask<T>ftask=newTaskFor(task);

executor(ftask);

Return ftask;

}

}

可以看出submit開啟的是有返回結果的任務,會返回一個FutureTask物件,這樣就能通過get()方法得到結果。Submit最終呼叫的也是executor(Runnablerunnable),submit只是將callable物件或Runnable封裝成一個FutureTask物件,因為FutureTask是個Runnable,所以可以在executor中執行。關於Callable物件和Runnable怎麼封裝成FuturnTask物件,見Callable和Future、FutureTask的使用。

Executor介面

Executor是一個執行緒執行介面。任務執行的主要抽象不是Thead,而是Executor。

public interface Executor{

    voidexecutor(Runnable command);

}

Executor將任務的提交過程與執行過程解耦,並用Runnable來表示任務。執行的任務放入run方法中即可,將Runnable介面的實現類交給執行緒池的execute方法,作為它的一個引數。如果需要給任務傳遞引數,可以通過建立一個Runnable介面的實現類來完成。

Executor可以支援多種不同型別的任務執行策略。

Executor基於生產者消費者模式,提交任務的操作相當於生產者,執行任務的執行緒則相當於消費者

ExecutorService介面

執行緒池介面。ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:

Future<?> submit(Runnable task)

<T> Future<T> submit(Callable<T> task)

這兩個方法都是向執行緒池中提交任務,它們的區別在於Runnable在執行完畢後沒有結果,Callable執行完畢後有一個結果。這在多個執行緒中傳遞狀態和結果是非常有用的。另外他們的相同點在於都返回一個Future物件。Future物件可以阻塞執行緒直到執行完畢(獲取結果,如果有的話),也可以取消任務執行,當然也能夠檢測任務是否被取消或者是否執行完畢。

在沒有Future之前我們檢測一個執行緒是否執行完畢通常使用Thread.join()或者用一個死迴圈加狀態位來描述執行緒執行完畢。現在有了更好的方法能夠阻塞執行緒,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。

ScheduledExecutorService介面

ScheduledExecutorService描述的功能和Timer/TimerTask類似,解決那些需要任務重複執行的問題。這包括延遲時間一次性執行、延遲時間週期性執行以及固定延遲時間週期性執行等。當然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的全部特性

執行緒池生命週期

執行緒是有多種執行狀態的,同樣管理執行緒的執行緒池也有多種狀態。JVM會在所有執行緒(非後臺daemon執行緒)全部終止後才退出,為了節省資源和有效釋放資源關閉一個執行緒池就顯得很重要。有時候無法正確的關閉執行緒池,將會阻止JVM的結束。

執行緒池Executor是非同步的執行任務,因此任何時刻不能夠直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。因此關閉執行緒池可能出現一下幾種情況:

平緩關閉:已經啟動的任務全部執行完畢,同時不再接受新的任務。

立即關閉:取消所有正在執行和未執行的任務。

另外關閉執行緒池後對於任務的狀態應該有相應的反饋資訊

啟動執行緒池

執行緒池在構造前(new操作)是初始狀態,一旦構造完成執行緒池就進入了執行狀態RUNNING。嚴格意義上講執行緒池構造完成後並沒有執行緒被立即啟動,只有進行"預啟動"或者接收到任務的時候才會啟動執行緒。

執行緒池是處於執行狀態,隨時準備接受任務來執行。

關閉執行緒池

執行緒池執行中可以通過shutdown()和shutdownNow()來改變執行狀態。

shutdown():平緩的關閉執行緒池。執行緒池停止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入佇列還沒有開始的任務。shutdown()方法執行過程中,執行緒池處於SHUTDOWN狀態。

shutdownNow():立即關閉執行緒池。執行緒池停止接受新的任務,同時執行緒池取消所有執行的任務和已經進入佇列但是還沒有執行的任務。

shutdownNow()方法執行過程中,執行緒池處於STOP狀態

例:使用固定大小的執行緒池。並將任務新增到執行緒池。

import java.util.concurrent.Executors;

import java.util.concurrent.ExecutorService;

public class JavaThreadPool {

    public static voidmain(String[] args) {

        // 建立一個可重用固定執行緒數的執行緒池

       ExecutorService pool = Executors.newFixedThreadPool(2);

        // 建立實現了Runnable介面物件,Thread物件當然也實現了Runnable介面

        Thread t1 =new MyThread();

        Thread t2 =new MyThread();

        Thread t3 =new MyThread();

        Thread t4 =new MyThread();

        Thread t5 = newMyThread();

        // 將執行緒放入池中進行執行

       pool.execute(t1);

       pool.execute(t2);

       pool.execute(t3);

       pool.execute(t4);

       pool.execute(t5);

        // 關閉執行緒池

       pool.shutdown();

    }

}

class MyThread extends Thread {

    @Override

   public void run() {

       System.out.println(Thread.currentThread().getName() + "正在執行。。。");

    }

}

執行緒池實現原理:

執行緒池的實現過程中沒有用到Synchronized關鍵字,用的都是volatile,Lock和同步(阻塞)佇列,Atomic相關類,FutureTask等等,因為後者的效能更優。理解的過程可以很好的學習原始碼中併發控制的思想。

執行緒池的三大優點:

1.執行緒複用

2.控制最大併發數

3.管理執行緒

                            1.執行緒複用過程

線上程宣告週期中,它要經過新建(New)、就緒(Runnable)、執行(Running)、阻塞(Blocked)和死亡(Dead) 5中狀態

Thread通過new新建一個執行緒,這個過程是初始化一些執行緒資訊,如:執行緒名,id,執行緒所屬group等,可以認為只是個普通物件。呼叫Thread的start()後java虛擬機器會為其建立方法呼叫棧和程式計數器,同時將hasBeenStarted為true,之後呼叫start方法就會有異常

處於這個狀態中的執行緒並沒有開始執行,只是表示該執行緒可以運行了。至於該執行緒何時開始執行,取決於JVM裡執行緒排程器的排程。當執行緒獲取cpu後,run()方法會被呼叫。不要自己去呼叫Thread的run()方法。之後根據cpu的排程再就緒--執行--阻塞間切換,直到run()方法結束或其他方式停止執行緒,進入Dead狀態

所以實現執行緒複用的原理應該就是要保持執行緒處於存活狀態(就緒,執行或阻塞)。