1. 程式人生 > >多執行緒執行框架Executor詳解

多執行緒執行框架Executor詳解

為了能夠更好的進行多執行緒程式設計,JDK提供了一套Executor執行框架,簡化開發人員的對多執行緒的程式設計工作。

其框架結構圖如下:


框架圖比較龐大,但我們只需要關注如下幾個實現:

  • Executor介面:只有一個 execute 方法,用來接收一個可執行物件。
  • ExecutorService介面:表示具有接收任務功能。
  • Executors類:執行緒池工廠,通過它可以取得一個特定功能的執行緒池。
  • ScheduleExecutorService:可以指定時間或週期執行的執行緒池。

我們平時用的最多的便是 Executors工廠類,這個工廠類提供了能產生多個不同功能執行緒池的方法。

  • newFixedThreadPool() 方法:具有固定數量的執行緒池,執行緒數量始終不變。當有一個新任務提交時,執行緒中若有空閒程序變會執行它。若沒有,則新的任務會被暫停在一個任務佇列中。
  • newCachedThreadPool() 方法:執行緒數量不固定,當執行緒不足時便會產生新的執行緒。
  • newSingleThreadExecutor()方法:只有一個執行緒的執行緒池,按先進先出的順序執行佇列中的任務。
  • newSingleThreadScheduledExecutor() 方法:只有一個執行緒的執行緒池,但是可以指定執行時間或執行週期
  • newScheduleThreadPool()方法:同上一個方法,只不過可以指定執行緒數量

自定義執行緒池

newFixedThreadPool()、newSingleThreadExecutor()和newCachedThreadPool()方法其內部都使用了 ThreadPoolExecutor執行緒池。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

我們可以看到,以上執行緒池都只是 ThreadPoolExecutor 類的封裝。這是一個比較複雜的類,我們通過建構函式來慢慢了解它。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

  • corePoolSize:執行緒池執行緒數量
  • maxMumPoolSize:最大執行緒數量
  • keepAliveTime:超過corePoolSize數量的執行緒的存活時間
  • unit: keepAliveTime的單位
  • workQueue:任務佇列,儲存了被提交但是未被執行的任務
  • threadFactory:用於建立執行緒的執行緒工廠

workQueue用來盛放被提交但未執行的任務佇列,它是一個BlockingQueue 介面的物件,僅用於存放 Runnable物件。可以使用以下一種阻塞佇列
直接提交佇列:由SynchronousQueue 物件提供。即每一次提交任務時,如果沒有空閒的執行緒,就會提交失敗或者執行拒絕策略。因此,此時要設定很大的maximumPoolSize值。
有界的任務佇列:可以使用 ArrayBlockingQueue。這時,如果有新任務需要執行,且實際執行緒數小於corePoolSize 時,會優先建立執行緒。若大於corePoolSize,則會將任務加入等待佇列。若佇列已滿,則會建立新執行緒且保證執行緒數不大於 maximumPoolSize。若大於 maximumPoolSize,則執行拒絕策略。
無界任務佇列:使用 LinkedBlockingQueue 類實現。和有界任務佇列類似,只不過系統執行緒數到達corePoolSize後就不在增加。後續任務都會放入阻塞佇列中,直到耗盡系統資源。
優先任務佇列: 通過 PriorityBlockingQueue 實現。可以控制任務的執行先後順序,是一個特殊的無界佇列。

ThreadPoolExecutor 最後一個引數 handler 指定了拒絕策略,有如下幾種:

  • AbortPolicy策略:直接丟擲異常,阻止系統正常工作。
  • CallerRunsPolicy策略:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。
  • DiscardOledestPolicy策略:丟棄最老的一個請求(即將被執行的任務),並嘗試再次提交當前任務。
  • DiscardPolicy策略:丟棄無法處理的任務,不作任何處理。

以上拒絕策略都實現了 RejectedExecutionHandler 介面,我們也可以擴充套件這個介面來實現自己的拒絕策略。

下面,我們使用優先佇列自定義執行緒池來實現具有優先順序排程功能的執行緒池。使用優先佇列時,任務執行緒必須實現 Comparable 介面。

package executor;

import java.util.concurrent.Executor;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyThread implements Runnable,Comparable<MyThread>{
	private String name;
	public MyThread(String name){
		this.name=name;
	}
	public MyThread(){
		name="";
	}
	
	@Override
	public void run(){
		try{
			Thread.sleep(100);
			System.out.println(name+" running....");
		}catch(InterruptedException e){
			e.printStackTrace();
		}
	}
	@Override
	public int compareTo(MyThread o){
		int me = Integer.valueOf(name.split("_")[1]);
		int other = Integer.valueOf(o.name.split("_")[1]);
		return me-other;
	}
	
	public static void main(String[] args){
		Executor exe = new ThreadPoolExecutor(100, 100, 0L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
		for(int i =0;i<1000;i++){
			exe.execute(new MyThread("testThread_"+(999-i)));
		}
		
	}
}

輸出如下:
testThread_998 running....
testThread_999 running....
testThread_993 running....
testThread_995 running....
testThread_997 running....
。。。
testThread_912 running....
testThread_900 running....
testThread_910 running....
testThread_1 running....
testThread_2 running....
testThread_3 running....
testThread_4 running....
testThread_0 running....

可以看到,900-999號執行緒是任務提交時,未經過優先佇列而直接被執行的任務(程式剛執行時有大量的空閒程序100個,無需使用等待佇列),之後是經過優先佇列中轉之後被執行的任務。可以看到,優先順序較高的任務(數字越小,優先順序越高)優先被執行。

擴充套件ThreadPoolExecutor

ThreadPoolExecutor 也是一個可以擴充套件的執行緒池,它提供了 beforeExecute()、afterExecute()和terminated()3個介面對其進行控制。

在 Worker.runWorker() 的方法內部提供了這樣一個呼叫過程:

try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }

ThreadPoolExecutor.Worker 是預設的工作執行緒,在其執行過程中,提供了空的 beforeExecute() 和 afterExecute() 的實現。實際應用中,可以對其進行拓展,實現對執行緒池執行狀態的跟蹤,輸入一些有用的除錯資訊,以幫助系統故障診斷。
class MyThreadPoolExecutor extends ThreadPoolExecutor {
	public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	protected void beforeExecute(Thread t, Runnable r) {
		System.out.println("beforeExecute MyThread name:" + t.getName()
				+ " ID:" + t.getId());
	}

	protected void afterExecute(Runnable r, Throwable t) {
		System.out.println("afterExecute MyThread name:" + Thread.currentThread().getName() + " ID:"
				+ Thread.currentThread().getId());
		System.out.println("afterExecute PoolSize:" + this.getPoolSize());
	}
}

以上程式碼實現了一個帶有日誌輸出功能的執行緒池,該執行緒池會在任務執行前輸入即將要執行的任務名稱和執行緒ID,同時會在任務完成後,輸出執行緒的 ID 和當前執行緒池的執行緒數量。