Netty原始碼解析 -- 客戶端啟動過程
上一篇文章分享了Netty服務端啟動過程,本文繼續分享Netty客戶端啟動過程。
原始碼分析基於Netty 4.1
Connect
客戶端啟動過程比較簡單,主要是Connect操作。
Netty客戶端啟動引導類是Bootstrap,同樣繼承了AbstractBootstrap,它只有一個EventLoopGroup,下文稱為ConnectGroup。
Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); // #1 final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); ... final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { channel.close(); promise.setFailure(resolveFailureCause); } else { // #2 doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; } ... } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
#1
AddressResolver負責解析SocketAddress。它可以做一些地址轉換工作。如Netty提供了RoundRobinInetAddressResolver,可以對下游服務叢集進行輪詢呼叫。
Bootstrap#resolver是一個AddressResolverGroup,它負責構造AddressResolver,預設使用DefaultAddressResolverGroup。
#2
呼叫doConnect,執行Connect操作。
doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(這裡涉及DefaultChannelPipeline的內容後續有文章解析)
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { ... try { ... boolean wasActive = isActive(); // #1 if (doConnect(remoteAddress, localAddress)) { fulfillConnectPromise(promise, wasActive); } else { connectPromise = promise; requestedRemoteAddress = remoteAddress; // #2 int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } // #3 promise.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); } }
#1
呼叫SocketChannel#connect,如果是非阻塞Socket呼叫,該方法返回false。
#2
給EventLoop新增一個定時任務,如果連線超時則關閉Channel。
Netty中也提供了ReadTimeoutHandler處理讀超時的場景。
#3
給promise新增一個回撥方法,connect操作完成時,如果connect操作被取消了,則關閉Channel。
NioSocketChannel#doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
...
boolean success = false;
try {
// #1
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
// #2
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
#1
呼叫(jvm)SocketChannel#connect方法,同樣,非阻塞SocketChannel呼叫該方法,返回false。
#2
關注OP_CONNECT事件。
EventLoop中負責處理OP_CONNECT事件(EventLoop後面有文章解析),呼叫AbstractNioUnsafe#finishConnect完成連線操作。
public final void finishConnect() {
...
try {
boolean wasActive = isActive();
// #1
doFinishConnect();
// #2
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// #3
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
}
}
#1
doFinishConnect方法由子類NioSocketChannel實現,就是呼叫(jvm)SocketChannel#finishConnect()方法
#2
設定connectPromise處理成功
#3
取消connectTimeoutFuture延遲任務
註冊關注Read事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
前面解析服務端啟動過程時說過,HeadContext#channelActive會呼叫readIfIsAutoRead方法,判斷是否開啟autoRead,開啟則自動觸發read事件處理方法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead在解析服務端啟動過程時也說過,這裡會註冊關注Read事件。
客戶端啟動完成後,客戶端和服務端就可以開始進行Read/Write操作了。
如果您覺得本文不錯,歡迎關注我的微信公眾號。您的關注是我堅持的動力!