Netty(ChannelHandler 和 ChannelPipeline)
ChannelHandler家族
channel的生命週期
Interface Channel定義了一組和ChannelInboundHandler API密切相關的簡單但功能強大的狀態模型,其Channel主要有4個狀態。
狀態 |
描述 |
ChannelUnregistered |
Channel 已經被建立,但還未註冊到EventLoop |
ChannelRegistered |
Channel 已經被註冊到了EventLoop |
ChannelActive |
Channel 處於活動狀態(已經連線到它的遠端節點)。它現在可以接收和傳送資料了 |
ChannelInactive |
Channel 沒有連線到遠端節點 |
當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其可以隨後對它們做出響應。
ChannelHandler的生命週期
interface ChannelHandler定義的生命週期操作如下所示,在ChannelHandler被新增到ChannelPipeline中或者被ChannelPipeline中移除時會呼叫 這些操作。這些方法中的每一個都接受一個ChannelHandlerContext引數。
狀態 |
描述 |
handlerAdded |
當把 ChannelHandler 新增到 ChannelPipeline 中時被呼叫 |
handlerRemoved |
當從 ChannelPipeline 中移除 ChannelHandler 時被呼叫 |
exceptionCaught |
當處理過程中在 ChannelPipeline 中有錯誤時被呼叫 |
ChannelHandler有兩個重要的子介面:
- ChannelInboundHandler——處理入站資料以及各種狀態變化;
- ChannelOutboundHandler——處理出站資料並且允許攔截所有的操作。
ChannelInboundHandler介面
以下方法將會在資料被接收時或者 與其對應的Channel狀態發生改變時被呼叫
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
/**
* 當Channel已經註冊到它的EventLoop並且能夠處理I/O時被呼叫
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* 當Channel從它的EventLoop登出並且無法處理任何I/O時被呼叫
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* 當Channel處於活動狀態時被呼叫;Channel已經連線/繫結並且已經就緒
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* 當Channel離開活動狀態並且不再連線它的遠端節點時被呼叫
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* 當從Channel讀取資料時被呼叫
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* 當Channel上的一個讀操作完成時被呼叫
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* 當ChannelnboundHandler.fireUserEventTriggered()方法被呼叫時被呼叫,因為一個POJO被傳經了ChannelPipeline
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* 當Channel的可寫狀態發生改變時被呼叫。使用者可以確保寫操作不會完成得太快(以避免發生OutOfMemoryError)或者可以在Channel變為再次可寫時恢復寫入。可以通過呼叫Channel的isWritable()方法來檢測Channel的可寫性。與可寫性相關的閾值可以通過Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWater-Mark()方法來設定
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* 如果丟擲一個可丟擲的異常物件,則呼叫。
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelOutboundHandler介面
出站操作和資料將由ChannelOutboundHandler處理。它的方法將被Channel、ChannelPipeline以及ChannelHandlerContext呼叫。ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如,如果到遠端節點的寫入被暫停了,那麼你可以推遲沖刷操作並在稍後繼續。
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* 當請求將Channel繫結到本地地址時被呼叫
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* 當請求將Channel連線到遠端節點時被呼叫
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* 當請求將Channel從遠端節點斷開時被呼叫
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* 當請求關閉Channel時被呼叫
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* 當請求將Channel從它的EventLoop登出時被呼叫
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* 當請求從Channel讀取更多的資料時被呼叫
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* 當請求通過Channel將資料寫到遠端節點時被呼叫
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* 當請求通過Channel將入隊資料沖刷到遠端節點時被呼叫
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelPromise與ChannelFuture ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise引數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。
ChannelHandler介面卡
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter類作為自己的ChannelHandler的介面卡類。這兩個介面卡分別提供了ChannelInboundHandler和ChannelOutboundHandler的基本實現。通過擴充套件抽象類ChannelHandlerAdapter,它們獲得了它們共同的超介面ChannelHandler的方法。生成的類的層次結構如:
ChannelHandlerAdapter還提供了實用方法isSharable()。如果其對應的實現被標註為Sharable,那麼 這個方法將返回true,表示它可以被新增到多個ChannelPipeline中。(@Sharable註解)
在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體呼叫了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。
資源管理
為了診斷潛在的(資源洩漏)問題,Netty提供了class ResourceLeakDetector它將對你應用程式的緩衝區分配做大約1%的取樣來檢測記憶體洩露。相關的開銷是非常小的。
Netty目前定義了4種洩漏檢測級別:
級別 |
描述 |
DISABLED |
禁用洩漏檢測。只有在詳盡的測試之後才應使用 |
SIMPLE |
使用1%的預設取樣率檢測並報告任何發現的洩露。這是預設級別,適合絕大部分情況。 |
ADVANCED |
使用預設的取樣率,報告所發現的任何的洩露以及對應的訊息被訪問的位置 |
PARANOID |
類似於ADVANCED,但是其將會對每次(對訊息的)訪問都進行取樣。這對效能將會有很大影響,在除錯階段使用 |
洩露檢測級別可以通過將下面的Java系統屬性設定為表中的一個值來定義:
java-Dio.netty.leakDetectionLevel=ADVANCED
如果一個訊息被消費或者丟棄了,並且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那麼使用者就有責任呼叫ReferenceCountUtil.release()。如果訊息到達了實際的傳輸層,那麼當它被寫入時或者Channel關閉時,都將被自動釋放。
ChannelPipeline介面
ChannelPipeline是一個攔截流經Channel的入站和出站事件的Channel-Handler例項鏈,那麼就很容易看出這些ChannelHandler之間的互動是組成一個應用程式資料和事件處理邏輯的核心。
每一個新建立的Channel都將會被分配一個新的ChannelPipeline。這項關聯是永久性的;Channel既不能附加另外一個ChannelPipeline,也不能分離其當前的。在Netty元件的生命週期中,這是一項固定的操作,不需要開發人員的任何干預。
根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理。隨後,通過呼叫ChannelHandlerContext實現,它將被轉發給同一超型別的下一個ChannelHandler。
ChannelHandlerContext:
ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler互動。ChannelHandler可以通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline。 ChannelHandlerContext具有豐富的用於處理事件和執行I/O 操作的API。
當你完成了通過呼叫ChannelPipeline.add*()方法將入站處理器(ChannelInboundHandler)和出站處理器(ChannelOutboundHandler)混合新增到ChannelPipeline之後,每一個ChannelHandler從頭部到尾端的順序位置正如同我們方才所定義它們的一樣。因此,如果你將圖中的處理器(ChannelHandler)從左到右進行編號,那麼第一個被入站事件看到的ChannelHandler將是1,而第一個被出站事件看到的ChannelHandler將是 5。
在ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個Channel-Handler的型別是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline將跳過該ChannelHandler並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。
修改 ChannelPipeline
ChannelHandler可以通過新增、刪除或者替換其他的ChannelHandler來實時地修改ChannelPipeline的佈局。
名稱 |
描述 |
addFirst addBefore addAfter addLast |
將一個ChannelHandler 新增到ChannelPipeline 中 |
remove |
將一個ChannelHandler 從 ChannelPipeline 中移除 |
replace |
將 ChannelPipeline 中的一個 ChannelHandler 替換為另一個ChannelHandler |
ChannelHandler的執行和阻塞
通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O 執行緒)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個執行緒,因為這會對整體的I/O 處理產生負面的影響。但有時可能需要與那些使用阻塞API 的遺留程式碼進行互動。對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法。如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutor-Group的預設實現。
通過型別或者名稱來訪問ChannelHandler的方法:
名稱 |
描述 |
get |
通過型別或者名稱返回 ChannelHandler |
context |
返回和ChannelHandler 繫結的 ChannelHandlerContext |
names |
返回ChannelPipeline 中所有 ChannelHandler 的名稱 |
觸發事件
ChannelPipeline的API公開了用於呼叫入站和出站操作的附加方法:
ChannelPipeline的入站操作:
方法名稱 | 描述 |
fireChannelRegistered | 呼叫ChannelPipeline中下一個ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法 |
fireChannelUnregistered | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelUnregistered(ChannelHandlerContext)方法 |
fireChannelActive | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelActive(ChannelHandlerContext)方法 |
fireChannelInactive | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelInactive(ChannelHandlerContext)方法 |
fireExceptionCaught | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireExceptionCaught(ChannelHandlerContext)方法 |
fireUserEventTriggered | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireUserEventTriggered(ChannelHandlerContext)方法 |
fireChannelRead | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelRead(ChannelHandlerContext)方法 |
fireChannelReadComplete | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelReadComplete(ChannelHandlerContext)方法 |
fireChannelWritabilityChanged | 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelWritabilityChanged(ChannelHandlerContext)方法 |
在出站這邊,處理事件將會導致底層的套接字上發生一系列的動作。
ChannelPipeline的出站操作:
方法名稱 | 描述 |
bind | 將Channel繫結到一個本地地址,這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的bind(ChannelHandlerContext,Socket-Address, ChannelPromise)方法 |
connect | 將Channel連線到一個遠端地址,這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法 |
disconnect | 將Channel斷開連線 。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise)方法 |
close | 將Channel關閉。 這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的close(ChannelHandlerContext, ChannelPromise)方法 |
deregister | 將Channel從它先前所分配的EventExecutor(即EventLoop)中登出。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise)方法 |
flush | 沖刷Channel所有掛起的寫入。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的flush(ChannelHandlerContext)方法 |
write | 將訊息寫入Channel。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, Channel-Promise)方法。注意:這並不會將訊息寫入底層的Socket,而只會將它放入佇列中。要將它 寫入Socket,需要呼叫flush()或者writeAndFlush()方法 |
writeAndFlush | 這是一個先呼叫write()方法再接著呼叫flush()方法的便利方法 |
read | 請求從Channel中讀取更多的資料。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的read(ChannelHandlerContext)方法 |
總結:
- ChannelPipeline儲存了與Channel相關聯的ChannelHandler;
- ChannelPipeline可以根據需要,通過新增或者刪除ChannelHandler來動態地修改;
- ChannelPipeline有著豐富的API用以被呼叫,以響應入站和出站事件。
ChannelHandlerContext介面
ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之間的關聯,每當有ChannelHandler新增到ChannelPipeline中時,都會建立ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所關聯的ChannelHandler和在同一個ChannelPipeline中的其他ChannelHandler之間的互動。
ChannelHandlerContext有很多的方法,其中一些方法也存在於Channel和ChannelPipeline本身上,但是有一點重要的不同。如果呼叫Channel或者ChannelPipeline上的這些方法,它們將沿著整個ChannelPipeline進行傳播。而呼叫位於ChannelHandlerContext上的相同方法,則將從當前所關聯的ChannelHandler開始,並且只會傳播給位於該ChannelPipeline中的下一個能夠處理該事件的ChannelHandler。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
/**
* 返回繫結到這個例項的Channel
*/
Channel channel();
/**
* 返回排程事件的EventExecutor
*/
EventExecutor executor();
/**
* 返回這個例項的唯一名稱
*/
String name();
/**
* 返回繫結到這個例項的ChannelHandler
*/
ChannelHandler handler();
/**
* 如果所關聯的ChannelHandler已經被從ChannelPipeline中移除則返回true
*/
boolean isRemoved();
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelRegistered()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelRegistered();
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelUnregistered()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelUnregistered();
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelActive()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelActive();
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelInactive()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelInactive();
/**
* 觸發對下一個ChannelInboundHandler上的fireExceptionCaught()方法的呼叫
*/
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
/**
* 觸發對下一個ChannelInboundHandler上的fireUserEventTriggered()方法的呼叫
*/
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelRead()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelRead(Object msg);
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelReadComplete()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelReadComplete();
/**
* 觸發對下一個ChannelInboundHandler上的fireChannelWritabilityChanged()方法的呼叫
*/
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
/**
* 將資料從Channel讀取到第一個入站緩衝區;如果讀取成功則觸發一個channelRead事件,並(在最後一個訊息被讀取完成後)通知ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
*/
@Override
ChannelHandlerContext read();
/**
* 重新整理所有掛起的訊息。
*/
@Override
ChannelHandlerContext flush();
/**
* 返回這個例項所關聯的ChannelPipeline
*/
ChannelPipeline pipeline();
/**
* 返回和這個例項相關聯的Channel所配置的ByteBufAllocator
*/
ByteBufAllocator alloc();
/******************補充*************************/
//write 通過這個例項寫入訊息並經過ChannelPipeline
//writeAndFlush 通過這個例項寫入並沖刷訊息並經過ChannelPipeline
}
使用ChannelHandlerContext的API的時候,請牢記以下兩點:
- ChannelHandlerContext和ChannelHandler之間的關聯(繫結)是永遠不會改變的,所以快取對它的引用是安全的;
- 相對於其他類的同名方法,ChannelHandlerContext的方法將產生更短的事件流,應該儘可能地利用這個特性來獲得最大的效能。
使用 ChannelHandlerContext
修改客戶端的ChildChannelHandler:
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客戶端啟動……");
ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
ByteBuf headerBuf=buf;
ByteBuf bodyBuf=buf;
messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
messageBuf.addComponent(headerBuf);
for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
System.out.println(buf);
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("複合緩衝區:"+body);
}
ctx.writeAndFlush(buf);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuf copyBuf=((ByteBuf) msg).copy();
// System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
// buf.release();//釋放引用計數物件
for (int i = 0; i < buf.capacity(); i++) {
byte b=buf.getByte(i);
if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
System.out.println("i="+(char)b);
}
int i=buf.forEachByte(new ByteProcessor() {
@Override
public boolean process(byte value) throws Exception {
byte[] b=",".getBytes();
if (b[0]!=value)
return true;
else
return false;
}
});
System.out.println("i="+i+" value="+(char) buf.getByte(i));
ByteBuf sliced = buf.slice(0,2);
sliced.setByte(0,(byte)'h');
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ctx.fireChannelRead(copyBuf);
}
});
ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("text2:"+body);
ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
ctx.writeAndFlush(bufs);
ctx.close();
}
});
// ch.pipeline().remove("text2");
}
}
被呼叫的Channel或ChannelPipeline上的write()方法將一直傳播事件通過整個ChannelPipeline,但是在ChannelHandler的級別上,事件從一個ChannelHandler到下一個ChannelHandler的移動是由ChannelHandlerContext上的呼叫完成的。
要想呼叫從某個特定的ChannelHandler開始的處理過程,必須獲取到在(ChannelPipeline)該ChannelHandler之前的ChannelHandler所關聯的ChannelHandlerContext。這個ChannelHandlerContext將呼叫和它所關聯的ChannelHandler之後的ChannelHandler。
訊息將從下一個ChannelHandler開始流經ChannelPipeline,繞過了所有前面的ChannelHandler。
因為一個ChannelHandler可以從 屬於多個ChannelPipeline,所以它也可以繫結到多個ChannelHandlerContext例項。 對於這種用法 指在多個ChannelPipeline中共享同一個ChannelHandler,對應的ChannelHandler必須要使用@Sharable註解標註;否則,試圖將它新增到多個ChannelPipeline時將會觸發異常。顯而易見,為了安全地被用於多個併發的Channel(即連線),這樣的ChannelHandler必須是執行緒安全的。
只應該在確定了你的ChannelHandler是執行緒安全的時才使用@Sharable註解。
在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的原因是用於收集跨越多個Channel的統計資訊。
異常處理
處理入站異常
如果在處理入站事件的過程中有異常被丟擲,那麼它將從它在ChannelInboundHandler裡被觸發的那一點開始流經ChannelPipeline。要想處理這種型別的入站異常,你需要在你的ChannelInboundHandler實現中重寫下面的方法。
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.out.println("伺服器異常..");
cause.printStackTrace();
ctx.close();
}
因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler通常位於ChannelPipeline的最後。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline中的什麼位置。
- ChannelHandler.exceptionCaught()的預設實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler;
- 如果異常到達了ChannelPipeline的尾端,它將會被記錄為未被處理;
- 要想定義自定義的處理邏輯,你需要重寫exceptionCaught()方法。然後你需要決定是否需要將該異常傳播出去。
處理出站異常
用於處理出站操作中的正常完成以及異常的選項,都基於以下的通知機制。
- 每個出站操作都將返回一個ChannelFuture。註冊到ChannelFuture的ChannelFutureListener將在操作完成時被通知該操作是成功了還是出錯了 。
- 幾乎所有的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的例項。作為ChannelFuture的子類,ChannelPromise也可以被分配用於非同步通知的監聽器。但是,ChannelPromise還具有提供立即通知的可寫方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
新增ChannelFutureListener只需要呼叫ChannelFuture例項上的addListener(ChannelFutureListener)方法,並且有兩種不同的方式可以做到這一點。其中最常用的方式是,調用出站操作(如write()方法)所返回的ChannelFuture上的addListener()方法。
方式一:
ChannelFuture future=ctx.writeAndFlush(bufs);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
System.out.println("成功");
else{
System.out.println("失敗");
future.cause().printStackTrace();
future.channel().close();
}
}
});
方式二:ChannelFutureListener新增到即將作為引數傳遞給ChannelOutboundHandler的方法的ChannelPromise。
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
@Override
public void
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
promise.addListener(new ChannelFutureListener() {
@Override
public void
operationComplete(ChannelFuture
f) {
if (!f.isSuccess()) {
f.cause().printStackTrace();
f.channel().close();
}
}
});
}
}
ChannelPromise的可寫方法
通過呼叫ChannelPromise上的setSuccess()和setFailure()方法,可以使一個操作的狀態在ChannelHandler的方法返回給其呼叫者時便即刻被感知到。
參考《Netty實戰》
附:
package netty.in.action;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.charset.Charset;
public class EchoServer {
final ByteBuf bufs= Unpooled.copiedBuffer("Hello,劉德華", Charset.forName("UTF-8"));
public void bind(int port) throws Exception {
//配置服務端的NIO執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChildChannelHandler());
// 繫結埠,同步等待成功
ChannelFuture f=b.bind(port).sync();
//等待服務端監聽埠關閉
f.channel().closeFuture().sync();
} finally {
//退出,釋放執行緒池資源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("服務端啟動……");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
if(buf.hasArray()){
byte[] array=buf.array();//返回該緩衝區的備份位元組陣列。
int offset=buf.arrayOffset()+buf.readerIndex();//計算第一個位元組的偏移量
int length=buf.readableBytes();//獲取可讀位元組數
String s=new String(array,offset,length);
System.out.println("s="+s);
}else{
byte[] array = new byte[buf.readableBytes()];//獲取可讀位元組數並分配一個新的陣列來儲存
buf.getBytes(buf.readerIndex(),array);//將位元組複製到該陣列
String s=new String(array,0,buf.readableBytes());
System.out.println("直接緩衝區:"+s);
}
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
bufs.retain();//引用計數器加一
ChannelFuture future=ctx.writeAndFlush(bufs);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess())
System.out.println("成功");
else{
System.out.println("失敗");
future.cause().printStackTrace();
future.channel().close();
}
}
});
// ctx.close();
}
});
}
}
public static void main(String[] args) throws Exception {
int port=8080;
new EchoServer().bind(port);
}
}
package netty.in.action;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ByteProcessor;
import java.nio.charset.Charset;
public class EchoClient {
final ByteBuf buf= Unpooled.copiedBuffer("Hello,王寶強", Charset.forName("UTF-8"));
public void connect(int port, String host) throws Exception {
// 配置客戶端NIO執行緒組
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChildChannelHandler() );
// 發起非同步連線操作
ChannelFuture f = b.connect(host, port).sync();
// 等待客戶端鏈路關閉
f.channel().closeFuture().sync();
} finally {
// 優雅退出,釋放NIO執行緒組
group.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客戶端啟動……");
ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
ByteBuf headerBuf=buf;
ByteBuf bodyBuf=buf;
messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
messageBuf.addComponent(headerBuf);
for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
System.out.println(buf);
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("複合緩衝區:"+body);
}
ctx.writeAndFlush(buf);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
ByteBuf copyBuf=((ByteBuf) msg).copy();
// System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
// buf.release();//釋放引用計數物件
for (int i = 0; i < buf.capacity(); i++) {
byte b=buf.getByte(i);
if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
System.out.println("i="+(char)b);
}
int i=buf.forEachByte(new ByteProcessor() {
@Override
public boolean process(byte value) throws Exception {
byte[] b=",".getBytes();
if (b[0]!=value)
return true;
else
return false;
}
});
System.out.println("i="+i+" value="+(char) buf.getByte(i));
ByteBuf sliced = buf.slice(0,2);
sliced.setByte(0,(byte)'h');
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ctx.fireChannelRead(copyBuf);
}
});
ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("text2:"+body);
ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
ctx.writeAndFlush(bufs);
ctx.close();
}
});
// ch.pipeline().remove("text2");
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new EchoClient().connect(port, "127.0.0.1");
}
}