netty5原始碼分析(4)--學習筆記
NioMessageUnsafe註冊EventLoop和promise
Unsafe介面
/**
 * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
 * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
 * following methods:
 * <ul>
 *   <li>{@link #invoker()}</li>
 *   <li>{@link #localAddress()}</li>
 *   <li>{@link #remoteAddress()}</li>
 *   <li>{@link #closeForcibly()}</li>
 *   <li>{@link #register(EventLoop, ChannelPromise)}</li>
 *   <li>{@link #deregister(ChannelPromise)}</li>
 *   <li>{@link #voidPromise()}</li>
 * </ul>
 */
interface Unsafe {

    /**
     * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}s when
     * receiving data.
     */
    RecvByteBufAllocator.Handle recvBufAllocHandle();

    /**
     * Return the {@link ChannelHandlerInvoker} which is used by default unless one is specified by the user.
     */
    ChannelHandlerInvoker invoker();

    /**
     * Return the local {@link SocketAddress} that is bound, or {@code null} if none.
     */
    SocketAddress localAddress();

    /**
     * Return the remote {@link SocketAddress} that is bound, or {@code null} if none is bound yet.
     */
    SocketAddress remoteAddress();

    /**
     * Register the {@link Channel} of the {@link ChannelPromise} and notify
     * the {@link ChannelFuture} once the registration is complete.
     * <p>
     * It is only safe to submit a new task to the {@link EventLoop} from within a
     * {@link ChannelHandler} once the {@link ChannelPromise} has succeeded. Otherwise
     * the task may or may not be rejected.
     * </p>
     */
    void register(EventLoop eventLoop, ChannelPromise promise);

    /**
     * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
     * it once it is done.
     */
    void bind(SocketAddress localAddress, ChannelPromise promise);

    /**
     * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
     * If a specific local {@link SocketAddress} should be used, it needs to be given as an argument. Otherwise just
     * pass {@code null} for it.
     *
     * The {@link ChannelPromise} will get notified once the connect operation is complete.
     */
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

    /**
     * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
     * operation is complete.
     */
    void disconnect(ChannelPromise promise);

    /**
     * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
     * operation is complete.
     */
    void close(ChannelPromise promise);

    /**
     * Close the {@link Channel} immediately without firing any events. Probably only useful
     * when a registration attempt failed.
     */
    void closeForcibly();

    /**
     * Deregister the {@link Channel} of the {@link ChannelPromise} from the {@link EventLoop} and notify the
     * {@link ChannelPromise} once the operation is complete.
     */
    void deregister(ChannelPromise promise);

    /**
     * Schedule a read operation that fills the inbound buffer of the first {@link ChannelHandler} in the
     * {@link ChannelPipeline}. If there is already a pending read operation, this method does nothing.
     */
    void beginRead();

    /**
     * Schedule a write operation.
     */
    void write(Object msg, ChannelPromise promise);

    /**
     * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
     */
    void flush();

    /**
     * Return a special {@link ChannelPromise} which can be reused and passed to the operations in {@link Unsafe}.
     * It will never be notified of a success or error and so is only a placeholder for operations
     * that take a {@link ChannelPromise} as argument but for which you do not want to get notified.
     */
    ChannelPromise voidPromise();

    /**
     * Return the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
     */
    ChannelOutboundBuffer outboundBuffer();
}
Handle分配bytebuf
/**
 * Allocates a new receive buffer whose capacity is probably large enough to read all inbound data and small enough
 * not to waste its space.
 */
public interface RecvByteBufAllocator {

    /**
     * Creates a new handle. The handle provides the actual operations and keeps the internal information which is
     * required for predicting an optimal buffer capacity.
     */
    Handle newHandle();

    interface Handle {
        /**
         * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small
         * enough not to waste its space.
         */
        ByteBuf allocate(ByteBufAllocator alloc);

        /**
         * Similar to {@link #allocate(ByteBufAllocator)} except that it does not allocate anything but just tells the
         * capacity.
         */
        int guess();

