ThreadPoolExecutor的submit方法以及FutureTask原始碼學習
阿新 • • 發佈:2018-11-30
ThreadPoolExecutor#Submit: public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); //將Callable包裝成FutureTask execute(ftask); //最後還是呼叫execute方法執行 , FutureTask實現Runnable介面 return ftask; } ThreadPoolExecutor#execute: 新增task到執行緒池,不多說 。 public void execute(Runnable command) { int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } 這裡說下,worker新增成功後啟動與其關聯的執行緒,不斷從ThreadPool維護的BlockQueue中獲取新的Runnable物件。 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... public void run() { runWorker(this);//啟動worker } ... } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; ... try { while (task != null || (task = getTask()) != null) { //不斷的從BlockQueue獲取新的task ... task.run();//呼叫task的run方法 ... } ... } 注意這裡task的實際是FutureTask FutureTask#run: public void run() { ... Callable<V> c = callable; //futureTask關聯的Callabel ... try { result = c.call(); //呼叫call,完成業務邏輯,呼叫完成又返回了,說明task以及完成 。 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); //如果出錯了,將狀態改為EXCEPTIONAL }; ... set(result); //處理返回 ... } protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //這裡通過CAS將task執行的 //狀態改為COMPLETING,前提是先前的狀態是NEW outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //將狀態該為NORMAL,即任 //務正常完成 . finishCompletion(); // } } private void finishCompletion() { // assert state > COMPLETING; ... done();//FutureTask的邏輯是為空的,但是這個方法可以被子類重寫。 ... } 可以看看Spring實現的 org.springframework.util.concurrent.ListenableFutureTask對該方法的實現。 protected void done() { Throwable cause; try { T result = get(); //掉用FutureTask的get方法,注意該方法是沒有超時設定的 this.callbacks.success(result); //然後呼叫註冊的回撥方法,這樣做可以保證在task完成的瞬間就執行回 // 調方法 return; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; } catch (ExecutionException ex) { cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; } this.callbacks.failure(cause); //呼叫失敗的回撥方法 }
可以看出ListenableFutureTask如果是通過回撥來處理結果的話,正常情況下,可以做到任務完成時立即就執行回撥函式。
但是在異常情況下,比如呼叫遠端服務很久沒有返回也不拋錯,當前的執行緒就會一直被佔用 ,如果這樣的task一多,執行緒池的資源
就會很快被耗盡,如果reject策略設定的是CallerRunsPolicy的話,後果就是阻賽主執行緒,本來非同步的邏輯,變得又主執行緒執行,系統崩潰。 所以如果擔心上述情況發生的話,建議使用FutureTask的get方法,並設定超時時間,同時CallerRunsPolicy小心使用。一次線上慘痛的教訓。