利用Netty構建自定義協議的通訊
阿新 • • 發佈:2019-01-04
在複雜的網路世界中,各種應用之間通訊需要依賴各種各樣的協議,比如:HTTP
,Telnet
,FTP
,SMTP
等等。
在開發過程中,有時候我們需要構建一些適應自己業務的應用層協議,Netty
作為一個非常優秀的網路通訊框架,可以幫助我們完成自定義協議的通訊。
一般而言,我們制定的協議需要兩個部分:
- Header : 協議頭部,放置一些Meta資訊。
- Content : 應用之間互動的資訊主體。
例如:
| Version | Content-Length | SessionId | Content |
其中Version
,Content-Length
,SessionId
就是Header資訊,Content
就是互動的主體。給這個協議起一個名字叫做luck
luck
協議,我們構建一個類。// 訊息的頭部
public class LuckHeader {
// 協議版本
private int version;
// 訊息內容長度
private int contentLength;
// 服務名稱
private String sessionId;
public LuckHeader(int version, int contentLength, String sessionId) {
this.version = version;
this.contentLength = contentLength;
this .sessionId = sessionId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this .contentLength = contentLength;
}
public String getSessionId() {
return sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
}
// 訊息的主體
public class LuckMessage {
private LuckHeader luckHeader;
private String content;
public LuckMessage(LuckHeader luckHeader, String content) {
this.luckHeader = luckHeader;
this.content = content;
}
public LuckHeader getLuckHeader() {
return luckHeader;
}
public void setLuckHeader(LuckHeader luckHeader) {
this.luckHeader = luckHeader;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return String.format("[version=%d,contentLength=%d,sessionId=%s,content=%s]",
luckHeader.getVersion(),
luckHeader.getContentLength(),
luckHeader.getSessionId(),
content);
}
}
那麼我們在Netty
中如何去對這種自定義的協議編碼(Encode)呢?
在Netty
中對資料進行編碼解碼需要利用Codec
元件,Codec
元件中分為:
- Encoder : 編碼器,將出站的資料從一種格式轉換成另外一種格式。
- Decoder : 解碼器,將入站的資料從一種格式轉換成另外一種格式。
LuckDecoder.java
public class LuckDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 獲取協議的版本
int version = in.readInt();
// 獲取訊息長度
int contentLength = in.readInt();
// 獲取SessionId
byte[] sessionByte = new byte[36];
in.readBytes(sessionByte);
String sessionId = new String(sessionByte);
// 組裝協議頭
LuckHeader header = new LuckHeader(version, contentLength, sessionId);
// 讀取訊息內容
byte[] content = in.readBytes(in.readableBytes()).array();
LuckMessage message = new LuckMessage(header, new String(content));
out.add(message);
}
}
LuckEncoder.java
@ChannelHandler.Sharable
public class LuckEncoder extends MessageToByteEncoder<LuckMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, LuckMessage message, ByteBuf out) throws Exception {
// 將Message轉換成二進位制資料
LuckHeader header = message.getLuckHeader();
// 這裡寫入的順序就是協議的順序.
// 寫入Header資訊
out.writeInt(header.getVersion());
out.writeInt(message.getContent().length());
out.writeBytes(header.getSessionId().getBytes());
// 寫入訊息主體資訊
out.writeBytes(message.getContent().getBytes());
}
}
編寫一個邏輯控制層,展現server接收到的協議資訊:
public class NettyLuckHandler extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
// 簡單地打印出server接收到的訊息
System.out.println(msg.toString());
}
}
編寫完成之後,把編解碼器
和邏輯控制器
放入初始化元件中:
public class NettyLuckInitializer extends ChannelInitializer<SocketChannel> {
private static final LuckEncoder ENCODER = new LuckEncoder();
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 新增編解碼器, 由於ByteToMessageDecoder的子類無法使用@Sharable註解,
// 這裡必須給每個Handler都新增一個獨立的Decoder.
pipeline.addLast(ENCODER);
pipeline.addLast(new LuckDecoder());
// 新增邏輯控制層
pipeline.addLast(new NettyLuckHandler());
}
}
編寫一個服務端啟動類:
public class NettyLuckServer {
// 指定埠號
private static final int PORT = 8888;
public static void main(String args[]) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 指定socket的一些屬性
serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 指定是一個NIO連線通道
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new NettyLuckInitializer());
// 繫結對應的埠號,並啟動開始監聽埠上的連線
Channel ch = serverBootstrap.bind(PORT).sync().channel();
System.out.printf("luck協議啟動地址:127.0.0.1:%d/\n", PORT);
// 等待關閉,同步埠
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
光有服務端並不行,沒法測試我們的server是不是成功了。所以我們還需要編寫一個客戶端程式。
LuckClientInitializer.java
public class LuckClientInitializer extends ChannelInitializer<SocketChannel> {
private static final LuckEncoder ENCODER = new LuckEncoder();
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
// 新增編解碼器, 由於ByteToMessageDecoder的子類無法使用@Sharable註解,
// 這裡必須給每個Handler都新增一個獨立的Decoder.
pipeline.addLast(ENCODER);
pipeline.addLast(new LuckDecoder());
// and then business logic.
pipeline.addLast(new NettyLuckClientHandler());
}
}
LuckClientHandler.java
public class LuckClientHandler extends SimpleChannelInboundHandler<LuckMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, LuckMessage message) throws Exception {
System.out.println(message);
}
}
LuckClient.java
public class LuckClient {
public static void main(String args[]) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new NettyLuckInitializer());
// Start the connection attempt.
Channel ch = b.connect("127.0.0.1", 8888).sync().channel();
int version = 1;
String sessionId = UUID.randomUUID().toString();
String content = "I'm the luck protocol!";
LuckHeader header = new LuckHeader(version, content.length(), sessionId);
LuckMessage message = new LuckMessage(header, content);
ch.writeAndFlush(message);
ch.close();
} finally {
group.shutdownGracefully();
}
}
}
先執行NettyLuckServer.java
,然後再去執行LuckClient.java
可以看到控制的輸出
四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelRegistered
資訊: [id: 0x92534c29] REGISTERED
四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler bind
資訊: [id: 0x92534c29] BIND(0.0.0.0/0.0.0.0:8888)
luck協議啟動地址:127.0.0.1:8888
四月 15, 2016 11:31:34 下午 io.netty.handler.logging.LoggingHandler channelActive
資訊: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
四月 15, 2016 11:31:54 下午 io.netty.handler.logging.LoggingHandler logMessage
資訊: [id: 0x92534c29, L:/0:0:0:0:0:0:0:0:8888] RECEIVED: [id: 0x67a91c6b, L:/127.0.0.1:8888 - R:/127.0.0.1:53585]
[version=1,contentLength=22,sessionId=cff7b3ea-1188-4314-abaa-de04db32d39f,content=I'm the luck protocol!]
服務端順利解析出了我們自定義的luck
協議。