高效能NIO框架Netty-整合Protobuf高效能資料傳輸
前言
上篇文章我們整合了kryo來進行資料的傳輸編解碼,今天將繼續學習使用Protobuf來編解碼。Netty對Protobuf的支援比較好,還提供了Protobuf的編解碼器,非常方便。
Protobuf介紹
Protobuf是google開源的專案,全稱 Google Protocol Buffers,特點如下:
- 支援跨平臺多語言,支援目前絕大多數語言例如C++、C#、Java、pthyon等
- 高效能,可靠性高,google出品有保障
- 使用protobuf編譯器能自動生成程式碼,但需要編寫proto檔案,需要一點學習成本
Protobuf使用
Protobuf是將類的定義使用.proto檔案進行描述,然後通過protoc.exe編譯器,根據.proto自動生成.java檔案,然後將生成的.java檔案拷貝到專案中使用即可。
下載完成之後放到磁碟上進行解壓,可以將protoc.exe配置到環境變數中去,這樣就可以直接在cmd命令列中使用protoc命令,也可以不用配置,直接到解壓後的protoc\bin目錄下進行檔案的編譯。
下面我們基於之前的Message物件來構建一個Message.proto檔案。
syntax = "proto3";
option java_outer_classname = "MessageProto";
message Message {
string id = 1;
string content = 2;
}
syntax 宣告可以選擇protobuf的編譯器版本(v2和v3)
- syntax=”proto2”;選擇2版本
- syntax=”proto3”;選擇3版本
option java_outer_classname=”MessageProto”用來指定生成的java類的類名。
message相當於c語言中的struct語句,表示定義一個資訊,其實也就是類。
message裡面的資訊就是我們要傳輸的欄位了,子段後面需要有一個數字編號,從1開始遞增
.proto檔案定好之後就可以用編譯器進行編譯,輸出我們要使用的Java類,我們這邊不配置環境變數,直接到解壓包的bin目錄下進行操作
首先將我們的Message.proto檔案複製到bin目錄下,然後在這個目錄下開啟CMD視窗,輸入下面的命令進行編譯操作:
protoc ./Message.proto --java_out=./
–java_out是輸出目錄,我們就輸出到當前目錄下,執行完之後可以看到bin目錄下多了一個MessageProto.java檔案,把這個檔案複製到專案中使用即可。
Nettty整合Protobuf
首先加入Protobuf的Maven依賴
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.5.1</version>
</dependency>
建立一個Proto的Server資料處理類,之前的已經不能用了,因為現在傳輸的物件是MessageProto這個物件了
public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MessageProto.Message message = (MessageProto.Message) msg;
if (ConnectionPool.getChannel(message.getId()) == null) {
ConnectionPool.putChannel(message.getId(), ctx);
}
System.err.println("server:" + message.getId());
ctx.writeAndFlush(message);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
改造服務端啟動程式碼,增加protobuf編解碼器,是Netty自帶的,不用我們去自定義了
public class ImServer {
public void run(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 實體類傳輸資料,protobuf序列化
ch.pipeline().addLast("decoder",
new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
ch.pipeline().addLast("encoder",
new ProtobufEncoder());
ch.pipeline().addLast(new ServerPoHandlerProto());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
ChannelFuture f = bootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
服務端改造完了,下面需要把客戶的的Handler和編解碼器也改成protobuf的就行了,廢話不多說,直接上程式碼
public class ImConnection {
private Channel channel;
public Channel connect(String host, int port) {
doConnect(host, port);
return this.channel;
}
private void doConnect(String host, int port) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 實體類傳輸資料,protobuf序列化
ch.pipeline().addLast("decoder",
new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
ch.pipeline().addLast("encoder",
new ProtobufEncoder());
ch.pipeline().addLast(new ClientPoHandlerProto());
}
});
ChannelFuture f = b.connect(host, port).sync();
channel = f.channel();
} catch(Exception e) {
e.printStackTrace();
}
}
}
客戶的資料處理類:
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
MessageProto.Message message = (MessageProto.Message) msg;
System.out.println("client:" + message.getContent());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
最後一步就開始測試了,需要將客戶的傳送訊息的地方改成MessageProto.Message物件,程式碼如下:
/**
* IM 客戶端啟動入口
* @author yinjihuan
*/
public class ImClientApp {
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 2222;
Channel channel = new ImConnection().connect(host, port);
String id = UUID.randomUUID().toString().replaceAll("-", "");
// protobuf
MessageProto.Message message = MessageProto.Message.newBuilder().setId(id).setContent("hello yinjihuan").build();
channel.writeAndFlush(message);
}
}
更多技術分享請關注微信公眾號:猿天地