Netty(EventLoop 和執行緒模型)
EventLoop介面
Netty的EventLoop是協同設計的一部分,它採用了兩個基本的API:併發和網路程式設計。首先,io.netty.util.concurrent包構建在JDK的java.util.concurrent包上,用來提供執行緒執行器。其次,io.netty.channel包中的類,為了與Channel的事件進行互動,擴充套件了這些介面/類。
在這個模型中,一個EventLoop將由一個永遠都不會改變的Thread驅動, 同時任務(Runnable或者Callable)可以直接提交給EventLoop實現, 以立即執行或者排程執行。根據配置和可用核心的不同,可能會建立多個EventLoop例項用以優化資源的使用,並且單個EventLoop可能會被指派用於服務多個Channel。
需要注意的是,Netty的EventLoop在繼承了ScheduledExecutorService的同時,只定義了一個方法,parent()。這個方法,如下面的程式碼片斷所示,用於返回到當前EventLoop實現的例項所屬的EventLoopGroup的引用。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
事件/任務的執行順序 事件和任務是以先進先出(FIFO)的順序執行的。這樣可以通過保證位元組內容總是按正確的順序被處理,消除潛在的資料損壞的可能性。
通過原始碼分析可以看到所有的任務都會在這個方法中執行,而NioEventLoop的run方法最外層套了一個for (;;),它會判斷是否有任務提交,然後不斷迴圈該內部程式碼。
Netty 4 中的 I/O 和事件處理
由I/O操作觸發的事件將流經安裝了一個或者多個ChannelHandler的ChannelPipeline。傳播這些事件的方法呼叫可以隨後被Channel-Handler所攔截,並且可以按需地處理事件。在Netty4中,所有的I/O操作和事件都由已經被分配給了EventLoop的那個Thread來處理。
Netty 3 中的 I/O操作
在以前的版本中所使用的執行緒模型只保證了入站(之前稱為上游)事件會在所謂的I/O執行緒(對應於Netty4中的EventLoop)中執行。所有的出站(下游)事件都由呼叫執行緒處理,其可能是I/O執行緒也可能是別的執行緒。但是其需要在ChannelHandler中對出站事件進行仔細的同步。 簡而言之,不可能保證多個執行緒不會在同一時刻嘗試訪問出站事件。
任務排程
JDK 的任務排程API
在Java5之前,任務排程是建立在java.util.Timer類之上的,其使用了一個後臺Thread,並且具有 與標準執行緒相同的限制。隨後,JDK提供了java.util.concurrent包,它定義了interface ScheduledExecutorService。
java.util.concurrent.Executors類的工廠方法:
方法 | 描述 |
newScheduledThreadPool(int corePoolSize) newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory) |
建立一個ScheduledThreadExecutorService,用於排程命令在指定延遲之後執行或者週期性地執行。它使用corePoolSize引數來計算執行緒數 |
newSingleThreadScheduledExecutor() newSingleThreadScheduledExecutor(ThreadFactory threadFactory) |
建立一個ScheduledThreadExecutorService,用於排程命令在指定延遲之後執行或者週期性地執行。它使用一個執行緒來執行被排程的任務 |
使用 EventLoop排程任務
ScheduledExecutorService的實現具有侷限性,例如,事實上作為執行緒池管理的一部分,將會有額外的執行緒建立。如果有大量任務被緊湊地排程,那麼這將 成為一個瓶頸。Netty通過Channel的EventLoop實現任務排程解決了這一問題:
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
System.out.println("開啟非同步執行緒");
}
},1,TimeUnit.SECONDS);//排程任務在從現在開始的1秒之後執行
如果要開啟一個週期性的任務,如心跳檢測:
ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(Unpooled.copiedBuffer("傳送心跳"+new Date(), Charset.forName("UTF-8")));
}
},1,5, TimeUnit.SECONDS);//排程在1秒之後,並且以後每間隔5秒執行
如果想要取消這個任務可以這麼寫:
ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
System.out.println("開啟非同步執行緒");
}
},1,TimeUnit.SECONDS);
future.cancel(false);
通過呼叫返回值future的cancel方法進行取消任務。
實現細節
執行緒管理
Netty執行緒模型的卓越效能取決於對於當前執行的Thread的身份的確定,也就是說,確定它是否是分配給當前Channel以及它的EventLoop的那一個執行緒。(EventLoop將負責處理一個Channel的整個生命週期內的所有事件。)
如果(當前)呼叫執行緒正是支撐EventLoop的執行緒,那麼所提交的程式碼塊將會被 (直接 )執行。否則,EventLoop將排程該任務以便稍後執行,並將它放入到內部佇列中。當EventLoop下次處理它的事件時,它會執行佇列中的那些任務/事件。這也就解釋了任何的Thread是如何與Channel直接互動而無需在ChannelHandler中進行額外同步的。
每個EventLoop都有它自已的任務佇列,獨立於任何其他的EventLoop。
永遠不要將一個長時間執行的任務放入到執行佇列中,因為它將阻塞需要在同一執行緒上執行的任何其他任務。
如果必須要進行阻塞呼叫或者執行長時間執行的任務,建議使用一個專門的EventExecutor。
ChannelHandler的執行和阻塞
通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O 執行緒)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個執行緒,因為這會對整體的I/O 處理產生負面的影響。但有時可能需要與那些使用阻塞API 的遺留程式碼進行互動。對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法。如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutorGroup的預設實現。
EventLoop/執行緒的分配
服務於Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根據不同的傳輸實現,EventLoop的建立和分配方式也不同。
非同步傳輸
非同步傳輸實現只使用了少量的EventLoop(以及和它們相關聯的Thread), 而且在當前的執行緒模型中,它們可能會被多個Channel所共享。這使得可以通過儘可能少量的Thread來支撐大量的Channel,而不是每個Channel分配一個Thread。
EventLoopGroup負責為每個新建立的Channel分配一個EventLoop。在當前實現中,使用順序迴圈(round-robin)的方式進行分配以獲取一個均衡的分佈,並且相同的EventLoop可能會被分配給多個Channel。(這一點在將來的版本中可能會改變。)
一旦一個Channel被分配給一個EventLoop,它將在它的整個生命週期中都使用這個EventLoop(以及相關聯的Thread) 。
需要注意的是,EventLoop的分配方式對ThreadLocal的使用的影響。因為一個EventLoop通常會被用於支撐多個Channel,所以對於所有相關聯的Channel來說,ThreadLocal都將是一樣的。這使得它對於實現狀態追蹤等功能來說是個糟糕的選擇。然而,在一些無狀態的上下文中,它仍然可以被用於在多個Channel之間共享一些重度的或者代價昂貴的物件,甚至是事件。
阻塞傳輸
用於像OIO( 舊的阻塞I/O)這樣的其他傳輸的設計略有不同。這裡每一個Channel都將被分配給一個EventLoop(以及它的Thread)。
但是,正如同之前一樣,得到的保證是每個Channel的I/O事件都將只會被一個Thread(用於支撐該Channel的EventLoop的那個Thread)處理。
參考《Netty實戰》
附:
package netty.in.action;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.Charset;
public class EchoServer {
final ByteBuf bufs= Unpooled.copiedBuffer("Hello,劉德華", Charset.forName("UTF-8"));
public void bind(int port) throws Exception {
//配置服務端的NIO執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChildChannelHandler());
// 繫結埠,同步等待成功
ChannelFuture f=b.bind(port).sync();
//等待服務端監聽埠關閉
f.channel().closeFuture().sync();
} finally {
//退出,釋放執行緒池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("服務端啟動……");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
if(buf.hasArray()){
byte[] array=buf.array();//返回該緩衝區的備份位元組陣列。
int offset=buf.arrayOffset()+buf.readerIndex();//計算第一個位元組的偏移量
int length=buf.readableBytes();//獲取可讀位元組數
String s=new String(array,offset,length);
System.out.println("s="+s);
}else{
byte[] array = new byte[buf.readableBytes()];//獲取可讀位元組數並分配一個新的陣列來儲存
buf.getBytes(buf.readerIndex(),array);//將位元組複製到該陣列
String s=new String(array,0,buf.readableBytes());
System.out.println("直接緩衝區:"+s);
}
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
bufs.retain();//引用計數器加一
ChannelFuture future=ctx.writeAndFlush(bufs);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
System.out.println("成功");
else{
System.out.println("失敗");
future.cause().printStackTrace();
future.channel().close();
}
}
});
// ctx.close();
}
});
}
}
public static void main(String[] args) throws Exception {
int port=8080;
new EchoServer().bind(port);
}
}
package netty.in.action;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ByteProcessor;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class EchoClient {
final ByteBuf buf= Unpooled.copiedBuffer("Hello,王寶強", Charset.forName("UTF-8"));
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler2() );
// 發起非同步連線操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客戶端啟動……");
ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf);
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
System.out.println("開啟非同步執行緒");
}
},1,TimeUnit.SECONDS);
// future.cancel(false);
ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(Unpooled.copiedBuffer("傳送心跳"+new Date(), Charset.forName("UTF-8")));
}
},1,5, TimeUnit.SECONDS);
}
});
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客戶端啟動……");
ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
ByteBuf headerBuf=buf;
ByteBuf bodyBuf=buf;
messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
messageBuf.addComponent(headerBuf);
for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
System.out.println(buf);
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("複合緩衝區:"+body);
}
ctx.writeAndFlush(buf);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuf copyBuf=((ByteBuf) msg).copy();
// System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
// buf.release();//釋放引用計數物件
for (int i = 0; i < buf.capacity(); i++) {
byte b=buf.getByte(i);
if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
System.out.println("i="+(char)b);
}
int i=buf.forEachByte(new ByteProcessor() {
@Override
public boolean process(byte value) throws Exception {
byte[] b=",".getBytes();
if (b[0]!=value)
return true;
else
return false;
}
});
System.out.println("i="+i+" value="+(char) buf.getByte(i));
ByteBuf sliced = buf.slice(0,2);
sliced.setByte(0,(byte)'h');
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ctx.fireChannelRead(copyBuf);
}
});
ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("text2:"+body);
ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
ctx.writeAndFlush(bufs);
ctx.close();
}
});
// ch.pipeline().remove("text2");
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new EchoClient().connect(port, "127.0.0.1");
}
}