Netty(預置的ChannelHandler和編解碼器)
通過SSL/TLS保護Netty應用程式
為了支援SSL/TLS,Java提供了javax.net.ssl包,它的SSLContext和SSLEngine類使得實現解密和加密相當簡單直接。Netty通過一個名為SslHandler的ChannelHandler實現利用了這個API,其中SslHandler在內部使用SSLEngine來完成實際的工作。
Netty的OpenSSL/SSLEngine實現
Netty還提供了使用OpenSSL工具包(www.openssl.org)的SSLEngine實現。這個OpenSsl-Engine類提供了比JDK 提供的SSLEngine實現更好的效能。如果OpenSSL庫可用,可以將Netty應用程式(客戶端和伺服器)配置為預設使用OpenSslEngine。如果不可用,Netty將會回退到JDK 實現。無論你使用JDK 的SSLEngine還是使用Netty的OpenSslEngine,SSL API和資料流都是一致的。
public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SslContext context; private final boolean startTls; public SslChannelInitializer(SslContext context, boolean startTls)//傳入要使用的SslContext { this.context = context; this.startTls = startTls;//如果設定為true,第一個寫入的訊息將不會被加密(客戶端應該設定為true) } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.newEngine(ch.alloc());//對於每個SslHandler例項,都使用Channel的ByteBuf-Allocator從SslContext獲取一個新的SSLEngine ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));//將SslHandler作為第一個ChannelHandler新增到ChannelPipeline中 } }
在大多數情況下,SslHandler將是ChannelPipeline中的第一個ChannelHandler。這確保了只有在所有其他的ChannelHandler將它們的邏輯應用到資料之後,才會進行加密。
方法名稱 |
描述 |
setHandshakeTimeout (long , TimeUnit) setHandshakeTimeoutMillis (long) getHandshakeTimeoutMillis () |
設定和獲取超時事件,超時後,握手ChannelFuture 將會被通知失敗 |
setCloseNotifyTimeout (long, TimeUnit) setCloseNotifyTimeoutMillis (long) getCloseNotifyTimeoutMillis () |
設定和獲取超時時間,超時後,將會觸發一個關閉通知並關閉連線。同時會導致通知該ChannelFuture 失敗 |
handshakeFuture() |
返回一個在握手完成後將會得到通知的ChannelFuture,如果握手前已經執行過,則返回包含了先前握手結果的ChannelFuture |
close() close(ChannelPromise) close(ChannelHandlerContext, ChannelPromise) |
傳送 close_notify 以請求關閉並銷燬底層的SslEngine |
構建基於Netty的HTTP/HTTPS應用程式
HTTP解碼器、編碼器和編解碼器
HTTP是基於請求/響應模式的:客戶端向伺服器傳送一個HTTP請求,然後伺服器將會返回一個HTTP響應。Netty提供了多種編碼器和解碼器以簡化對這個協議的使用。
一個HTTP請求/響應可能由多個數據部分組成,並且它總是以一個LastHttpContent部分作為結束。FullHttpRequest和FullHttpResponse訊息是特殊的子型別,分別代表了完整的請求和響應。所有型別的HTTP訊息(FullHttpRequest、LastHttpContent以及程式碼清單中展示的那些)都實現了HttpObject介面。
HTTP解碼器和編碼器:
名稱 | 描述 |
HttpRequestEncoder | 將HttpRequest、HttpContent和LastHttpContent訊息編碼為位元組 |
HttpResponseEncoder | 將HttpResponse、HttpContent和LastHttpContent訊息編碼為位元組 |
HttpRequestDecoder | 將位元組解碼為HttpRequest、HttpContent和LastHttpContent訊息 |
HttpResponseDecoder | 將位元組解碼為HttpResponse、HttpContent和LastHttpContent訊息 |
public class HttpPipelineInitializer extends ChannelInitializer<Channel>
{
private final boolean client;
public HttpPipelineInitializer(boolean client)
{
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("decoder", new HttpResponseDecoder());//如果是客戶端,則新增HttpResponseDecoder以處理來自伺服器的響應
pipeline.addLast("encoder", new HttpRequestEncoder());//如果是客戶端,則新增HttpRequestEncoder以向伺服器傳送請求
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());//如果是伺服器,則新增HttpRequestDecoder以接收來自客戶端的請求
pipeline.addLast("encoder", new HttpResponseEncoder());//如果是伺服器,則新增HttpResponseEncoder以向客戶端傳送響應
}
}
}
聚合 HTTP訊息
由於HTTP的請求和響應可能由許多部分組成,因此你需要聚合它們以形成完整的訊息。為了消除這項繁瑣的任務,Netty提供了一個聚合器,它可以將多個訊息部分合併為FullHttpRequest或者FullHttpResponse訊息。通過這樣的方式,你將總是看到完整的訊息內容。
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());//如果是客戶端,則新增HttpClientCodec
} else {
pipeline.addLast("codec", new HttpServerCodec());//如果是伺服器,則新增HttpServerCodecs
}
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));//將最大的訊息大小為512KB的HttpObjectAggregator新增到ChannelPipeline
}
}
HTTP壓縮
HTTP請求的頭部資訊
客戶端可以通過提供以下頭部資訊來指示伺服器它所支援的壓縮格式:
GET /encrypted-areaHTTP/1.1
Host:www.example.com
Accept-Encoding:gzip,deflate
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpCompressionInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec ", new HttpClientCodec());//如果是客戶端,則新增HttpClientCodec
pipeline.addLast("decompressor", new HttpContentDecompressor());//如果是客戶端,則新增HttpContentDecompressor以處理來自伺服器的壓縮內容
} else {
pipeline.addLast("codec", new HttpServerCodec())//如果是伺服器,則新增HttpServerCodec
;
pipeline.addLast("compressor", new HttpContentCompressor());//如果是伺服器,則新增HttpContentCompressor來壓縮資料(如果客戶端支援它)
}
}
}
使用 HTTPS
啟用HTTPS只需要將SslHandler新增到ChannelPipeline的ChannelHandler組合中。
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean isClient;
public HttpsCodecInitializer(SslContext context, boolean isClient)//傳入要使用的SslContext
{
this.context = context;
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine));//將SslHandler新增到ChannelPipeline中以使用HTTPS
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());//如果是客戶端,則新增HttpClientCodec
} else {
pipeline.addLast("codec", new HttpServerCodec());//如果是伺服器,則新增HttpServerCodec
}
}
}
WebSocket
要想向你的應用程式中新增對於WebSocket的支援,你需要將適當的客戶端或者伺服器WebSocketChannelHandler新增到ChannelPipeline中。這個類將處理由WebSocket定義的稱為幀的特殊訊息型別。
名稱 | 描述 |
BinaryWebSocketFrame | 資料幀:二進位制資料 |
TextWebSocketFrame | 資料幀:文字資料 |
ContinuationWebSocketFrame | 資料幀:屬於上一個BinaryWebSocketFrame或者TextWeb-SocketFrame的文字的或者二進位制資料 |
CloseWebSocketFrame | 控制幀:一個CLOSE請求、關閉的狀態碼以及關閉的原因 |
PingWebSocketFrame | 控制幀:請求一個PongWebSocketFrame |
PongWebSocketFrame | 控制幀:對PingWebSocketFrame請求的響應 |
使用WebSocketServerProtocolHandler的簡單示例,這個類處理協議升級握手,以及3種控制幀——Close、Ping和Pong。Text和Binary資料幀將會被傳遞給下一個(由你實現的)ChannelHandler進行處理。
public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws
Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536),//為握手提供聚合的HttpRequest
new WebSocketServerProtocolHandler("/websocket"),//如果被請求的端點是"/websocket",則處理該升級握手
new TextFrameHandler(),//TextFrameHandler處理TextWebSocketFrame
new BinaryFrameHandler(),//BinaryFrameHandler處理BinaryWebSocketFrame
new ContinuationFrameHandler());//ContinuationFrameHandler處理ContinuationWebSocketFrame
}
public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void
channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void
channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void
channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
空閒的連線和超時
檢測空閒連線以及超時對於及時釋放資源來說是至關重要的。由於這是一項常見的任務,Netty特地為它提供了幾個ChannelHandler實現。
名稱 |
描述 |
IdleStateHandler |
當連結空閒時間太長,將觸發一個IdleStateEvent 事件。然後,可以通過 ChannelInboundHandler中重寫userEventTriggered()方法來處理該 IdleStateEvent 事件 |
ReadTimeoutHandler |
如果指定時間間隔內沒收到任何入站資料,則丟擲一個ReadTimeoutException 並關閉對應Channel。可以重寫ChannelHandler中的exceptionCaught()方法檢測該ReadTimeoutException |
WriteTimeoutHandler |
如果指定時間間隔內沒有任何出站資料寫入,則丟擲WriteTimeoutExceptoin 並關閉對應Channel。可以重寫ChannelHandler 的exceptionCaught()方法檢測該WriteTimeout-Exception |
如果在60秒之內沒有接收或者傳送任何的資料,我們將如何得到通知;如果沒有響應,則連線會被關閉:
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws
Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));//IdleStateHandler將在被觸發時傳送一個IdleStateEvent事件
pipeline.addLast(new HeartbeatHandler());//將一個HeartbeatHandler新增到Chan-nelPipeline中
pipeline.addLast(new MessageHander());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {////實現userEventTriggered()方法以傳送心跳訊息
private static final ByteBuf HEARTBEAT_SEQUENCE =
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));//傳送到遠端節點的心跳訊息
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);//傳送心跳訊息,並在傳送失敗時關閉該連線
} else {
super.userEventTriggered(ctx, evt);//不是IdleStateEvent事件,所以將它傳遞給下一個ChannelInboundHandler
}
}
}
public static final class MessageHander extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
}
}
}
如果連線超過60秒沒有接收或者傳送任何的資料,那麼IdleStateHandler將會使用一個IdleStateEvent事件來呼叫fireUserEventTriggered()方法。HeartbeatHandler實現了userEventTriggered()方法,如果這個方法檢測到IdleStateEvent事件,它將會發送心跳訊息,並且新增一個將在傳送操作失敗時關閉該連線的ChannelFutureListener。
解碼基於分隔符的協議和基於長度的協議
基於分隔符的協議
基於分隔符的(delimited)訊息協議使用定義的字元來標記的訊息或者訊息段(通常被稱為幀)的開頭或者結尾。
名稱 |
描述 |
DelimiterBasedFrameDecoder |
使用任何由使用者提供的分隔符來提取幀的通用解碼器 |
LineasedFrameDecoder |
提取由行尾符(\n 或者 \r\n)分割的幀的解碼器。這個解碼器比DelimiterBaseFrameDecoder 更快 |
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));//該LineBasedFrame-Decoder將提取的幀轉發給下一個ChannelInboundHandler
pipeline.addLast(new FrameHandler());//新增FrameHandler以接收幀
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// Do something with the data extracted from the frame
}
}
}
如果你正在使用除了行尾符之外的分隔符分隔的幀,那麼你可以以類似的方式使用DelimiterBasedFrameDecoder,只需要將特定的分隔符序列指定到其建構函式即可。
這些解碼器是實現你自己的基於分隔符的協議的工具。作為示例,我們將使用下面的協議規範:
- 傳入資料流是一系列的幀,每個幀都由換行符(\n)分隔;
- 每個幀都由一系列的元素組成,每個元素都由單個空格字元分隔;
- 一個幀的內容代表一個命令,定義為一個命令名稱後跟著數目可變的引數。
基於長度的協議
基於長度的協議通過將它的長度編碼到幀的頭部來定義幀,而不是使用特殊的分隔符來標記它的結束。
名稱 |
描述 |
FixedLengthFrameDecoder |
提取在呼叫建構函式時指定的定長幀 |
LengthFieldBaseFrameDecoder |
根據編碼進幀頭部中的長度值提取幀; 該欄位的偏移量以及長度在建構函式中指定 |
遇到被編碼到訊息頭部的幀大小不是固定值的協議。為了處理這種變長幀,你可以使用LengthFieldBasedFrameDecoder,它將從頭部欄位確定幀長,然後從資料流中提取指定的位元組數。
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws
Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024,
0, 8));//使用LengthFieldBasedFrameDecoder解碼將幀長度編碼到幀起始的前8個位元組中的訊息
pipeline.addLast(new FrameHandler());//新增FrameHandler以處理每個幀
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// Do something with the frame
}
}
}
寫大型資料
在寫大型資料時,需要準備好處理到遠端節點的連線是慢速連線的情況,這種情況會導致記憶體釋放的延遲。NIO的零拷貝特性,這種特性消除了將檔案的內容從檔案系統移動到網路棧的複製過程。所有的這一切都發生在Netty的核心中,所以應用程式所有需要做的就是使用一個FileRegion介面的實現,其在Netty的API文件中的定義是:“通過支援零拷貝的檔案傳輸的Channel來發送的檔案區域。”
FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion( in.getChannel(), 0, file.length());//以該檔案的完整長度建立一個新的DefaultFileRegion
channel.writeAndFlush(region).addListener(//傳送該DefaultFile-Region,並註冊一個ChannelFutureListener
new ChannelFutureListener() {
@Override
public void
operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause();
// Do something
}
}
});
這個示例只適用於檔案內容的直接傳輸,不包括應用程式對資料的任何處理。在需要將資料從檔案系統複製到使用者記憶體中時,可以使用ChunkedWriteHandler,它支援非同步寫大型資料流,而又不會導致大量的記憶體消耗。關鍵是interface ChunkedInput<B>,其中型別引數B是readChunk()方法返回的型別。Netty預置了該介面的4個實現。每個都代表了一個將由ChunkedWriteHandler處理的不定長度的資料流。
名稱 | 描述 |
ChunkedFile | 從檔案中逐塊獲取資料,當你的平臺不支援零拷貝或者你需要轉換資料時使用 |
ChunkedNioFile | 和ChunkedFile類似,只是它使用了FileChannel |
ChunkedStream | 從InputStream中逐塊傳輸內容 |
ChunkedNioStream | 從ReadableByteChannel中逐塊傳輸內容 |
當Channel的狀態變為活動的時,WriteStreamHandler將會逐塊地把來自檔案中的資料作為ChunkedStream寫入。資料在傳輸之前將會由SslHandler加密。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void
initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));//將SslHandler新增到ChannelPipeline中
pipeline.addLast(new ChunkedWriteHandler());//新增Chunked-WriteHandler以處理作為ChunkedInput傳入的資料
pipeline.addLast(new WriteStreamHandler());//一旦連線建立,WriteStreamHandler就開始寫檔案資料
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {//當連線建立時,channelActive()方法將使用ChunkedInput寫檔案資料
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
序列化資料
JDK提供了ObjectOutputStream和ObjectInputStream,用於通過網路對POJO的基本資料型別和圖進行序列化和反序列化。該API並不複雜,而且可以被應用於任何實現了java.io.Serializable介面的物件。但是它的效能也不是非常高效的。
名稱 |
描述 |
CompatibleObjectDecoder |
和使用JDK序列化的 非基於Netty的 遠端節點 進行互操作的解碼器 |
CompatibleObjectEncode |
和使用JDK序列化的 非基於Netty的 遠端節點 進行互操作的編碼器 |
ObjectDecoder |
構建於JDK 序列化之上的使用自定義的序列化來解碼的解碼器; 當沒有其他的外部依賴時,它提供了速度上的改進。否則其他序列化更可取 |
ObjectEncoder |
構建於JDK序列化之上的使用自定義的序列化來編碼的編碼器; 當沒有其他的外部依賴時,它提供了速度上的改進。否則其他序列化更可取 |
使用 JBoss Marshalling進行序列化
它比JDK序列化最多快3倍,而且也更加緊湊。
名稱 |
描述 |
CompatibleMarchallingDecoder CompatibleMarshallingEncoder |
與只使用JDK序列化的遠端節點相容 |
MarshallingDecoder MarshallingEncoder |
適用於使用JBoss Marshalling 的節點。這些類必須一起使用 |
import io.netty.channel.*;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import java.io.Serializable;
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(
UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));//新增MarshallingDecoder以將ByteBuf轉換為POJO
pipeline.addLast(new MarshallingEncoder(marshallerProvider));//新增Marshalling-Encoder以將POJO轉換為ByteBuf
pipeline.addLast(new ObjectHandler());//新增ObjectHandler,以處理普通的實現了Serializable介面的POJO
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
// Do something
}
}
}
通過 Protocol Buffers 序列化
Netty序列化的最後一個解決方案是利用Protocol Buffers的編解碼器,它是一種由Google公司開發的、現在已經開源的資料交換格式。
名稱 |
描述 |
ProtobufDecoder |
使用 protobuf 對訊息進行解碼 |
ProtobufEncoder |
使用 protobuf 對訊息進行編碼 |
ProtobufVarint32FrameDecoder |
根據訊息中的Google Protocol Buffer 的 “Base 128 Varints” 整型長度欄位值動態地分割所接收到的ByteBuf |
ProtobufVarint32LengthFieldPrepender |
向ByteBuf 前追加一個Google Protocal Buffer 的“Base 128 Varints” 整型的長度欄位值 |
import com.google.protobuf.MessageLite;
import io.netty.channel.*;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());//新增ProtobufVarint32FrameDecoder以分隔幀
pipeline.addLast(new ProtobufEncoder());//新增ProtobufEncoder以處理訊息的編碼
pipeline.addLast(new ProtobufDecoder(lite));//新增ProtobufDecoder以解碼訊息
pipeline.addLast(new ObjectHandler());//新增Object-Handler以處理解碼訊息
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext
ctx, Object msg) throws Exception {
// Do something with the object
}
}
}
參考《Netty實戰》