     * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
     * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
     * following methods:
     * <ul>
     *   <li>{@link #invoker()}</li>
     *   <li>{@link #localAddress()}</li>
     *   <li>{@link #remoteAddress()}</li>
     *   <li>{@link #closeForcibly()}</li>
     *   <li>{@link #register(EventLoop, ChannelPromise)}</li>
     *   <li>{@link #deregister(ChannelPromise)}</li>
     *   <li>{@link #voidPromise()}</li>
     * </ul>
    interface Unsafe {

         * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
         * receiving data.
        RecvByteBufAllocator.Handle recvBufAllocHandle(); 

         * Returns the {@link ChannelHandlerInvoker} which is used by default unless specified by a user.
        ChannelHandlerInvoker invoker();

         * Return the local {@link SocketAddress} to which the {@link Channel} is bound, or
         * {@code null} if none.
        SocketAddress localAddress();

         * Return the remote {@link SocketAddress} to which the {@link Channel} is bound, or
         * {@code null} if none is bound yet.
        SocketAddress remoteAddress();

         * Register the {@link Channel} of the {@link ChannelPromise} and notify
         * the {@link ChannelFuture} once the registration was complete.
         * <p>
         * It's only safe to submit a new task to the {@link EventLoop} from within a
         * {@link ChannelHandler} once the {@link ChannelPromise} succeeded. Otherwise
         * the task may or may not be rejected.
         * </p>
        void register(EventLoop eventLoop, ChannelPromise promise);

         * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
         * it once it's done.
        void bind(SocketAddress localAddress, ChannelPromise promise);

         * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
         * If a specific local {@link SocketAddress} should be used it needs to be given as argument. Otherwise just
         * pass {@code null} to it.
         * The {@link ChannelPromise} will get notified once the connect operation was complete.
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

         * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
         * operation was complete.
        void disconnect(ChannelPromise promise);

         * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
         * operation was complete.
        void close(ChannelPromise promise);

         * Closes the {@link Channel} immediately without firing any events.  Probably only useful
         * when registration attempt failed.
        void closeForcibly();

         * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
         * {@link ChannelPromise} once the operation was complete.
        void deregister(ChannelPromise promise);

         * Schedules a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the
         * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
        void beginRead();

         * Schedules a write operation.
        void write(Object msg, ChannelPromise promise);

         * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
        void flush();

         * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
         * It will never be notified of a success or error and so is only a placeholder for operations
         * that take a {@link ChannelPromise} as argument but for which you do not want to get notified.
        ChannelPromise voidPromise();

         * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
        ChannelOutboundBuffer outboundBuffer();

 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
public interface RecvByteBufAllocator {

     * Creates a new handle.  The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
    Handle newHandle();

    interface Handle {
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
        ByteBuf allocate(ByteBufAllocator alloc);

         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
        int guess();

         * Records the actual number of read bytes in the previous read operation so that the allocator allocates
         * the buffer with potentially more correct capacity.
         * @param actualReadBytes the actual number of read bytes in the previous read operation
        void record(int actualReadBytes);

register方法設定channel的EventLoop(也就是NioEventLoop),並且向該EventLoop提交一個任務來完成實際的註冊:

 eventLoop.execute(new OneTimeTask() {
                        public void run() {
                boolean firstRegistration = neverRegistered;
                neverRegistered = false;
                registered = true;


    /**
     * Registers this channel's underlying Java channel with the selector of its NIO event loop and
     * stores the resulting key in {@code selectionKey}.
     * <p>
     * A {@link CancelledKeyException} is tolerated once: the selector is forced to select and the
     * registration is attempted again; a second failure is rethrown.
     * <p>
     * NOTE(review): this excerpt is truncated — the statement that leaves the loop on success and the
     * closing braces are not shown here; confirm against the full source.
     *
     * @throws Exception if registration fails, including a repeated {@link CancelledKeyException}
     */
    protected void doRegister() throws Exception {
        // Remembers whether a selectNow() has already been forced during this call.
        boolean selected = false;
        for (;;) {
            try {
                // Passes 0 as the ops argument and this channel as the attachment
                // (assuming javaChannel() is a java.nio SelectableChannel — TODO confirm).
                selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    ((NioEventLoop) eventLoop().unwrap()).selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;


DefaultChannelPipeline類的構造方法:head的下一個context是TailContext,tail的前一個是head;這2個Context裡被@Skip標記的方法不同,用來區別inbound和outbound。

    /**
     * Creates a pipeline for the given channel that initially holds only a head and a tail context,
     * linked directly to each other.
     * NOTE(review): the closing braces of the null check and of the constructor are missing from this excerpt.
     *
     * @param channel the channel this pipeline belongs to; must not be {@code null}
     * @throws NullPointerException if {@code channel} is {@code null}
     */
    DefaultChannelPipeline(AbstractChannel channel) {
        // Reject a missing channel before any state is assigned.
        if (channel == null) {
            throw new NullPointerException("channel");
        this.channel = channel;

        // Both end contexts receive a reference to this pipeline.
        tail = new TailContext(this);
        head = new HeadContext(this);

        // Link the two contexts in both directions: head.next -> tail, tail.prev -> head.
        head.next = tail;
        tail.prev = head;


    /**
     * Appends a handler under the given name by delegating to the three-argument overload with a
     * {@code null} invoker.
     *
     * @param name    the name to register the handler under
     * @param handler the handler to append
     * @return whatever the three-argument overload returns
     */
    public ChannelPipeline addLast(String name, ChannelHandler handler) {
        // The cast selects the ChannelHandlerInvoker overload for the null argument.
        return addLast((ChannelHandlerInvoker) null, name, handler);

    /**
     * Appends a handler while holding this pipeline's monitor: the name is passed through
     * {@code filterName}, the handler is wrapped in a new {@code DefaultChannelHandlerContext}, and the
     * context is handed to {@code addLast0}.
     * NOTE(review): the closing braces of the synchronized block and the method are missing from this excerpt.
     *
     * @param invoker the invoker to associate with the new context; may be {@code null}
     *                (the two-argument overload passes {@code null})
     * @param name    the requested handler name, replaced by the result of {@code filterName}
     * @param handler the handler to append
     * @return this pipeline
     */
    public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
        synchronized (this) {
            // filterName's exact rules are not visible here; its result is the name actually used.
            name = filterName(name, handler);
            addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
        return this;

    /**
     * Inserts {@code newCtx} into the context list immediately before {@code tail} and records it in
     * {@code name2ctx} under {@code name}.
     *
     * @param name   the key under which the context is stored in {@code name2ctx}
     * @param newCtx the context to insert
     */
    private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {

        // The context currently sitting just before tail becomes the new context's predecessor.
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;

        name2ctx.put(name, newCtx);



    public ChannelPipeline fireChannelRegistered() {
        return this;


    public ChannelHandlerContext fireChannelRegistered() {
        AbstractChannelHandlerContext next = findContextInbound();
        return this;


調用的是預設的 DefaultChannelHandlerInvoker:

    public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
        if (executor.inEventLoop()) {
        } else {
            executor.execute(new OneTimeTask() {
                public void run() {

    public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
        try {
        } catch (Throwable t) {
            notifyHandlerException(ctx, t);



 /**
  * Computes the skip-flag bit set for a handler type: for each handler method that
  * {@code isSkippable} reports as skippable on {@code handlerType}, the matching {@code MASK_*}
  * bit is OR-ed into the result.
  * NOTE(review): this excerpt has lost the closing braces of its {@code if} blocks and whatever
  * statement followed the comment in the {@code catch} block — confirm against the full source.
  *
  * @param handlerType the handler class to inspect
  * @return the OR of the {@code MASK_*} bits of all methods found skippable
  */
 static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
        int flags = 0;
        try {
            // Handler lifecycle and exception callbacks.
            if (isSkippable(handlerType, "handlerAdded")) {
                flags |= MASK_HANDLER_ADDED;
            if (isSkippable(handlerType, "handlerRemoved")) {
                flags |= MASK_HANDLER_REMOVED;
            if (isSkippable(handlerType, "exceptionCaught", Throwable.class)) {
                flags |= MASK_EXCEPTION_CAUGHT;
            // Channel state and read callbacks.
            if (isSkippable(handlerType, "channelRegistered")) {
                flags |= MASK_CHANNEL_REGISTERED;
            if (isSkippable(handlerType, "channelUnregistered")) {
                flags |= MASK_CHANNEL_UNREGISTERED;
            if (isSkippable(handlerType, "channelActive")) {
                flags |= MASK_CHANNEL_ACTIVE;
            if (isSkippable(handlerType, "channelInactive")) {
                flags |= MASK_CHANNEL_INACTIVE;
            if (isSkippable(handlerType, "channelRead", Object.class)) {
                flags |= MASK_CHANNEL_READ;
            if (isSkippable(handlerType, "channelReadComplete")) {
                flags |= MASK_CHANNEL_READ_COMPLETE;
            if (isSkippable(handlerType, "channelWritabilityChanged")) {
                flags |= MASK_CHANNEL_WRITABILITY_CHANGED;
            if (isSkippable(handlerType, "userEventTriggered", Object.class)) {
                flags |= MASK_USER_EVENT_TRIGGERED;
            // Operations that take a ChannelPromise, plus read and flush.
            if (isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class)) {
                flags |= MASK_BIND;
            if (isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)) {
                flags |= MASK_CONNECT;
            if (isSkippable(handlerType, "disconnect", ChannelPromise.class)) {
                flags |= MASK_DISCONNECT;
            if (isSkippable(handlerType, "close", ChannelPromise.class)) {
                flags |= MASK_CLOSE;
            if (isSkippable(handlerType, "deregister", ChannelPromise.class)) {
                flags |= MASK_DEREGISTER;
            if (isSkippable(handlerType, "read")) {
                flags |= MASK_READ;
            if (isSkippable(handlerType, "write", Object.class, ChannelPromise.class)) {
                flags |= MASK_WRITE;
            if (isSkippable(handlerType, "flush")) {
                flags |= MASK_FLUSH;
        } catch (Exception e) {
            // Should never reach here.

        return flags;


    // Single-bit masks, one per handler method, occupying bits 0 through 18.
    // skipFlags0 ORs together the masks of the methods it finds skippable.
    // The two handler-lifecycle masks are package-private; all others are private.
    static final int MASK_HANDLER_ADDED = 1;
    static final int MASK_HANDLER_REMOVED = 1 << 1;

    // Presumably the inbound event callbacks: MASKGROUP_INBOUND visibly includes MASK_EXCEPTION_CAUGHT
    // and MASK_CHANNEL_READ, but the rest of that expression is cut off in this excerpt — TODO confirm.
    private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
    private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
    private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
    private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
    private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
    private static final int MASK_CHANNEL_READ = 1 << 7;
    private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
    private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
    private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;

    // Outbound operations: MASKGROUP_OUTBOUND lists MASK_BIND through MASK_WRITE
    // (its final operand is cut off in this excerpt; presumably MASK_FLUSH — TODO confirm).
    private static final int MASK_BIND = 1 << 11;
    private static final int MASK_CONNECT = 1 << 12;
    private static final int MASK_DISCONNECT = 1 << 13;
    private static final int MASK_CLOSE = 1 << 14;
    private static final int MASK_DEREGISTER = 1 << 15;
    private static final int MASK_READ = 1 << 16;
    private static final int MASK_WRITE = 1 << 17;
    private static final int MASK_FLUSH = 1 << 18;

    private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
            MASK_CHANNEL_READ |

    private static final int MASKGROUP_OUTBOUND = MASK_BIND |
            MASK_CONNECT |
            MASK_DISCONNECT |
            MASK_CLOSE |
            MASK_DEREGISTER |
            MASK_READ |
            MASK_WRITE |

    /**
     * Walks forward along the {@code next} links, starting with the context after this one, and
     * returns the first context whose {@code skipFlags} do not have every {@code MASKGROUP_INBOUND}
     * bit set.
     * <p>
     * The loop has no explicit end-of-list check; it relies on some context further along
     * failing the skip test (presumably the tail context — TODO confirm).
     *
     * @return the first following context that is not skipped for the whole inbound mask group
     */
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
            // Keep going while this context has all inbound bits marked as skippable.
        } while ((ctx.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND);
        return ctx;

到這裡bootstrap裡initAndRegister()方法執行完畢,再次回到bootstrap的bind()方法,doBind(final SocketAddress localAddress) 繼續執行:
 /**
  * Runs {@code initAndRegister()} and then binds the resulting channel to {@code localAddress}.
  * <ul>
  *   <li>If the registration future already carries a cause, it is returned as is.</li>
  *   <li>If registration is already done, {@code doBind0} is called immediately with a new promise.</li>
  *   <li>Otherwise a listener is added to the registration future and {@code doBind0} is called
  *       from that listener once registration completes.</li>
  * </ul>
  * NOTE(review): this excerpt is missing several lines (closing braces and the body of the
  * failure branch inside the listener) — confirm details against the full source.
  *
  * @param localAddress the address to bind to
  * @return the failed registration future, or the promise handed to {@code doBind0}
  */
 private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        // Registration has already failed: hand the failed future straight back to the caller.
        if (regFuture.cause() != null) {
            return regFuture;

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    doBind0(regFuture, channel, localAddress, promise);
            return promise;
