1. 程式人生 > >ThreadPoolExecutor的submit方法以及FutureTask原始碼學習

ThreadPoolExecutor的submit方法以及FutureTask原始碼學習

ThreadPoolExecutor#Submit:
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task); //將Callable包裝成FutureTask
        execute(ftask);            //最後還是呼叫execute方法執行 , FutureTask實現Runnable介面
        return ftask;
    }

ThreadPoolExecutor#execute: 新增task到執行緒池,不多說 。 
public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
這裡說下,worker新增成功後啟動與其關聯的執行緒,不斷從ThreadPool維護的BlockQueue中獲取新的Runnable物件。
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
       ...
        public void run() {
            runWorker(this);//啟動worker
        }
       ...
}

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        ...
        try {
            while (task != null || (task = getTask()) != null) { //不斷的從BlockQueue獲取新的task
               ...
                        task.run();//呼叫task的run方法
               ...
               }
         ...
    }
注意這裡task的實際是FutureTask
FutureTask#run:
public void run() {
        ...
            Callable<V> c = callable; //futureTask關聯的Callabel
            ...
               try {
                    result = c.call(); //呼叫call,完成業務邏輯,呼叫完成又返回了,說明task以及完成 。 
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //如果出錯了,將狀態改為EXCEPTIONAL
                };
                ...
                    set(result); //處理返回
               ...
    }

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //這裡通過CAS將task執行的 
  //狀態改為COMPLETING,前提是先前的狀態是NEW 
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  //將狀態該為NORMAL,即任 //務正常完成 .
            finishCompletion(); //
        }
    }

private void finishCompletion() {
        // assert state > COMPLETING;
        ...
        done();//FutureTask的邏輯是為空的,但是這個方法可以被子類重寫。
        ...
    }

可以看看Spring實現的 org.springframework.util.concurrent.ListenableFutureTask對該方法的實現。
    protected void done() {
		Throwable cause;
		try {
			T result = get(); //掉用FutureTask的get方法,注意該方法是沒有超時設定的
			this.callbacks.success(result); //然後呼叫註冊的回撥方法,這樣做可以保證在task完成的瞬間就執行回 
// 調方法
			return;
		}
		catch (InterruptedException ex) {
			Thread.currentThread().interrupt();
			return;
		}
		catch (ExecutionException ex) {
			cause = ex.getCause();
			if (cause == null) {
				cause = ex;
			}
		}
		catch (Throwable ex) {
			cause = ex;
		}
		this.callbacks.failure(cause); //呼叫失敗的回撥方法
	}

可以看出ListenableFutureTask如果是通過回撥來處理結果的話,正常情況下,可以做到任務完成時立即就執行回撥函式。
但是在異常情況下,比如呼叫遠端服務很久沒有返回也不拋錯,當前的執行緒就會一直被佔用 ,如果這樣的task一多,執行緒池的資源
就會很快被耗盡,如果reject策略設定的是CallerRunsPolicy的話,後果就是阻賽主執行緒,本來非同步的邏輯,變得又主執行緒執行,系統崩潰。 所以如果擔心上述情況發生的話,建議使用FutureTask的get方法,並設定超時時間,同時CallerRunsPolicy小心使用。一次線上慘痛的教訓。