        /**
         * Records the actual number of bytes read in the previous read operation so that the allocator can allocate
         * the next buffer with a potentially more accurate capacity.
         *
         * @param actualReadBytes the actual number of bytes read in the previous read operation
         */
        void record(int actualReadBytes);
    }
}
register方法設定channel的EventLoop(也就是NioEventLoop),並且把register0(promise)包裝成任務提交給該EventLoop執行:
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
通過執行execute(Runnable)方法設定SingleThreadEventExecutor裡的thread物件,用於判斷eventLoop.inEventLoop()..(原來是在這賦值的)
boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; eventLoop.acceptNewTasks(); safeSetSuccess(promise); pipeline.fireChannelRegistered();
接著呼叫AbstractNioChannel的doRegister(),將ServerSocketChannel註冊到NioEventLoop的Selector上
/**
 * Registers the underlying Java channel with the selector of the unwrapped {@link NioEventLoop},
 * using an interest set of {@code 0} and this channel as the attachment.
 *
 * <p>If the registration fails with a {@link CancelledKeyException}, one {@code selectNow()} is forced
 * on the event loop and the registration is retried; a second failure is rethrown.
 *
 * @throws Exception if the registration fails
 */
@Override
protected void doRegister() throws Exception {
    boolean forcedSelect = false;
    while (true) {
        try {
            selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (forcedSelect) {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
            // Force the Selector to select now as the "canceled" SelectionKey may still be
            // cached and not removed because no Select.select(..) operation was called yet.
            ((NioEventLoop) eventLoop().unwrap()).selectNow();
            forcedSelect = true;
        }
    }
}
javaChannel()是NioServerSocketChannel建立的時候的serversocketchannel(伺服器套接字通道)。接著執行pipeline.fireChannelRegistered();pipeline是在NioServerSocketChannel初始化時構造的,並把當前的NioServerSocketChannel傳入
DefaultChannelPipeline類構造方法 head的下一個contexthandler是TailContext,tail前一個是head,2個Context裡方法上@Skip標記的方法不同,用來區別inbound和outbound,
/**
 * Creates a pipeline for the given channel, consisting initially of only the two sentinel contexts:
 * {@code head} linked directly to {@code tail}.
 *
 * @param channel the channel this pipeline belongs to
 * @throws NullPointerException if {@code channel} is {@code null}
 */
DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    // Build the sentinels (tail first, then head) and wire them into a two-element doubly linked list.
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}
平常程式碼裡經常呼叫的pipeline.addLast,就是把我們寫的inbound和outbound handler加入這個連結串列中(插在tail之前)
/**
 * Appends a handler at the end of this pipeline without specifying an invoker.
 *
 * @param name    the name of the handler to append
 * @param handler the handler to append
 * @return this pipeline
 */
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    // A null invoker is passed through to the three-argument overload.
    final ChannelHandlerInvoker noInvoker = null;
    return addLast(noInvoker, name, handler);
}
/**
 * Appends a handler at the end of this pipeline, wrapping it in a new
 * {@link DefaultChannelHandlerContext} bound to the given invoker.
 *
 * @param invoker the invoker for the new context, may be {@code null}
 * @param name    the requested handler name, passed through {@code filterName} before use
 * @param handler the handler to append
 * @return this pipeline
 */
@Override
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
    // Name filtering and list insertion happen under the pipeline lock.
    synchronized (this) {
        final String effectiveName = filterName(name, handler);
        final AbstractChannelHandlerContext ctx =
                new DefaultChannelHandlerContext(this, invoker, effectiveName, handler);
        addLast0(effectiveName, ctx);
    }
    return this;
}
/**
 * Links {@code newCtx} into the context list immediately before {@code tail}, records it in the
 * name lookup map and then fires the handler-added callback.
 *
 * @param name   the name under which the context is registered in {@code name2ctx}
 * @param newCtx the context to insert
 */
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    // The node currently sitting right before the tail sentinel becomes the predecessor of newCtx.
    final AbstractChannelHandlerContext formerLast = tail.prev;

    // Point the new node at its neighbours first, then splice it in.
    newCtx.prev = formerLast;
    newCtx.next = tail;
    formerLast.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}
生成AbstractChannelHandlerContext的tail,head
/**
 * Starts propagation of the channel-registered event at the {@code head} context of this pipeline.
 *
 * @return this pipeline
 */
