netty 原理
阿新 • • 發佈:2018-05-23
ted 初始化 fir gravity list acc cme reat eat
netty 實現
1. 各組件之間的關系
每個ServerBootstrap與一個事件循環對象(一個線程)都會與一個Channel綁定,如NioServerSocketChannel
2. 如何綁定
在做bind操作時,會執行方法,register進行註冊
ChannelFuture regFuture = config().group().register(channel);
關鍵接口及類之間的關系:
EventLoopGroup與EventLoop及其類關系圖之間形成組合模式。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);//選擇一個線程執行器,調用register方法,綁定channel
}
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
3. 代表客戶端的channel創建
註意到ServerBootstrap有兩個EventLoopGroup,parent 負責代表客戶端channel的分發,child負責代碼客戶端channel的處理。
- Accept事件,創建代表客戶端的channel
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break ;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
註意到doReadMessages會創建代表客戶端的channel,創建完成會觸發,fireChannelRead 事件。
4. 代表客戶端的channel分發
上一節知道,channel創建完成會觸發fireChannelRead 事件。在ServerSocketChannel初始化時,會註冊ServerBootstrapAcceptor 用戶接收代表客戶端channel並分發到child EventLoopGroup中執行。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
5. netty線程模型
不管是客戶端還是服務端可以通過一個Channel同時讀或者寫,不需要阻塞,讀寫多路復用。
6. handler鏈模型
netty 原理