Netty核心概念(9)之Future
1.前言
第7節講解JAVA的線程模型中就說到了Future,並解釋了為什麽可以主線程可以獲得線程池任務的執行後結果,變成一種同步狀態。秘密就在於Java將所有的runnable和callable任務,統一變成了callable,最終包裝成了FutureTask對象,該類實現了Runnable接口和Future接口,所以FutureTask能夠被線程執行。最終異步執行過程全部由該類控制邏輯,所以在get的時候鎖住了該類,run方法執行的時候釋放了鎖,這樣就滿足了能夠在異步線程執行完畢獲取相關結果的能力。
本章介紹一下Netty對Future的設計,Netty的聲明就是一個異步事件驅動框架,上一節學習了整個線程調度的過程,並在最後給出了前幾節的一個綜合流程圖,雖然圖中提到了幾種Future,但是沒有具體介紹細節,這些將在本節得到解釋。
2.相關概念
2.1 Future
雖然Java中已經定義了Future,但是滿足不了Netty的需求,所以Netty新寫了一個Future接口,繼承了JDK的Future。額外方法定義如下圖:
接口主要追加了兩個功能:1.增加了判斷任務是否成功失敗的方法,以及失敗獲取異常信息;2.增加了任務完成時觸發的監聽器
2.2 Promise
該類繼承自Future,自然是增加了額外的功能了:這是一個可寫的Future。什麽意思呢?通過之前的知識,我們知道Future都是由異步線程控制的,主線程是無法控制線程執行的。Promise的作用就是主線程能夠控制一下執行的任務。
setSuccess():標記任務成功,並觸發所有listener。如果任務早就成功或失敗,則拋出異常
trySuccess():同上,但是失敗只是返回false,而不是拋出異常
這裏就解釋這兩個方法,其他方法是覆蓋了父接口的方法,確定返回的具體類型而已。根據方法我們也能大體明白可寫的含義了。
3 主要Future詳解
3.1 DefaultChannelPromise
該類是在註冊channel時創建的,SingleThreadEventLoop的register方法。和Java的FutureTask不同,FutureTask是作為一個任務交給線程池,在內部控制任務執行。DefaultChannelPromise則是持有了Channel和EventExecutor二者,在外邊處理邏輯。其上層有2個抽象父類:
1.AbstractFuture:
該類就實現了get方法,原理是調用了await()方法,await之後喚醒肯定就是任務結束了,判斷有無異常,最終返回結果還是拋出異常。
2.DefaultPromise:
該類提供了一個Promise應該具備的基本實現。對任務標記結果,觸發listener等。其主要有個result,對結果進行CAS操作來判斷任務是否完成。
setSuccess過程如下:1.設置成功的結果;2.觸發所有的listener。設置結果主要是CAS更新result字段,然後判斷是否有get請求等待任務執行完,直接notifyAll即可。觸發listener的過程在於先判斷當前線程是否是事件線程中,觸發方法必須由EventLoop線程執行,然後就是遍歷觸發listener的operationComplete方法。
await過程如下:1.判斷是否執行完,執行完直接返回;2.判斷線程是否中斷,中斷拋出異常;3.檢查死鎖,即wait操作不能在EventLoop的線程中執行;4.如果沒執行完,等待者計數,然後wait。
DefaultChannelPromise相對於DefaultPromise而言只是增加了一個channel字段,其它的方法都是調用父類方法。接下來我們看看register過程是怎麽使用這個Promise的吧。
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
可以看到在註冊過程中實際上就是使用setXXX方法來處理相關邏輯的,這個和Java的FutureTask采取了不同的方式。
3.2 SucceededFuture
上面講了一個Promise控制主線程和線程池的同步狀態,那個是依靠promise才有的setXXX接口來觸發的。那麽Future是怎麽控制的呢?答案是Future不需要控制,返回Future的時候就已經有結果了,並且返回一定是一個同步過程。
以SucceededFuture為例。Bootstrap中的doResolveAndConnect0方法有段:final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);其解析成功就會返回帶有結果的SucceededFuture。看這個類的sync方法也和Promise的不一樣,Promise是await,Future是直接返回。這個可以說明Future和Promise的區別:Future用於同步任務,Promise用於異步任務。不知道Netty為什麽會設計成這樣,讓人會有些疑惑。但是記住這點,再加上Promise的set方法達成的效果,就可以理解Netty的Future了。
4.總結
Netty的Future設計采取了和Java的FutureTask不同的設計思路。Java的思路是將Futuren包裝成一個任務,這樣異步線程執行這個FutureTask的時候,其就可以知道任務的執行狀態。Netty將Future擴展成了Promise。Future作為同步方法直接返回的結果類,使用較少。Promise提供了setXXX方法,給異步線程調用該方法告知執行狀態。相同的地方在於Promise也必須被異步線程持有,才能使用set方法。
Netty核心概念(9)之Future