1. 程式人生 > >Java執行緒池技術之二 Java自帶執行緒池實現

Java執行緒池技術之二 Java自帶執行緒池實現

一,介紹

  類檢視如下:

 

  自Java 1.5後,Java對執行緒相關的庫做了很大的拓展,執行緒池就是其中之一。Java執行緒的新特性多數在java.util.concurrent,其包含眾多的介面和類。其中java.util.concurrent.Executor是這些類的最頂級介面。其是執行執行緒的一個工具。ExecutorService在Executor的基礎上增加了一些方法,以向執行緒池提交任務。包括實現Runnable介面的任務和實現Callable介面的任務。兩個方法為:

  Future<?> submit(Runnable task)

  Future<T> submit(Callable<T> task)

  實現Runnable介面的任務,其run方法的返回值為void,因此submit的返回值為Future<?>型別,?表示匹配任意一種型別。實現Callable<T>介面的任務,其call方法的返回值型別為T,對應submit的返回值型別則是Future<T>。具有返回值的Callable的任務對於多個執行緒中傳遞狀態和結果是非常有用的。另外使用這兩個方法返回的Future物件可以阻塞當前執行緒直到提交的任務執行完畢以獲取結果,也可以用以取消任務的執行,或者檢測任務是否被取消或者是否執行完畢。如果不使用Future,我們檢測一個執行緒是否執行完畢通常使用Thread.join()或者對狀態標識進行輪詢。

  阻塞等待結果,或者在某個時候獲取結果的方法是,呼叫Futrue的get()方法。這樣呼叫執行緒便會等待線上程執行完成並將結果反饋到Futrue物件。之後呼叫執行緒可以對處理結果進行判別處理。isDone(),isCancelled()可以用以判斷當前的任務執行狀態(注意被打斷或取消也是Done的一種)。cancel(Boolean)方法則可以用以取消執行緒執行。如果引數是true表示即使任務已經執行也將試圖取消它,如果是false則表示如果任務沒有被執行則取消,執行了則不取消執行。

  可以使用Executor的靜態方法建立實現ExecutorService的執行緒池類。對應方法建立的執行緒池描述如下:

  1,newSingleThreadExecutor,單執行緒執行緒池。單個執行緒執行器,內有一個執行緒執行Runnable的任務。如果該執行緒發生異常終了,則建立新的進行補充。可以保證任務順 序的執行。

  2,newFixedThreadPool,建立固定執行緒數目(引數指定)的執行緒池。每加入一個任務,建立一個執行緒,直到達到固定數目後,將會有固定大小的執行緒執行Runnable的任務。如果有執行緒異常終了,則建立新的執行緒來進行補充。

  3,newCachedThreadPool, 緩衝執行緒池,產生一個大小可變的執行緒池。當執行緒池的執行緒多於執行任務所需要的執行緒的時候,對空閒執行緒(即60s沒有任務執行)進行回收;當執行任務的執行緒數不足的時候,自動拓展執行緒數量。因此執行緒數量是JVM可建立執行緒的最大數目。

  4,newSingleThreadScheduledExecutor,單個執行緒排程執行器,產生單個執行緒執行任務,採用schedule方法可以延期或者定期的執行任務

  5,newScheduledThreadPool,排程執行緒執行器,當有任務的時候,建立執行緒,直到預設執行緒數量(引數指定),當執行任務所需的執行緒多於該數目的時候,自動拓展執行緒數量,沒有上限;如果執行緒空閒則被回收,直到預設執行緒數量。

  6,另外可以自定義執行緒池,請參見使用程式碼例子。

  另外注意CompletionService介面,其能夠根據任務的執行先後順序得到執行結果。

二,使用例子

//兩種任務

package poolmanager;
public class RunnableTask implements Runnable {

    int i = 0;

    public RunnableTask(int i) {

        this.i = i;

    }

    public void run() {

        System.out.println("******************");
        System.out.println(Thread.currentThread().getName());

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Task is :" + i);

    }

}

package poolmanager;
import java.util.concurrent.Callable;

public  class CallableTask implements Callable<String> {

    public String ok = "OK!";
    public String ng = "Error Happened in Running!";

