1. 程式人生 > 實用技巧 >Netty原始碼解析 -- 客戶端啟動過程

Netty原始碼解析 -- 客戶端啟動過程

上一篇文章分享了Netty服務端啟動過程,本文繼續分享Netty客戶端啟動過程。
原始碼分析基於Netty 4.1

Connect

客戶端啟動過程比較簡單,主要是Connect操作。
Netty客戶端啟動引導類是Bootstrap,同樣繼承了AbstractBootstrap,它只有一個EventLoopGroup,下文稱為ConnectGroup。

Bootstrap#connect -> doResolveAndConnect -> doResolveAndConnect0

private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
										   final SocketAddress localAddress, final ChannelPromise promise) {
	try {
		final EventLoop eventLoop = channel.eventLoop();
		// #1
		final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
		
		...
		
		final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

		if (resolveFuture.isDone()) {
			final Throwable resolveFailureCause = resolveFuture.cause();

			if (resolveFailureCause != null) {
				channel.close();
				promise.setFailure(resolveFailureCause);
			} else {
				// #2
				doConnect(resolveFuture.getNow(), localAddress, promise);
			}
			return promise;
		}

		...
	} catch (Throwable cause) {
		promise.tryFailure(cause);
	}
	return promise;
}

#1
AddressResolver負責解析SocketAddress。它可以做一些地址轉換工作。如Netty提供了RoundRobinInetAddressResolver,可以對下游服務叢集進行輪詢呼叫。
Bootstrap#resolver是一個AddressResolverGroup,它負責構造AddressResolver,預設使用DefaultAddressResolverGroup。
#2 呼叫doConnect,執行Connect操作。

doConnect -> AbstractChannel#connect -> DefaultChannelPipeline#connect -> HeadContext#connect -> AbstractNioUnsafe#connect
(這裡涉及DefaultChannelPipeline的內容後續有文章解析)

public final void connect(
		final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
	...

	try {
		...

		boolean wasActive = isActive();
		// #1
		if (doConnect(remoteAddress, localAddress)) {
			fulfillConnectPromise(promise, wasActive);
		} else {
			connectPromise = promise;
			requestedRemoteAddress = remoteAddress;

			// #2
			int connectTimeoutMillis = config().getConnectTimeoutMillis();
			if (connectTimeoutMillis > 0) {
				connectTimeoutFuture = eventLoop().schedule(new Runnable() {
					public void run() {
						ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
						ConnectTimeoutException cause =
								new ConnectTimeoutException("connection timed out: " + remoteAddress);
						if (connectPromise != null && connectPromise.tryFailure(cause)) {
							close(voidPromise());
						}
					}
				}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
			}
			// #3
			promise.addListener(new ChannelFutureListener() {
				public void operationComplete(ChannelFuture future) throws Exception {
					if (future.isCancelled()) {
						if (connectTimeoutFuture != null) {
							connectTimeoutFuture.cancel(false);
						}
						connectPromise = null;
						close(voidPromise());
					}
				}
			});
		}
	} catch (Throwable t) {
		promise.tryFailure(annotateConnectException(t, remoteAddress));
		closeIfClosed();
	}
}

#1 呼叫SocketChannel#connect,如果是非阻塞Socket呼叫,該方法返回false。
#2 給EventLoop新增一個定時任務,如果連線超時則關閉Channel。
Netty中也提供了ReadTimeoutHandler處理讀超時的場景。
#3 給promise新增一個回撥方法,connect操作完成時,如果connect操作被取消了,則關閉Channel。

NioSocketChannel#doConnect

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    ...

    boolean success = false;
    try {
    	// #1
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        // #2
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

#1 呼叫(jvm)SocketChannel#connect方法,同樣,非阻塞SocketChannel呼叫該方法,返回false。
#2 關注OP_CONNECT事件。

EventLoop中負責處理OP_CONNECT事件(EventLoop後面有文章解析),呼叫AbstractNioUnsafe#finishConnect完成連線操作。

public final void finishConnect() {
	...
	try {
		boolean wasActive = isActive();
		// #1
		doFinishConnect();
		// #2
		fulfillConnectPromise(connectPromise, wasActive);
	} catch (Throwable t) {
		fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
	} finally {
		// #3
		if (connectTimeoutFuture != null) {
			connectTimeoutFuture.cancel(false);
		}
		connectPromise = null;
	}
}

#1 doFinishConnect方法由子類NioSocketChannel實現,就是呼叫(jvm)SocketChannel#finishConnect()方法
#2 設定connectPromise處理成功
#3 取消connectTimeoutFuture延遲任務

註冊關注Read事件
AbstractNioUnsafe#fulfillConnectPromise -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive
前面解析服務端啟動過程時說過,HeadContext#channelActive會呼叫readIfIsAutoRead方法,判斷是否開啟autoRead,開啟則自動觸發read事件處理方法。
HeadContext#readIfIsAutoRead -> AbstractChannel#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead
AbstractNioChannel#doBeginRead在解析服務端啟動過程時也說過,這裡會註冊關注Read事件。

客戶端啟動完成後,客戶端和服務端就可以開始進行Read/Write操作了。

如果您覺得本文不錯,歡迎關注我的微信公眾號。您的關注是我堅持的動力!