@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}
HeadContext是AbstractChannelHandlerContext子類,AbstractChannelHandlerContext裡持有AbstractChannel,DefaultChannelPipeline,ChannelHandlerInvoker,
/**
 * Forwards the channel-registered event to the context returned by {@code findContextInbound()},
 * dispatching through that context's invoker.
 *
 * @return this context
 */
@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext target = findContextInbound();
    target.invoker().invokeChannelRegistered(target);
    return this;
}
next.invoker().invokeChannelRegistered(next);
呼叫的是預設的 DefaultChannelHandlerInvoker
/**
 * Invokes the channel-registered callback for {@code ctx}: directly when the calling thread is the
 * executor's event loop, otherwise by submitting a task to the executor.
 *
 * @param ctx the context whose handler is to be notified
 */
@Override
public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
    if (!executor.inEventLoop()) {
        // Not on the event loop thread: hand the call over to the executor.
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                invokeChannelRegisteredNow(ctx);
            }
        });
        return;
    }
    invokeChannelRegisteredNow(ctx);
}
ChannelHandlerInvokerUtil類
/**
 * Calls {@code channelRegistered} on the handler of the given context on the current thread.
 * Any {@link Throwable} raised while obtaining or calling the handler is passed to
 * {@code notifyHandlerException} instead of propagating to the caller.
 *
 * @param ctx the context whose handler is invoked
 */
public static void invokeChannelRegisteredNow(ChannelHandlerContext ctx) {
    try {
        final ChannelHandler target = ctx.handler();
        target.channelRegistered(ctx);
    } catch (Throwable failure) {
        notifyHandlerException(ctx, failure);
    }
}
說白了就是模板模式,呼叫的是我們自己的handler的對應方法
在ChannelHandlerContext連結串列結構裡查詢Inbound的Context;ctx.skipFlags是通過反射handler的類(例如HeadContext),根據方法名和@Skip註解生成的
/**
 * Computes the skip-flag bit set for a handler type by asking {@code isSkippable} about each handler
 * method in turn and OR-ing in the matching {@code MASK_*} bit when the answer is {@code true}.
 *
 * <p>If a check throws, the exception is passed to {@link PlatformDependent#throwException(Throwable)}.
 *
 * @param handlerType the handler class to inspect
 * @return the combined mask of all methods reported as skippable
 */
static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
    int flags = 0;
    try {
        // Handler lifecycle and error callbacks.
        flags |= isSkippable(handlerType, "handlerAdded") ? MASK_HANDLER_ADDED : 0;
        flags |= isSkippable(handlerType, "handlerRemoved") ? MASK_HANDLER_REMOVED : 0;
        flags |= isSkippable(handlerType, "exceptionCaught", Throwable.class) ? MASK_EXCEPTION_CAUGHT : 0;

        // Remaining members of MASKGROUP_INBOUND.
        flags |= isSkippable(handlerType, "channelRegistered") ? MASK_CHANNEL_REGISTERED : 0;
        flags |= isSkippable(handlerType, "channelUnregistered") ? MASK_CHANNEL_UNREGISTERED : 0;
        flags |= isSkippable(handlerType, "channelActive") ? MASK_CHANNEL_ACTIVE : 0;
        flags |= isSkippable(handlerType, "channelInactive") ? MASK_CHANNEL_INACTIVE : 0;
        flags |= isSkippable(handlerType, "channelRead", Object.class) ? MASK_CHANNEL_READ : 0;
        flags |= isSkippable(handlerType, "channelReadComplete") ? MASK_CHANNEL_READ_COMPLETE : 0;
        flags |= isSkippable(handlerType, "channelWritabilityChanged") ? MASK_CHANNEL_WRITABILITY_CHANGED : 0;
        flags |= isSkippable(handlerType, "userEventTriggered", Object.class) ? MASK_USER_EVENT_TRIGGERED : 0;

        // Members of MASKGROUP_OUTBOUND.
        flags |= isSkippable(handlerType, "bind", SocketAddress.class, ChannelPromise.class) ? MASK_BIND : 0;
        flags |= isSkippable(handlerType, "connect", SocketAddress.class, SocketAddress.class, ChannelPromise.class)
                ? MASK_CONNECT : 0;
        flags |= isSkippable(handlerType, "disconnect", ChannelPromise.class) ? MASK_DISCONNECT : 0;
        flags |= isSkippable(handlerType, "close", ChannelPromise.class) ? MASK_CLOSE : 0;
        flags |= isSkippable(handlerType, "deregister", ChannelPromise.class) ? MASK_DEREGISTER : 0;
        flags |= isSkippable(handlerType, "read") ? MASK_READ : 0;
        flags |= isSkippable(handlerType, "write", Object.class, ChannelPromise.class) ? MASK_WRITE : 0;
        flags |= isSkippable(handlerType, "flush") ? MASK_FLUSH : 0;
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }
    return flags;
}
AbstractChannelHandlerContext類
// Skip-flag bits: one distinct bit per handler method.

