我們在接收一個新的連線請求之後是如何生成一個新的socketchannel並註冊讀寫事件的呢
阿新 • • 發佈:2019-01-30
我們知道,在NioEventLoop當中,我們會迴圈處理得到的selectedKeys,呼叫的方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
其中有程式碼會去處理Accept事件
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
呼叫了Channel的unsafe屬性的read方法,裡面會有程式碼產生並獲取SocketChannel
int localRead = doReadMessages(readBuf);
int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i));//pipeline就會將這個訊息交給他的channelHandler處理, } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete();
我們知道,在初始化Channel的時候,會在這個Channel的pipeline中新增一些handler,並在註冊完成之後完成新增操作
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } })
對於ChannelInitializer,他也是一個channelHandler,但是和其他的Handler不同的是,他在註冊完成之後,完成了響應的channelAdd回撥的過程中,會執行它的initChannel方法,並將自身從pipeline中移除掉。
所以在initChannel方法中,我們又看到了 ch.eventLoop().execute,這個會向eventLoop的提交一個任務,這個任務會執行一個新增handler的任務,所以unsafe.read()方法中的
pipeline.fireChannelRead(readBuf.get(i));
會進入到ServerBootstrapAcceptor的channelRead方法中,在這裡完成對socketChannel的初始化和註冊工作。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());//看見沒有,這個就是生產一個socketChannel的方法,基本的方法就在這個裡面
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}