1. 程式人生 > >Jetty9原始碼剖析 - Connection元件 - SelectChannelEndPoint

Jetty9原始碼剖析 - Connection元件 - SelectChannelEndPoint

轉載自ph0ly:http://www.ph0ly.com

一、概念

EndPoint表示一個邏輯上的傳輸端點,資料的讀取和寫入都是從端點開始,配合回撥機制(當讀操作發生時可以通知到回撥函式,而寫操作也提供回撥機制,當完成寫入後,呼叫寫回調函式),可以完成非同步處理的。端點的實現通常具有超時的特性,在端點未產生任何讀寫的情況下,超時機制可以保證EndPoint關閉(包括關聯的Channel或其他資源),避免一些資源的浪費。SelectChannelEndPoint是其中一種實現,能支援NIO任務生產邏輯的實現

二、繼承體系

繼承體系

我們看到整個繼承體系還是非常簡單的

AbstractEndPoint是抽象的EndPoint實現,絕大部分邏輯都放到了AbstractEndPoint,例如回撥註冊、端點超時等操作

ChannelEndPoint表示一個包裝了SocketChannel的EndPoint,主要封裝一些對SocketChannel的操作,包括對Channel的資料讀寫

 

SelectChannelEndPoint主要封裝EPC執行任務需要的介面,其實就是實現了ManagedSelector.SelectableEndPoint介面,後續IO事件發生時會觸發這個EndPoint的onSelected方法,獲取到待執行任務

三、總體架構

架構

從圖中可以看出SelectChannelEndPoint主要通過操縱SocketChannel完成IO的讀寫

當IO事件發生時,SelectChannelEndPoint在EPC的觸發下首先執行onSelected,完成回撥型別任務的選擇,完成後返回一個任務,這個任務由EPC繼續執行,呼叫到任務的run方法,這時候會觸發之前EndPoint上關聯的讀或者寫回調,從而呼叫到了HttpConnection.onFillable或者WriteFlusher的寫操作,這樣就能觸發到後續的操作了

四、原始碼剖析

1. 建構函式

建構函式-1

之前在SelectorManager裡面其實已經介紹過,EndPoint的建立是由ServerConnectorManager來建立,呼叫的是上圖的方法,都是賦值,比較簡單
重點看下super裡面在做什麼

建構函式-2

ChannelEndPoint也比較簡單,把channel放進來,同時取出socket,再看上一層的super

建構函式-3

仍然很簡單,再上一級就是IdleTimeout了

建構函式-4

都是賦值操作,比較簡單不解釋

2. 讀資料

讀操作

讀資料核心就是這個方法了,EPC在執行生產任務的時候呼叫了SelectorProducer.produce,實際就是先調這個方法拿任務,拿到任務再在當前執行緒執行(不清楚的讀者可以回去看下EPC和ManagedSelector相關的文章),下面我們來仔細分析下

首先看當前這個SocketChannel關聯的SelectionKey的interestOps,拿到準備好的readyOps,然後將當前的_desiredInterestOps去掉當前準備好的操作,那如果之前註冊的OP_READ,剛好來一個讀事件,其實這裡就會導致_desiredInterestOps變成0,而在updateKey裡面會觸發更改這個SelectionKey的interestOps,這樣就會導致其實什麼都不監聽了,相信讀者看到這裡比較懵逼,這是什麼操作?
其實這裡是保證在當前這條連線上的資料能按照順序處理,先讓前面這個事件處理完成後,在再HttpConnection.onFillable裡面會切回讀模式,繼續監聽接下來的事件,本身HTTP/1.x協議就是順序執行,正好適合這種模式。另外對於OP_WRITE來說,如果不取消,就會導致Selector一直檢測到寫事件,那就會導致無限write,這也是要去掉這個readyOps的原因

 

後面就檢測當前是讀還是寫,然後根據讀寫事件型別拿到一個任務,可能是3種情況:只寫、只讀、讀寫,這個不難理解。但是有些讀者可能會問,這裡為啥要有寫操作?其實對於NIO寫操作來說,往Channel裡面寫資料不一定一次效能寫完,在底層緩衝區滿了的情況下,write操作可能會返回0,也就是一個都沒寫成功,如果while一直寫入,CPU就打爆了,因此利用Selector來檢測OP_WRITE操作,當可以寫的時候嘗試去寫,這樣保證CPU不會輕易飆升

再來看下這3個任務的實現

3個任務

_runFillable提供的是讀操作,利用fillInterest註冊的回撥,呼叫它的fillable方法,通常就是呼叫到的就是HttpConnection.onFillable方法

_runCompleteWrite提供的是寫操作,從名字上來看是完成寫,其實就是在應用層拿到OutputStream往流裡面寫的時候,部分資料沒寫完,這個時候需要完成剩下的資料寫入,這裡會呼叫WriteFlusher來刷,WriteFlusher最終呼叫到EndPoint的flush,完成向Channel的寫入,這塊在下面的寫入裡面詳細講解

 

