Netty4.0原始碼解析:NioServerSocketChannel
一、引言
Netty的Channel在JDK NIO的Channel基礎上做了一層封裝,提供了更多的功能。Netty的中的Channel實現類主要有:NioServerSocketChannel(用於服務端非阻塞地接收TCP連線)、NioSocketChannel(用於維持非阻塞的TCP連線)、NioDatagramChannel(用於非阻塞地處理UDP連線)、OioServerSocketChannel(用於服務端阻塞地接收TCP連線)、OioSocketChannel(用於阻塞地接收TCP連線)、OioDatagramChannel(用於阻塞地處理UDP連線):
一個EventLoop一般持有多個Channel,每個EventLoop持有一個對應的執行緒,有這個執行緒負責處理這些Channel發出的事件。
本篇文章我們先從服務端的NioServerSocketChannel開始分析:
二、初始化過程
NioServerSocketChannel在JDK的ServerSocketChannel的基礎上做了一層封裝。在NioServerSocketChannel的初始化過程中,可以簡要地分為三步:
1、例項化NioServerSocketChannel
2、完成NioServerSocketChannel的管道初始化的第一步驟(向管道新增初始的ChannelHandler)
3、將NioServerSocketChannel註冊到NioEventLoopGroup,在此期間完成管道初始化的第二步驟(比如執行ChannelInitializer的handlerAdd方法)
4、繫結埠,開始接受客戶端連線。
我們在ServerBootstrap進行引導的過程中,需要呼叫channel方法指定一個ServerChannel實現類的Class物件,channel方法隨即會生成一個ChannelFactory工廠於ServerBootstrap例項中。NioServerSocketChannel的例項化在ServerBootstrap的bind方法中完成。bind方法呼叫的doBind方法中的initAndRegister方法中,會通過ChannelFactory例項構造出一個NioServerSocketChannel:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//通過ChannelFactory構造一個Channel例項
channel = channelFactory().newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//獲取bossGroup例項並呼叫register方法繫結
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
通過ChannelFactory構造的Channel都是通過無參構造方法構造的,我們來分析NioServerSocketChannel的構造方法:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
這裡通過靜態newSocket方法構造了一個JDK的ServerSocketChannel:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
}
這裡通過預設的SelectorProvider例項通過呼叫它的openServerSocketChannel構造了一個JDK的ServerSocketChannel例項,接著呼叫另外的過載的構造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
NioServerSocketChannelConfig是NioServerSocketChannel的內部類,儲存了以下內容:NioServerSocketChannel(this)、和剛剛構造的ServerSocketChannel通過socket方法獲取的ServerSocket例項、引導過程通過options方法設定的引數。
NioServerSocketChannel父類AbstractNioMessageChannel的構造方法:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
AbstractNioMessageChannel父類AbstractNioChannel構造方法:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch; //傳入JDK ServerSocketChannel
this.readInterestOp = readInterestOp; //感興趣通道事件為OP_ACCEPT
try { //將ServerSocketChannel設為非阻塞
ch.configureBlocking(false);
} catch (IOException e) { //如果丟擲異常那麼關閉ServerSocketChannel
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled())
logger.warn("Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
AbstractNioChannel父類AbstractChannel構造方法:
protected AbstractChannel(Channel parent) {
this.parent = parent; //ServerSocketChannel不存在父Channel
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
ServerSocketChannel不存在父Channel,所以parent為null,只有SocketChannel存在(其parent為ServerSocketChannel例項)。
接著通過newUnsafe方法構造一個Channel.Unsafe介面實現類。在客戶端引導過程已經提到過,Unsafe是Netty到JDK NIO的橋樑,Unsafe介面定義了以下方法:
方法簽名 | 作用 |
---|---|
SocketAddress localAddress() | 獲取該Channel的本地地址 |
SocketAddress remoteAddress() | 獲取該Channel連線地址 |
void register(EventLoop, ChannelPromise) | 向EventLoop註冊這個Channel,並通知指定的ChannelPromise |
void bind(SocketAddress, ChannelPromise) | 指定一個本地地址,將Channel繫結在這個地址,並通知指定的ChannelPromise |
void connect(SocketAddress, SocketAddress, ChannelPromise) | 通過本地地址向指定的地址發起連線,並通知ChannelPromise |
void disconnect(ChannelPromise) | 取消連線,並通知指定的ChannelPromise |
void close(ChannelPromise) | 關閉Channel通道,並通知指定的ChannelPromise |
void closeForcibly() | 立刻關閉Channel通道,不通知ChannelPromise |
void deregister(ChannelPromise) | 將這個Channel從EventLoop取消註冊,並通知指定的ChannelPromise |
void beginRead() | 寫入緩衝區,以便進站處理器能夠讀到進站資料 |
void write(Object, ChannelPromise) | 向緩衝區中寫入資料,並通知指定的ChannelPromise |
void flush() | 輸出呼叫write方法寫入的緩衝區資料 |
ChannelPromise voidPromise() | 返回這個Channel的特殊ChannelPromise(Void Promise),用於將ChannelPromise作為引數但不希望得到通知的操作。 |
ChannelOutboundBuffer outboundBuffer() | 獲取出站緩衝區 |
對於NioServerSocketChannel,其newUnsafe方法實現在父類AbstractNioMessageChannel中:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
newChannelPipeline方法則是比較簡單,直接構造一個DefaultChannelPipeline就完了。
回到ServerBootstrap:當initAndRegister方法執行結束後,init方法隨即會被呼叫並傳入剛才構造的ServerSocketChannel例項作為引數:
@Override
void init(Channel channel) throws Exception {
//將通過option方法指定引數加入到Map中
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//將初始化屬性傳入到Map中
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
//獲取workGroup、childGroup指定的ChannelHandler
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//向管道尾部新增一個ChannelInitializer例項
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
//新增使用者自定義的bossGroup的ChannelHandler
if (handler != null)
pipeline.addLast(handler);
//向管道尾部新增一個ServerBootstrapAcceptor例項
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup,
currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init方法將引導過程通過options方法設定的引數匯入到NioServerSocketChannel的ChannelConfig中,然後再將初始屬性匯入到NioServerSocketChannel間接父類DefaultAttributeMap中。
完成上述步驟後,向ServerSocketChannel所屬的管道尾部新增一個ChannelInitializer例項,它重寫了initChannel方法,並指定向管道新增使用者自定義的ChannelHandler(通過handler方法指定的),然後再非同步新增一個ServerBootstrapAcceptor例項,用於將接收到的客戶端連線對應的NioSocketChannel轉交給workGroup進行管理。
前幾篇部落格我們已經提到過,ChannelInitializer在完成自己的工作(執行完initChannel方法)後會將自己從管道中移除。但是就目前而言還僅僅只是完成了管道初始化的第一步驟,因為ChannelInitializer的handlerAdd方法還尚未呼叫。
init方法執行完畢後,就會將這個ServerSocketChannel註冊到bossGroup中,最終是通過AbstractUnsafe的register方法進行註冊:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null)
throw new NullPointerException("eventLoop");
if (isRegistered()) { //不允許重複註冊或者同一個Channel註冊到不同的EventLoop中
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) { //如果不能註冊到這種型別的EventLoop中
promise.setFailure(new IllegalStateException("incompatible event loop type: " +
eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//如果當前執行緒就是eventLoop所屬的執行緒,那麼直接執行register0方法,否則非同步執行
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
register方法在進行一系列引數檢查和狀態檢查後,繼而會執行register0方法:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise))
return;
boolean firstRegistration = neverRegistered; //預設為true
doRegister(); //註冊到Selector中
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//向管道發出Channel註冊事件
pipeline.fireChannelRegistered();
if (isActive()) { //如果Channel是可用的
if (firstRegistration) {
//向管道發出Channel可用事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
register0方法首先呼叫doRegister進行註冊:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
doRegister將ServerSocketChannel註冊到了EventLoop的Selector上,並將附加物件設為this。doRegister方法執行結束後,就可以認為Channel已經註冊到了EventLoop中,因為現在這個Channel持有的JDK Channel已經被EventLoop持有的Selector管理。
隨後,就會呼叫管道物件的invokeHandlerAddedIfNeeded方法:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
invokeHandlerAddedIfNeeded此時就會呼叫callHandlerAddedForAllHandlers執行回撥任務。
讓我們回顧下服務端的引導過程,當呼叫addLast方法將ChannelInitializer新增到管道中後,如果管道檢測到這是它第一次呼叫addLast方法,並不會馬上執行ChannelInitializer的handlerAdd方法去執行我們重寫的方法,而是註冊了一個回撥任務,並把這個回撥任務新增到了DefaultChannelPipeline的成員變數pendingHandlerCallbackHead中,如果有多個回撥任務,那麼它可以通過它的成員變數next去維護一個單向連結串列。
瞭解這些後,我們再來看callHandlerAddedForAllHandlers這個方法的實現:
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
//遍歷該連結串列,依次執行回撥任務
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
這樣,我們指定的ChannelInitializer的handlerAdd方法隨即會被呼叫,管道初始化的第二步驟也就隨之完成。
回到register0方法:
接下來會依次向管道傳遞ChannelRegister、ChannelActive事件。傳遞完成後,NioServerSocketChannel開始進入埠繫結過程。
回到ServerBootstrap的doBind方法,doBind方法會開始進行埠的繫結過程,最終會呼叫到NioServerSocketChannel的doBind方法:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) { //如果JDK版本在1.7以上
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
這裡直接呼叫到了JDK ServerSocketChannel的bind方法直接繫結指定的埠,並指定了最大連線數(預設為128),可以通過呼叫options並傳入ChannelOption.SO_BACKLOG,然後指定一個最大連線數。當連線數超出後,客戶端的連線請求會被阻塞。
至此,NioServerSocketChannel初始化過程完成。
三、事件處理
初始化過程完成後,此時的NioServerSocketChannel已經被EventLoop的Selector所管理,Selector則由EventLoop所屬的執行緒進行輪詢。這個執行緒執行NioEventLoop的run方法,通過一個無限迴圈不停地處理IO事件和一般任務。
當有客戶端發起連線時,ServerSocketChannel會發出OP_ACCEPT事件,就會通過Unsafe的read方法(實現在AbstractNioMessageUnsafe)處理這個事件:
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
//從事件集中移除這個事件
removeReadOp();
return;
}
//獲取每個迴圈讀取訊息的最大位元組數
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
//將資料讀到readBuf中,返回讀取到的位元組數
int localRead = doReadMessages(readBuf);
if (localRead == 0)
break;
if (localRead < 0) {
closed = true;
break;
}
if (!config.isAutoRead())
break;
//如果readBuf的長度大於maxMessagesPerRead,退出迴圈
if (readBuf.size() >= maxMessagesPerRead)
break;
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
//獲取讀取到的ByteBuf數量,並通過迴圈將這些ByteBuf傳遞給ChannelInboundHandler
int size = readBuf.size
相關推薦
Netty4.0原始碼解析:NioServerSocketChannel
一、引言
Netty的Channel在JDK NIO的Channel基礎上做了一層封裝,提供了更多的功能。Netty的中的Channel實現類主要有:NioServerSocketChannel(用於服務端非阻塞地接收TCP連線)、NioSocketChanne
Netty4.0原始碼解析:位元組容器UnpooledHeapByteBuf
一、引言
Java NIO提供了ByteBuffer作為位元組容器,供Channel讀入和寫入資料。但ByteBuffer使用過於繁瑣,靈活性不夠強。Netty實現了ByteBuf來替代JDK的ByteBuffer。
ByteBuf有以下幾大優點:
1、它可以被
Netty4.0原始碼解析:TCP粘包半包問題的解決方案
一、引言
TCP是一個基於流的協議,TCP作為傳輸層協議並不知道應用層協議的具體含義,它會根據TCP緩衝區的實際情況進行資料包的劃分,所以在應用層上認為是一個完整的包,可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的
Redis原始碼解析:15Resis主從複製之從節點流程
Redis原始碼解析:15Resis主從複製之從節點流程
版權宣告:本文為博主原創文章,未經博主允許不得轉載。 https://blog.csdn.net/gqtcgq/article/details/51172085
Java集合類原始碼解析:AbstractMap
目錄
引言
原始碼解析
抽象函式entrySet()
兩個集合檢視
操作方法
兩個子類
參考:
引言
今天學習一個Java集合的一個抽象類 AbstractMap ,AbstractMap 是Map介面的 實現類之一,也是HashMap、T
Java集合類原始碼解析:HashMap (基於JDK1.8)
目錄
前言
HashMap的資料結構
深入原始碼
兩個引數
成員變數
四個構造方法
插入資料的方法:put()
雜湊函式:hash()
動態擴容:resize()
節點樹化、紅黑樹的拆分
節點樹化
Java集合類原始碼解析:Vector
引言
之前的文章我們學習了一個集合類 ArrayList,今天講它的一個兄弟 Vector。 為什麼說是它兄弟呢?因為從容器的構造來說,Vector 簡直就是 ArrayList 的翻版,也是基於陣列的資料結構,不同的是,Vector的每個方法都加了 synchronized 修飾符,是執行緒安全的。
類
jQuery原始碼解析:變數與函式
//原始碼剖析都基於jQuery-2.0.3版本,主要考慮到相容IE
2行:jQuery javaScript Library v2.0.3——jQuery版本
3行:http://jQuery.com——官網
5~6行:Includes Sizzle.js;http://sizzlejs.
Spark2.2.2原始碼解析: 3.啟動worker節點啟動流程分析
本文啟動worker節點啟動流程分析
啟動命令:
${SPARK_HOME}/sbin/start-slave.sh spark://sysadmindeMacBook-Pro.local:7077
檢視start-slave.sh
Spark2.2.2原始碼解析: 2.啟動master節點流程分析
本文主要說明在啟動master節點的時候,程式碼的流程走向。
授予檔案執行許可權
chmod755
兩個目錄裡的檔案:
/workspace/spark-2.2.2/bin --所有檔案
/workspace/spark-2.2.2/sb
Spring4原始碼解析:BeanDefinition架構及實現
一、架構圖
首先共同看下總體的 Java Class Diagrams 圖:
二、具體類實現
2.1 AttributeAccessor
介面定義了一個通用的可對任意物件獲取、修改等操作元資料的附加契約。主要方法如下:
public interface AttributeAcce
Tomcat原始碼解析:Container中的Pipeline和Valve
前言:
我們在上一篇部落格 中分析了關於tomcat處理請求的全過程,在最後的時候交給了當前Engine的pipeline去處理。
Engine.pipeline獲取了first_valve,然後執行其invoke方法,即完成了請求
Tomcat原始碼解析:Jsp檔案的編譯、實現
1.Jsp簡介
jsp(java server page),其根本是一個簡化的Servlet技術,是一種動態網頁技術標準。
它是在傳統的網頁HTML頁面中插入java程式碼段,從而形成jsp檔案,字尾為.jsp。
Tomcat原始碼解析:Web請求處理過程
前言:
Catalina是Tomcat提供的Servlet容器實現,它負責處理來自客戶端的請求並處理響應。
但是僅有Servlet容器伺服器是無法對外提供服務的,還需要由聯結器接收來自客戶端的請求,並按照既定協議進行解析,然後交由S
Tomcat原始碼解析:Catalina原始碼解析
1.Catalina
對於Tomcat來說,Catalina是其核心元件,所有基於JSP/Servlet的Java Web應用均需要依託Servlet容器執行並對外提供服務。
4.0版本後,Tomcat完全重新設計了其Servlet
Redis5.0原始碼解析(七)----------字串物件
基於Redis5.0
字串物件
字串物件的編碼可以是 int 、 raw 或者 embstr
如果一個字串物件儲存的是整數值, 並且這個整數值可以用 long 型別來表示, 那麼字串物件會將整數值儲存在字串物件結構的 ptr 屬性裡面(將 void* 轉
Redis5.0原始碼解析(六)----------Redis物件
基於Redis5.0
之前介紹了 Redis 用到的所有主要資料結構, 比如簡單動態字串(SDS)、雙端連結串列、字典、跳躍表、整數集合, 等等,但Redis 並沒有直接使用這些資料結構來實現鍵值對資料庫, 而是基於這些資料結構建立了一個物件系統, 這個系統包含字串物件
Redis5.0原始碼解析(五)----------整數集合
基於redis5.0
整數集合(intset)是集合鍵的底層實現之一: 當一個集合只包含整數值元素, 並且這個集合的元素數量不多時, Redis 就會使用整數集合作為集合鍵的底層實現
redis> SADD numbers 1 3 5 7 9
(integer
Redis5.0原始碼解析(四)----------跳躍表
基於Redis5.0
跳躍表(skiplist)是一種有序資料結構, 它通過在每個節點中維持多個指向其他節點的指標, 從而達到快速訪問節點的目的
跳躍表支援平均 O(log N) 最壞 O(N) 複雜度的節點查詢, 還可以通過順序性操作來批量處理節點。
在大部分
Redis5.0原始碼解析(三)----------字典(詳細)
基於Redis5.0
在字典中, 一個鍵(key)可以和一個值(value)進行關聯(或者說將鍵對映為值), 這些關聯的鍵和值就被稱為鍵值對
字典中的每個鍵都是獨一無二的, 程式可以在字典中根據鍵查詢與之關聯的值, 或者通過鍵來更新值, 又或者根據鍵來刪除整個鍵值對