    int i = 0;

    public CallableTask(int i){
        this.i = i;
    }

    public String call(){

        System.out.println("******************");
        System.out.println(Thread.currentThread().getName());

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Task is :" + i);

        return ok;

    }

}

//執行緒管理類

package poolmanager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class JavaThreadPoolManager {

    int ThreadSize = 2;
    int ThreadMaxSize = 2*ThreadSize;

    ExecutorService executorSingleThreadExecutor = null;
    ExecutorService executorFixedThreadPool = null;
    ExecutorService executorCachedThreadPool = null;
    ScheduledExecutorService executorSingleThreadScheduledExcutor = null;
    ScheduledExecutorService executorScheduledThreadPool = null;

    //自定義執行緒池
    BlockingQueue<Runnable> bqueue = null;
    ThreadPoolExecutor pool = null;

    public void startJavaThreadPoolManager(){


        //單個執行緒執行器,內有一個執行緒執行Runnable的任務。
        //如果該執行緒發生異常終了,則建立新的進行補充。
        //可以保證任務順序的執行。
        executorSingleThreadExecutor = Executors.newSingleThreadExecutor();

        //固定數目的執行緒執行器。
        //每加入一個任務,建立一個執行緒,直到達到固定數目後,
        //將會有固定大小的執行緒執行Runnable的任務。
        //如果有執行緒異常終了,則建立新的執行緒來進行補充。
        executorFixedThreadPool = Executors.newFixedThreadPool(ThreadSize);

        //緩衝執行緒執行器,產生一個大小可變的執行緒池。
        //當執行緒池的執行緒多於執行任務所需要的執行緒的時候,
        //對空閒執行緒(即60s沒有任務執行)進行回收;
        //當執行任務的執行緒數不足的時候,自動拓展執行緒數量。因此執行緒數量是JVM
        //可建立執行緒的最大數目。
        executorCachedThreadPool = Executors.newCachedThreadPool();

        //單個執行緒排程執行器,產生單個執行緒執行任務,採用schedule方法可以延期或者定期的執行任務
        executorSingleThreadScheduledExcutor = Executors
            .newSingleThreadScheduledExecutor();

        //排程執行緒執行器,當有任務的時候,建立執行緒,直到預設執行緒數量(引數指定),
        //當執行任務所需的執行緒多於該數目的時候,
        //自動拓展執行緒數量,沒有上限;
        //如果執行緒空閒則被回收,直到預設執行緒數量。
        executorScheduledThreadPool = Executors.newScheduledThreadPool(ThreadSize);

        //自定義執行緒池
        bqueue = new ArrayBlockingQueue<Runnable>(20);
        //第三個引數是執行緒池執行緒所允許的空閒時間
        //第四個引數是執行緒池執行緒空閒時間的單位
        //第五個引數是緩衝任務佇列
        //第六個引數是緩衝區滿的時候,對任務的處理策略
        //第六個引數有如下幾種選擇:
        //ThreadPoolExecutor.AbortPolicy()
        //丟擲java.util.concurrent.RejectedExecutionException異常
        //ThreadPoolExecutor.CallerRunsPolicy()
        //重試添加當前的任務,即自動以任務為引數,再次呼叫execute()方法
        //ThreadPoolExecutor.DiscardOldestPolicy()
        //拋棄一個已有的任務(拋棄當前任務佇列的頭部任務,即最開始加入的任務)
        //ThreadPoolExecutor.DiscardPolicy()
        //拋棄當前這個任務

        pool = new ThreadPoolExecutor(
                ThreadSize,ThreadMaxSize,2,TimeUnit.MILLISECONDS,bqueue
                ,new ThreadPoolExecutor.DiscardOldestPolicy());

    }

    public void endJavaThreadPoolManager(){

        executorSingleThreadExecutor.shutdown();
        executorFixedThreadPool.shutdown();
        executorCachedThreadPool.shutdown();
        executorSingleThreadScheduledExcutor.shutdown();
        executorScheduledThreadPool.shutdown();
        pool.shutdown();

    }

}

//使用