// Handler lifecycle callbacks; these two bits are not part of either mask group below.
static final int MASK_HANDLER_ADDED = 1;
static final int MASK_HANDLER_REMOVED = 1 << 1;

// Bits combined into MASKGROUP_INBOUND.
private static final int MASK_EXCEPTION_CAUGHT = 1 << 2;
private static final int MASK_CHANNEL_REGISTERED = 1 << 3;
private static final int MASK_CHANNEL_UNREGISTERED = 1 << 4;
private static final int MASK_CHANNEL_ACTIVE = 1 << 5;
private static final int MASK_CHANNEL_INACTIVE = 1 << 6;
private static final int MASK_CHANNEL_READ = 1 << 7;
private static final int MASK_CHANNEL_READ_COMPLETE = 1 << 8;
private static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 9;
private static final int MASK_USER_EVENT_TRIGGERED = 1 << 10;

// Bits combined into MASKGROUP_OUTBOUND.
private static final int MASK_BIND = 1 << 11;
private static final int MASK_CONNECT = 1 << 12;
private static final int MASK_DISCONNECT = 1 << 13;
private static final int MASK_CLOSE = 1 << 14;
private static final int MASK_DEREGISTER = 1 << 15;
private static final int MASK_READ = 1 << 16;
private static final int MASK_WRITE = 1 << 17;
private static final int MASK_FLUSH = 1 << 18;

// Union of bits 2..10 (exceptionCaught through userEventTriggered).
private static final int MASKGROUP_INBOUND = MASK_EXCEPTION_CAUGHT |
MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED |
MASK_CHANNEL_ACTIVE |
MASK_CHANNEL_INACTIVE |
MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE |
MASK_CHANNEL_WRITABILITY_CHANGED |
MASK_USER_EVENT_TRIGGERED;

// Union of bits 11..18 (bind through flush).
private static final int MASKGROUP_OUTBOUND = MASK_BIND |
MASK_CONNECT |
MASK_DISCONNECT |
MASK_CLOSE |
MASK_DEREGISTER |
MASK_READ |
MASK_WRITE |
MASK_FLUSH;
通過@Skip過濾ChannelHandlerContext
/**
 * Walks forward from the context after this one and returns the first context whose
 * {@code skipFlags} do not have every bit of {@code MASKGROUP_INBOUND} set.
 *
 * @return the next context that does not skip all inbound events
 */
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext candidate = this.next;
    // A context with the complete inbound group set skips every inbound event, so step past it.
    while ((candidate.skipFlags & MASKGROUP_INBOUND) == MASKGROUP_INBOUND) {
        candidate = candidate.next;
    }
    return candidate;
}
到這裡bootstrap裡initAndRegister()方法執行完畢,再次回到bootstrap的bind()方法,doBind(final SocketAddress localAddress) 繼續
/**
 * Initializes and registers a channel, then binds it to {@code localAddress}.
 *
 * <p>If registration has already failed, the registration future is returned as-is. If it has already
 * completed, the bind is started immediately on a fresh promise. Otherwise a
 * {@link PendingRegistrationPromise} is returned and the bind is started from a listener once
 * registration finishes.
 *
 * @param localAddress the address to bind to
 * @return a future that is notified about the outcome of the bind (or of the failed registration)
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    // The promise is already completed here, so no bind attempt must follow.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();

                    // Only attempt the bind once registration is known to have succeeded.
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
過會繼續..