1. 程式人生 > >Netty服務埠的繫結

Netty服務埠的繫結

呼叫nettybootstrapbind()方法會開始netty對本地埠的繫結與監聽

serverBootstrap的超類abstractBootstrapbind()方法開始繫結的全過程

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

首先會在validate()方法當中對netty中的配置進行驗證,主要是對事件處理迴圈器的eventGroup和通道工廠channelFactory是否已經被配置完畢。

然後判斷是否已經配置了本地地址然後會通過doBind()繼續下面的繫結過程。

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and succesful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

doBind()方法中,首先會通過initAndRegister()方法開始將所需要的通道註冊到eventGroup當中去

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
首先通過channelFactory的newChannel()方法得到新的具體的通道例項。然後呼叫init()方法開始對通道進行初始化。
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();
    if (handler() != null) {
        p.addLast(handler());
    }

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

init()方法在abstractBootstrap中是作為虛擬方法實現的而並沒有給出真正的具體實現可以把目光移到serverBootstrap裡有init()方法的具體實現

一開始則會在這裡這個服務端鎖生成的通道進行配置optionattr屬性的相關配置。

然後則會取出通道的責任鏈pipline,如果這個netty已經設定了相應的channelhandler則會在這裡將這個handler加入責任鏈。

如果還配置了相應的childGroupchildHandler作為channelInitializer中重寫initChannel()方法中生成的ServerBootStrapAcceptor的構造成員則會在這裡加入責任鏈的末尾

channelInitializer實現了channelchannelRegister方法

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    boolean success = false;
    try {
        initChannel((C) ctx.channel());
        pipeline.remove(this);
        ctx.fireChannelRegistered();
        success = true;
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    } finally {
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
        if (!success) {
            ctx.close();
        }
    }
}

在之後的註冊通道過程中,將在這裡被呼叫這個register()方法。也就是說,這裡直接會呼叫initChannel()方法,而initChannel()方法的具體實現,在剛才serverBootstrap裡的init()方法中,被實現了。而本身,提供了這個註冊方法的通道初始化者也會被移除通道的責任鏈當中。

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new ServerBootstrapAcceptor(
                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});

在這裡,重寫了initChaneel()方法。在這裡,將ServerBootstrapAcceptor生成並且加入通道的責任鏈當中ServerBootstrapAcceptor重寫了channelRead()方法給出裡讀取通道資料的方式但在這裡並不是重點

在初始化完畢通道之後把目光回到abatractBootstrapinitAndRegister()方法顧名思義既然已經初始化完畢了通道那麼接下來將會去向eventLoopGroup註冊相應的通道。

eventLoopGroup中的NioEventLoopGroup的超類的超類裡MutiThreadEventExecutorGroup存在著chooser根據註冊通道時index選擇相應的singleThreadEventLoop來註冊相應所需要註冊的通道

而註冊方法也恰恰實現在了singleThreadEventLoop

public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

在這裡根據相應的通道和eventExcutor生成相應的channelPromise並且繼續註冊。

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

在這裡直接通過channelunsafe方法註冊相應的eventLoop

unsasfe的註冊過程中,核心之處在於AbstractNioUnsafedoRegister()方法。

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }

這裡完成真正的java通道與eventLoopselector的註冊。

在所有註冊完畢之後也就是說initAndRegister()方法的初始化與註冊都已經完畢。這樣的話,在確保註冊已經完畢之後,將會通過bind0()方法開始正式的繫結。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在這裡,在eventLoop非同步的去將本地埠和通道去繫結,並新增通道監聽器。

channelbind()方法中,在abstractChannel中實現了bind()方法,而其中是直接呼叫了pipeLinebind()方法而在pipeline中也是直接呼叫了tailbind方法。在headContextbind()方法而是直接呼叫了Unsafebind()方法由此可以看見和註冊的過程一樣最後還是在unsafe中完成bind()的具體實現。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
        Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

在這裡呼叫了doBind()方法nioServerSocketChannel中實現

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

這樣,變成了java通道的埠繫結,由此,netty的埠繫結結束。