Netty服務埠的繫結
呼叫netty的bootstrap的bind()方法會開始netty對本地埠的繫結與監聽。
在serverBootstrap的超類abstractBootstrap的bind()方法開始繫結的全過程。
public ChannelFuture bind() { validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); }
首先會在validate()方法當中對netty中的配置進行驗證,主要是對事件處理迴圈器的eventGroup和通道工廠channelFactory是否已經被配置完畢。
然後判斷是否已經配置了本地地址,然後會通過doBind()繼續下面的繫結過程。
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and succesful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.executor = channel.eventLoop(); } doBind0(regFuture, channel, localAddress, promise); } }); return promise; } }
在doBind()方法中,首先會通過initAndRegister()方法開始將所需要的通道註冊到eventGroup當中去。
final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } 首先通過channelFactory的newChannel()方法得到新的具體的通道例項。然後呼叫init()方法開始對通道進行初始化。 void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); if (handler() != null) { p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
init()方法在abstractBootstrap中是作為虛擬方法實現的,而並沒有給出真正的具體實現,可以把目光移到serverBootstrap裡有init()方法的具體實現。
一開始則會在這裡這個服務端鎖生成的通道進行配置option和attr屬性的相關配置。
然後則會取出通道的責任鏈pipline,如果這個netty已經設定了相應的channelhandler則會在這裡將這個handler加入責任鏈。
如果還配置了相應的childGroup和childHandler作為channelInitializer中重寫initChannel()方法中生成的ServerBootStrapAcceptor的構造成員則會在這裡加入責任鏈的末尾。
在channelInitializer中,實現了channel的channelRegister方法。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel());
pipeline.remove(this);
ctx.fireChannelRegistered();
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
在之後的註冊通道過程中,將在這裡被呼叫這個register()方法。也就是說,這裡直接會呼叫initChannel()方法,而initChannel()方法的具體實現,在剛才serverBootstrap裡的init()方法中,被實現了。而本身,提供了這個註冊方法的通道初始化者也會被移除通道的責任鏈當中。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
在這裡,重寫了initChaneel()方法。在這裡,將ServerBootstrapAcceptor生成,並且加入通道的責任鏈當中。ServerBootstrapAcceptor重寫了channelRead()方法,給出裡讀取通道資料的方式,但在這裡並不是重點。
在初始化完畢通道之後。把目光回到abatractBootstrap的initAndRegister()方法。顧名思義,既然已經初始化完畢了通道,那麼接下來將會去向eventLoopGroup註冊相應的通道。
在eventLoopGroup中的NioEventLoopGroup的超類的超類裡MutiThreadEventExecutorGroup中,存在著chooser根據註冊通道時index選擇相應的singleThreadEventLoop來註冊相應所需要註冊的通道。
而註冊方法也恰恰實現在了singleThreadEventLoop中。
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
在這裡根據相應的通道和eventExcutor生成相應的channelPromise並且繼續註冊。
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
在這裡直接通過channel的unsafe方法註冊相應的eventLoop。
在unsasfe的註冊過程中,核心之處在於AbstractNioUnsafe的doRegister()方法。
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
這裡完成真正的java通道與eventLoop的selector的註冊。
在所有註冊完畢之後,也就是說initAndRegister()方法的初始化與註冊都已經完畢。這樣的話,在確保註冊已經完畢之後,將會通過bind0()方法開始正式的繫結。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在這裡,在eventLoop中非同步的去將本地埠和通道去繫結,並新增通道監聽器。
在channel的bind()方法中,在abstractChannel中實現了bind()方法,而其中是直接呼叫了pipeLine的bind()方法,而在pipeline中也是直接呼叫了tail的bind方法。在headContext中,bind()方法而是直接呼叫了Unsafe的bind()方法,由此可以看見,和註冊的過程一樣,最後還是在unsafe中完成bind()的具體實現。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
在這裡呼叫了doBind()方法。在nioServerSocketChannel中實現。
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
這樣,變成了java通道的埠繫結,由此,netty的埠繫結結束。