package poolmanager;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class MainThread {

    public static void main(String[] args) throws InterruptedException {

        JavaThreadPoolManager jThreadPoolManager = new JavaThreadPoolManager();

        jThreadPoolManager.startJavaThreadPoolManager();

        RunnableTask command1 = new RunnableTask(1);
        CallableTask command2 = new CallableTask(2);

        jThreadPoolManager.executorSingleThreadExecutor.execute(command1);
        jThreadPoolManager.executorSingleThreadExecutor.submit(command2);

        jThreadPoolManager.executorFixedThreadPool.execute(command1);
        jThreadPoolManager.executorFixedThreadPool.submit(command2);

        jThreadPoolManager.executorCachedThreadPool.execute(command1);
        jThreadPoolManager.executorCachedThreadPool.submit(command2);

        jThreadPoolManager.executorScheduledThreadPool.execute(command1);
        jThreadPoolManager.executorScheduledThreadPool.submit(command2);

        jThreadPoolManager.executorSingleThreadScheduledExcutor.execute(command1);
        jThreadPoolManager.executorSingleThreadScheduledExcutor.submit(command2);

        jThreadPoolManager.executorScheduledThreadPool.execute(command1);
        jThreadPoolManager.executorScheduledThreadPool.schedule(command1,60, TimeUnit.SECONDS);
        jThreadPoolManager.executorScheduledThreadPool.schedule(command2,60, TimeUnit.SECONDS);

        jThreadPoolManager.executorSingleThreadScheduledExcutor.scheduleWithFixedDelay(command1, 0, 20, TimeUnit.SECONDS);

        Future<String> returnFuture =
            jThreadPoolManager.executorSingleThreadScheduledExcutor.submit(command2);

        System.out.println(returnFuture.isCancelled());
        System.out.println(returnFuture.isDone());

        returnFuture.cancel(true);

        System.out.println(returnFuture.isCancelled());
        System.out.println(returnFuture.isDone());

        String resultString =null;

        if(!returnFuture.isCancelled()){

            try {
                resultString = returnFuture.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

            System.out.println(resultString);

        }

        jThreadPoolManager.pool.execute(command1);

    }

}

三,再討論

  我們看到自定義執行緒池的時候,可以自定義任務佇列,其中有如下幾種任務佇列的自定義方法:

  1,直接提交  選擇任務佇列為 SynchronousQueue的,則預設的佇列大小為1,在這種情況下,如果沒有空閒執行緒,則往往直接建立新的執行緒執行該任務,因此往往需要不設立執行緒的數目上限。該策略可以避免在處理可能具有內部依賴的請求集時出現鎖,因為其任務執行順序必然是先加入的先被執行。

  2,無界佇列 選擇任務佇列為LinkedBlockingQueue 其意義即任務佇列大小無限制。這樣建立的執行緒往往不超過corePoolSize。因此maximumPoolSize的值就沒有什麼意義。適合於任務相互獨立。

  3,有界佇列 使用具有最大執行緒數目限制maximumPoolSize的時候,有界佇列例如ArrayBlockingQueue有助於防止資源耗盡,但可能較難控制。

  使用大佇列小池子有利於降低CPU使用,上下文切換,但是吞吐量不高;如果小佇列大池子,則CPU使用率高,但是可能排程開銷大,也會降低吞吐量。

   另外,選擇上述任務佇列的方案,需要與corePoolSize和maximumPoolSizes這對引數配合。

  因此,執行緒池的選擇要根據任務的型別,比如任務是數量大、單個執行時間短,還是任務數量小、單個執行長,還是任務數量又多,執行時間又長,來進行不同的選擇。而自定義執行緒池也是需要根據任務的型別選擇任務佇列、corePoolSize和maximumPoolSizes。

  此外,keepAliveTime和maximumPoolSize及BlockingQueue的型別均有關係。如果BlockingQueue是無界的,那麼永遠不會觸發maximumPoolSize,自然keepAliveTime也就沒有了意義。如果很容易就產生執行緒的回收,也會導致效能下降。


參考:

  1,JDK Document

  2, http://www.cnblogs.com/jersey/archive/2011/03/30/2000231.html