_runCompleteWriteFillable提供先寫後讀操作,優先完成之前未完成寫入的資料,之後再執行讀操作

最後附上FillInterest.fillable的實現

FillInterest.fillable

比較簡單,提供了回撥機制,呼叫succeeded,AbstractConnection裡面的ReadCallback實現的succeeded就是自身的onFillable方法,自然就調到HttpConnection.onFillable了

接下來再來看下如何從Channel讀到資料的,後面HttpConnection會觸發這個方法,拿到真實的資料

EndPoint.fill

HttpConnection.onFillable裡面會利用EndPoint提供的fill方法,完成向Buffer裡面刷入資料,這樣才能開始自己的解析
從上面可以看到,其實很簡單,就是切換ByteBuffer的模式為讀,如果讀到了資料,那就把定時器置為非空閒,保證端點的活性,如果讀到-1了,那就是EOF,連線的讀方向已經跪了,所以關閉輸入,最後返回讀到了多個位元組,出現IO異常也是關了輸入方向,最後把Buffer切回到寫模式

3. 寫資料

3.1 直接寫

EndPoint本身對外提供的其實是write方法作為寫入方法

EndPoint.write

可以看到這裡其實還是呼叫了WriteFlusher來完成寫入

WriteFlusher.write

WriteFlusher的實現首先調自己的flush方法,完成刷緩衝(這個下面會解釋),這個flush方法會返回未完成寫入的Buffer,如果存在未完成的Buffer,這裡會將這些Buffer作為PendingState存下來,並利用onIncompleteFlush來向Channel的SelectionKey修改OP_WRITE的操作,這樣其實就能在之後仍然保證之前的資料能刷完。如果flush一次性刷完了,那其實也不用改OP_WRITE,因此直接就callback.succeeded了

接下來再來看下WriteFlusher.flush的實現

WriteFlusher.flush

可以看到這裡會呼叫EndPoint.flush來刷資料(下面會講解它的實現),刷完後,會給出標識是否完成了,如果完成了,那這裡就返回了,如果沒完成,這裡就會去把這些未完成的Buffer靠背到新的數組裡面去,並返回給上一級呼叫

再來看下EndPoint.flush是如何實現的,其實更具體說是ChannelEndPoint,當然也是我們的SelectChannelEndPoint

EndPoint.flush

可以看到就是直接操作了Channel.write,直接寫到了SocketChannel,其實看起來Jetty這裡是有個bug,else分支的buffers應該一直是0,那什麼也不會做
因為Channel.write會操作ByteBuffer的postion,所以最後判斷是否為空,那就能看出哪些Buffer未完成寫入,只要有一個未完成,那其實整體就是未完成寫入

3.2 完成寫

上面其實提到了寫操作可能並不能一次性寫完,那就要利用WriteFlusher的completeWrite來完成寫入,接下來我們看下它的實現

WriteFlusher.completeWrite

這裡從之前存下來的PendingState拿出未完成的寫,之前說到PendingState會記錄未完成寫入的Buffer陣列,因此這裡直接拿出來,調WriterFlusher.flush(上面已經講解過了),如果仍然存在未完成的,那繼續放到PendingState,等待下一次完成寫入,否則就讓PendingState.complete,其實就是調了callback.succeeded

4. 超時

之前我們看到繼承體系中,EndPoint其實是繼承了IdleTimeout,那也就是它具有超時處理的能力,超時處理在EndPoint發揮了什麼作用呢?接下來我們一起來看下
IdleTimeout的邏輯比較簡單,就是定時器定時檢測當前是否已經到了超時時間,如果超時了,則執行一個抽象的onIdleExpired方法,讓業務層自己處理,在AbstractEndPoint裡面就實現了這部分邏輯

AbstractEndPoint.onIdleExpired

這裡會觸發_fillInterest.onFail,這個操作會導致整個連線進入讀關閉狀態,_writeFlusher.onFail有可能會觸發寫操作關閉狀態,這樣其實就導致在超時後,Http連線直接被關了,通常超時時間是30秒,這兩個操作這裡就不細講了,感興趣的讀者可以下來看下原始碼

五、總結

EndPoint作為傳輸的端點(在實際用的通常就是SelectChannelEndPoint)它封裝了對Channel的資料操作,同時對EPC暴露出任務獲取介面,利用Callback調到下一級操作,可以說EndPoint是真實資料互動的橋樑,幫助上層的Connection讀寫資料,在Jetty的通訊層具有非常核心的作用,實現相對還是有些複雜,希望讀者們能有很多收穫。接下來的文章我會帶領大家一起來看下HttpConnection是如何完成後續的HTTP協議解析的,歡迎大家持續關注~