Netty In Action中文版
阿新 • • 發佈:2019-01-28
http://blog.csdn.net/abc_key/article/details/38304217
本章介紹
- 單元測試
- EmbeddedChannel
10.1 General
正如前面所學的,Netty提供了一個簡單的方法在ChannelPipeline上“堆疊”不同的ChannelHandler實現。所有的ChannelHandler都會參與處理事件,這個設計允許獨立出可重用的小邏輯塊,它只處理一個任務。這不僅使程式碼更清晰,也更容易測試。
測試ChannelHandler可以通過使用“嵌入式”傳輸很容易的傳遞事件槽管道以測試你的實現。對於這個嵌入式傳輸,Netty提供了一個特定的Channel實現:EmbeddedChannel。但是它是如何工作的呢?EmbeddedChannel的工作非常簡單,它允許寫入入站或出站資料,然後檢查ChannelPipeline的結束。這允許你檢查訊息編碼/解碼或觸發ChannelHandler任何行為。
編寫入站和出站的卻別是什麼?入站資料是通過ChannelInboundHandler處理,代表從遠端對等通道讀取資料;出站資料是通過ChannelOutboundHandler處理,代表寫入資料到遠端對等通道。因此測試ChannelHandler就會選擇writeInbound(...)或writeOutbound()(或者都選擇)。
EmbeddedChannel提供了下面一些方法:- writeInbound(Object...),寫一個訊息到入站通道
- writeOutbound(Object...),寫訊息到出站通道
- readInbound(),從EmbeddedChannel讀取入站訊息,可能返回null
- readOutbound(),從EmbeddedChannel讀取出站訊息,可能返回null
- finish(),標示EmbeddedChannel已結束,任何寫資料都會失敗
如上圖所示,使用writeOutbound(...)寫訊息到通道,訊息在出站方法通過ChannelPipeline,之後就可以使用readOutbound()讀取訊息。著同樣使用與入站,使用writeInbound(...)和readInbound()。處理入站和出站是相似的,它總是遍歷整個ChannelPipeline直到ChannelPipeline結束,並將處理過的訊息儲存在EmbeddedChannel中。下面來看看如何測試你的邏輯。
10.2 測試ChannelHandler
10.2.1 測試處理入站訊息的handler
我們來編寫一個簡單的ByteToMessageDecoder實現,有足夠的資料可以讀取時將產生固定大小的包,如果沒有足夠的資料可以讀取,則會等待下一個資料塊並再次檢查是否可以產生一個完整包。下圖顯示了重新組裝接收的位元組:
如上圖所示,它可能會佔用一個以上的“event”以獲取足夠的位元組產生一個數據包,並將它傳遞到ChannelPipeline中的下一個ChannelHandler,看下面程式碼:
解碼器的實現完成了,寫一個單元測試的方法是個好主意。即使程式碼看起來沒啥問題,但是也應該進行單元測試,這樣能在部署到生產之前就發現問題。現在讓我們來看看如何使用EmbeddedChannel來完成測試,看下面程式碼:package netty.in.action; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException( "frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= frameLength) { ByteBuf buf = in.readBytes(frameLength); out.add(buf); } } }
package netty.in.action;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
public class FixedLengthFrameDecoderTest {
@Test
public void testFramesDecoded() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
// write bytes
Assert.assertTrue(channel.writeInbound(input));
Assert.assertTrue(channel.finish());
// read message
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertNull(channel.readInbound());
}
@Test
public void testFramesDecoded2() {
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
ByteBuf input = buf.duplicate();
EmbeddedChannel channel = new EmbeddedChannel(
new FixedLengthFrameDecoder(3));
Assert.assertFalse(channel.writeInbound(input.readBytes(2)));
Assert.assertTrue(channel.writeInbound(input.readBytes(7)));
Assert.assertTrue(channel.finish());
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertEquals(buf.readBytes(3), channel.readInbound());
Assert.assertNull(channel.readInbound());
}
}
如上面程式碼,testFramesDecoded()方法想測試一個ByteBuf,這個ByteBuf包含9個可讀位元組,被解碼成包含了3個可讀位元組的ByteBuf。你可能注意到,它寫入9位元組到通道是通過呼叫writeInbound()方法,之後再執行finish()來將EmbeddedChannel標記為已完成,最後呼叫readInbound()方法來獲取EmbeddedChannel中的資料,直到沒有可讀位元組。testFramesDecoded2()方法採取同樣的方式,但有一個區別就是入站ByteBuf分兩步寫的,當呼叫writeInbound(input.readBytes(2))後返回false時,FixedLengthFrameDecoder值會產生輸出,至少有3個位元組是可讀,testFramesDecoded2()測試的工作相當於testFramesDecoded()。10.2.2 測試處理出站訊息的handler
測試處理出站訊息和測試處理入站訊息不太一樣,例如有一個繼承MessageToMessageEncoder的AbsIntegerEncoder類,它所做的事情如下:
- 將已接收的資料flush()後將從ByteBuf讀取所有整數並呼叫Math.abs(...)
- 完成後將位元組寫入ChannelPipeline中下一個ChannelHandler的ByteBuf中
看下面程式碼:
package netty.in.action;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg,
List<Object> out) throws Exception {
while(msg.readableBytes() >= 4){
int value = Math.abs(msg.readInt());
out.add(value);
}
}
}
下面程式碼是測試AbsIntegerEncoder:package netty.in.action;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Assert;
import org.junit.Test;
public class AbsIntegerEncoderTest {
@Test
public void testEncoded() {
//建立一個能容納10個int的ByteBuf
ByteBuf buf = Unpooled.buffer();
for (int i = 1; i < 10; i++) {
buf.writeInt(i * -1);
}
//建立EmbeddedChannel物件
EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
//將buf資料寫入出站EmbeddedChannel
Assert.assertTrue(channel.writeOutbound(buf));
//標示EmbeddedChannel完成
Assert.assertTrue(channel.finish());
//讀取出站資料
ByteBuf output = (ByteBuf) channel.readOutbound();
for (int i = 1; i < 10; i++) {
Assert.assertEquals(i, output.readInt());
}
Assert.assertFalse(output.isReadable());
Assert.assertNull(channel.readOutbound());
}
}
10.3 測試異常處理
有時候傳輸的入站或出站資料不夠,通常這種情況也需要處理,例如丟擲一個異常。這可能是你錯誤的輸入或處理大的資源或其他的異常導致。我們來寫一個實現,如果輸入位元組超出限制長度就丟擲TooLongFrameException,這樣的功能一般用來防止資源耗盡。看下圖:
上圖顯示幀的大小被限制為3位元組,若輸入的位元組超過3位元組,則超過的位元組被丟棄並丟擲TooLongFrameException。在ChannelPipeline中的其他ChannelHandler實現可以處理TooLongFrameException或者忽略異常。處理異常在ChannelHandler.exceptionCaught()方法中完成,ChannelHandler提供了一些具體的實現,看下面程式碼:
package netty.in.action;
import java.util.List;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
public class FrameChunkDecoder extends ByteToMessageDecoder {
// 限制大小
private final int maxFrameSize;
public FrameChunkDecoder(int maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
// 獲取可讀位元組數
int readableBytes = in.readableBytes();
// 若可讀位元組數大於限制值,清空位元組並丟擲異常
if (readableBytes > maxFrameSize) {
in.clear();
throw new TooLongFrameException();
}
// 讀取ByteBuf並放到List中
ByteBuf buf = in.readBytes(readableBytes);
out.add(buf);
}
}
測試FrameChunkDecoder的程式碼如下:package netty.in.action;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.TooLongFrameException;
import org.junit.Assert;
import org.junit.Test;
public class FrameChunkDecoderTest {
@Test
public void testFramesDecoded() {
//建立ByteBuf並填充9位元組資料
ByteBuf buf = Unpooled.buffer();
for (int i = 0; i < 9; i++) {
buf.writeByte(i);
}
//複製一個ByteBuf
ByteBuf input = buf.duplicate();
//建立EmbeddedChannel
EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
//讀取2個位元組寫入入站通道
Assert.assertTrue(channel.writeInbound(input.readBytes(2)));
try {
//讀取4個位元組寫入入站通道
channel.writeInbound(input.readBytes(4));
Assert.fail();
} catch (TooLongFrameException e) {
}
//讀取3個位元組寫入入站通道
Assert.assertTrue(channel.writeInbound(input.readBytes(3)));
//標識完成
Assert.assertTrue(channel.finish());
//從EmbeddedChannel入去入站資料
Assert.assertEquals(buf.readBytes(2), channel.readInbound());
Assert.assertEquals(buf.skipBytes(4).readBytes(3),
channel.readInbound());
}
}