Netty實現自定義協議
關於協議,使用最為廣泛的是HTTP協議,但是在一些服務互動領域,其使用則相對較少,主要原因有三方面:
- HTTP協議會攜帶諸如header和cookie等資訊,其本身對位元組的利用率也較低,這使得HTTP協議比較臃腫,在承載相同資訊的情況下,HTTP協議將需要傳送更多的資料包;
- HTTP協議是基於TCP的短連線,其在每次請求和響應的時候都需要進行三次握手和四次揮手,由於服務的互動設計一般都要求能夠承載高併發的請求,因而HTTP協議這種頻繁的握手和揮手動作會極大的影響服務之間互動的效率;
- 服務之間往往有一些根據其自身業務特性所獨有的需求,而HTTP協議無法很好的服務於這些業務需求。
基於上面的原因,一般的服務之間進行互動時都會使用自定義協議,常見的框架,諸如dubbo,kafka,zookeeper都實現了符合其自身業務需求的協議,本文主要講解如何使用Netty實現一款自定義的協議。
1. 協議規定
所謂協議,其本質其實就是定義了一個將資料轉換為位元組,或者將位元組轉換為資料的一個規範。一款自定義協議,其一般包含兩個部分:訊息頭和訊息體。訊息頭的長度一般是固定的,或者說是可確定的,其定義了此次訊息的一些公有資訊,比如當前服務的版本,訊息的sessionId,訊息的型別等等;訊息體則主要是此次訊息所需要傳送的內容,一般在訊息頭的最後一定的位元組中儲存了當前訊息的訊息體的長度。下面是我們為當前自定義協議所做的一些規定:
名稱 | 欄位 | 位元組數 | 描述 |
---|---|---|---|
魔數 | magicNumber | 4 | 一個固定的數字,一般用於指定當前位元組序列是當前型別的協議,比如Java生成的class檔案起始就使用0xCAFEBABE作為其識別符號,對於本服務,這裡將其定義為0x1314 |
主版本號 | mainVersion | 1 | 當前伺服器版本程式碼的主版本號 |
次版本號 | subVersion | 1 | 當前伺服器版本的次版本號 |
修訂版本號 | modifyVersion | 1 | 當前伺服器版本的修訂版本號 |
會話id | sessionId | 8 | 當前請求的會話id,用於將請求和響應串聯到一起 |
訊息型別 | messageType | 1 | 請求:1,表示當前是一個請求訊息;響應:2,表示當前是一個響應訊息;Ping:3,表示當前是一個Ping訊息;Pong:4,表示當前是一個Pong訊息;Empty:5,表示當前是一個空訊息,該訊息不會寫入資料管道中; |
附加資料 | attachments | 不定 | 附加訊息是字串型別的鍵值對來表示的,這裡首先使用2個位元組記錄鍵值對的個數,然後對於每個鍵和值,都首先使用4個位元組記錄其長度,然後是具體的資料,其形式如:鍵值對個數+鍵長度+鍵資料+值長度+值資料... |
訊息體長度 | length | 4位元組 | 記錄了訊息體的長度 |
訊息體 | body | 不定 | 訊息體,服務之間互動所傳送或接收的資料,其長度有前面的length指定 |
上述協議定義中,我們除了定義常用的請求和響應訊息型別以外,還定義了Ping和Pong訊息。Ping和Pong訊息的作用一般是,在服務處於閒置狀態達到一定時長,比如2s時,客戶端服務會向服務端傳送一個Ping訊息,則會返回一個Pong訊息,這樣才表示客戶端與服務端的連線是完好的。如果服務端沒有返回相應的訊息,客戶端就會關閉與服務端的連線或者是重新建立與服務端的連線。這樣的優點在於可以防止突然會產生的客戶端與服務端的大量互動。
2. 協議實現
通過上面的定義其實我們可以發現,所謂協議,就是定義了一個規範,基於這個規範,我們可以將訊息轉換為相應的位元組流,然後經由TCP傳輸到目標服務,目標服務則也基於該規範將位元組流轉換為相應的訊息,這樣就達到了相互交流的目的。這裡面最重要的主要是如何基於該規範將訊息轉換為位元組流或者將位元組流轉換為訊息。這一方面,Netty為我們提供了ByteToMessageDecoder
和MessageToByteEncoder
用於進行訊息和位元組流的相互轉換。首先我們定義瞭如下訊息實體:
public class Message {
private int magicNumber;
private byte mainVersion;
private byte subVersion;
private byte modifyVersion;
private String sessionId;
private MessageTypeEnum messageType;
private Map<String, String> attachments = new HashMap<>();
private String body;
public Map<String, String> getAttachments() {
return Collections.unmodifiableMap(attachments);
}
public void setAttachments(Map<String, String> attachments) {
this.attachments.clear();
if (null != attachments) {
this.attachments.putAll(attachments);
}
}
public void addAttachment(String key, String value) {
attachments.put(key, value);
}
// getter and setter...
}
上述訊息中,我們將協議中所規定的各個欄位都進行了定義,並且定義了一個標誌訊息型別的列舉MessageTypeEnum,如下是該列舉的原始碼:
public enum MessageTypeEnum {
REQUEST((byte)1), RESPONSE((byte)2), PING((byte)3), PONG((byte)4), EMPTY((byte)5);
private byte type;
MessageTypeEnum(byte type) {
this.type = type;
}
public int getType() {
return type;
}
public static MessageTypeEnum get(byte type) {
for (MessageTypeEnum value : values()) {
if (value.type == type) {
return value;
}
}
throw new RuntimeException("unsupported type: " + type);
}
}
上述主要是定義了描述自定義協議相關的實體屬性,對於訊息的編碼,本質就是依據上述協議方式將訊息實體轉換為位元組流,如下是轉換位元組流的程式碼:
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) {
// 這裡會判斷訊息型別是不是EMPTY型別,如果是EMPTY型別,則表示當前訊息不需要寫入到管道中
if (message.getMessageType() != MessageTypeEnum.EMPTY) {
out.writeInt(Constants.MAGIC_NUMBER); // 寫入當前的魔數
out.writeByte(Constants.MAIN_VERSION); // 寫入當前的主版本號
out.writeByte(Constants.SUB_VERSION); // 寫入當前的次版本號
out.writeByte(Constants.MODIFY_VERSION); // 寫入當前的修訂版本號
if (!StringUtils.hasText(message.getSessionId())) {
// 生成一個sessionId,並將其寫入到位元組序列中
String sessionId = SessionIdGenerator.generate();
message.setSessionId(sessionId);
out.writeCharSequence(sessionId, Charset.defaultCharset());
}
out.writeByte(message.getMessageType().getType()); // 寫入當前訊息的型別
out.writeShort(message.getAttachments().size()); // 寫入當前訊息的附加引數數量
message.getAttachments().forEach((key, value) -> {
Charset charset = Charset.defaultCharset();
out.writeInt(key.length()); // 寫入鍵的長度
out.writeCharSequence(key, charset); // 寫入鍵資料
out.writeInt(value.length()); // 希爾值的長度
out.writeCharSequence(value, charset); // 寫入值資料
});
if (null == message.getBody()) {
out.writeInt(0); // 如果訊息體為空,則寫入0,表示訊息體長度為0
} else {
out.writeInt(message.getBody().length());
out.writeCharSequence(message.getBody(), Charset.defaultCharset());
}
}
}
}
對於訊息的解碼,其過程與上面的訊息編碼方式基本一致,主要是基於協議所規定的將位元組流資料轉換為訊息實體資料。如下是其轉換過程:
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
Message message = new Message();
message.setMagicNumber(byteBuf.readInt()); // 讀取魔數
message.setMainVersion(byteBuf.readByte()); // 讀取主版本號
message.setSubVersion(byteBuf.readByte()); // 讀取次版本號
message.setModifyVersion(byteBuf.readByte()); // 讀取修訂版本號
CharSequence sessionId = byteBuf.readCharSequence(
Constants.SESSION_ID_LENGTH, Charset.defaultCharset()); // 讀取sessionId
message.setSessionId((String)sessionId);
message.setMessageType(MessageTypeEnum.get(byteBuf.readByte())); // 讀取當前的訊息型別
short attachmentSize = byteBuf.readShort(); // 讀取附件長度
for (short i = 0; i < attachmentSize; i++) {
int keyLength = byteBuf.readInt(); // 讀取鍵長度和資料
CharSequence key = byteBuf.readCharSequence(keyLength, Charset.defaultCharset());
int valueLength = byteBuf.readInt(); // 讀取值長度和資料
CharSequence value = byteBuf.readCharSequence(valueLength, Charset.defaultCharset());
message.addAttachment(key.toString(), value.toString());
}
int bodyLength = byteBuf.readInt(); // 讀取訊息體長度和資料
CharSequence body = byteBuf.readCharSequence(bodyLength, Charset.defaultCharset());
message.setBody(body.toString());
out.add(message);
}
}
如此,我們自定義訊息與位元組流的相互轉換工作已經完成。對於訊息的處理,主要是要根據訊息的不同型別,對訊息進行相應的處理,比如對於request型別訊息,要寫入響應資料,對於ping訊息,要寫入pong訊息作為迴應。下面我們通過定義Netty handler的方式實現對訊息的處理:
// 服務端訊息處理器
public class ServerMessageHandler extends SimpleChannelInboundHandler<Message> {
// 獲取一個訊息處理器工廠類例項
private MessageResolverFactory resolverFactory = MessageResolverFactory.getInstance();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
Resolver resolver = resolverFactory.getMessageResolver(message); // 獲取訊息處理器
Message result = resolver.resolve(message); // 對訊息進行處理並獲取響應資料
ctx.writeAndFlush(result); // 將響應資料寫入到處理器中
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
resolverFactory.registerResolver(new RequestMessageResolver()); // 註冊request訊息處理器
resolverFactory.registerResolver(new ResponseMessageResolver());// 註冊response訊息處理器
resolverFactory.registerResolver(new PingMessageResolver()); // 註冊ping訊息處理器
resolverFactory.registerResolver(new PongMessageResolver()); // 註冊pong訊息處理器
}
}
// 客戶端訊息處理器
public class ClientMessageHandler extends ServerMessageHandler {
// 建立一個執行緒,模擬使用者傳送訊息
private ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 對於客戶端,在建立連線之後,在一個獨立執行緒中模擬使用者傳送資料給服務端
executor.execute(new MessageSender(ctx));
}
/**
* 這裡userEventTriggered()主要是在一些使用者事件觸發時被呼叫,這裡我們定義的事件是進行心跳檢測的
* ping和pong訊息,當前觸發器會在指定的觸發器指定的時間返回內如果客戶端沒有被讀取訊息或者沒有寫入
* 訊息到管道,則會觸發當前方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 一定時間內,當前服務沒有發生讀取事件,也即沒有訊息傳送到當前服務來時,
// 其會發送一個Ping訊息到伺服器,以等待其響應Pong訊息
Message message = new Message();
message.setMessageType(MessageTypeEnum.PING);
ctx.writeAndFlush(message);
} else if (event.state() == IdleState.WRITER_IDLE) {
// 如果當前服務在指定時間內沒有寫入訊息到管道,則關閉當前管道
ctx.close();
}
}
}
private static final class MessageSender implements Runnable {
private static final AtomicLong counter = new AtomicLong(1);
private volatile ChannelHandlerContext ctx;
public MessageSender(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
while (true) {
// 模擬隨機發送訊息的過程
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
Message message = new Message();
message.setMessageType(MessageTypeEnum.REQUEST);
message.setBody("this is my " + counter.getAndIncrement() + " message.");
message.addAttachment("name", "xufeng");
ctx.writeAndFlush(message);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上述程式碼中,由於客戶端和服務端需要處理的訊息型別是完全一樣的,因而客戶端處理類繼承了服務端處理類。但是對於客戶端而言,其還需要定時向服務端傳送心跳訊息,用於檢測客戶端與伺服器的連線是否健在,因而客戶端還會實現userEventTriggered()方法,在該方法中定時向伺服器傳送心跳訊息。userEventTriggered()方法主要是在客戶端被閒置一定時間後,其會根據其讀取或者寫入訊息的限制時長來選擇性的觸發讀取或寫入事件。
上述實現中,我們看到,對於具體型別訊息的處理,我們是通過一個工廠類來獲取對應的訊息處理器,然後處理相應的訊息,下面我們該工廠類的程式碼:
public final class MessageResolverFactory {
// 建立一個工廠類例項
private static final MessageResolverFactory resolverFactory = new MessageResolverFactory();
private static final List<Resolver> resolvers = new CopyOnWriteArrayList<>();
private MessageResolverFactory() {}
// 使用單例模式例項化當前工廠類例項
public static MessageResolverFactory getInstance() {
return resolverFactory;
}
public void registerResolver(Resolver resolver) {
resolvers.add(resolver);
}
// 根據解碼後的訊息,在工廠類處理器中查詢可以處理當前訊息的處理器
public Resolver getMessageResolver(Message message) {
for (Resolver resolver : resolvers) {
if (resolver.support(message)) {
return resolver;
}
}
throw new RuntimeException("cannot find resolver, message type: " + message.getMessageType());
}
}
上述工廠類比較簡單,主要就是通過單例模式獲取一個工廠類例項,然後提供一個根據具體訊息來查詢其對應的處理器的方法。下面我們來看看各個訊息處理器的程式碼:
// request型別的訊息
public class RequestMessageResolver implements Resolver {
private static final AtomicInteger counter = new AtomicInteger(1);
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.REQUEST;
}
@Override
public Message resolve(Message message) {
// 接收到request訊息之後,對訊息進行處理,這裡主要是將其打印出來
int index = counter.getAndIncrement();
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". receive request: " + message.getBody());
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". attachments: " + message.getAttachments());
// 處理完成後,生成一個響應訊息返回
Message response = new Message();
response.setMessageType(MessageTypeEnum.RESPONSE);
response.setBody("nice to meet you too!");
response.addAttachment("name", "xufeng");
response.addAttachment("hometown", "wuhan");
return response;
}
}
// 響應訊息處理器
public class ResponseMessageResolver implements Resolver {
private static final AtomicInteger counter = new AtomicInteger(1);
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.RESPONSE;
}
@Override
public Message resolve(Message message) {
// 接收到對方服務的響應訊息之後,對響應訊息進行處理,這裡主要是將其打印出來
int index = counter.getAndIncrement();
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". receive response: " + message.getBody());
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". attachments: " + message.getAttachments());
// 響應訊息不需要向對方服務再發送響應,因而這裡寫入一個空訊息
Message empty = new Message();
empty.setMessageType(MessageTypeEnum.EMPTY);
return empty;
}
}
// ping訊息處理器
public class PingMessageResolver implements Resolver {
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.PING;
}
@Override
public Message resolve(Message message) {
// 接收到ping訊息後,返回一個pong訊息返回
System.out.println("receive ping message: " + System.currentTimeMillis());
Message pong = new Message();
pong.setMessageType(MessageTypeEnum.PONG);
return pong;
}
}
// pong訊息處理器
public class PongMessageResolver implements Resolver {
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.PONG;
}
@Override
public Message resolve(Message message) {
// 接收到pong訊息後,不需要進行處理,直接返回一個空的message
System.out.println("receive pong message: " + System.currentTimeMillis());
Message empty = new Message();
empty.setMessageType(MessageTypeEnum.EMPTY);
return empty;
}
}
如此,對於自定義協議的訊息處理過程已經完成,下面則是使用用Netty實現的客戶端與服務端程式碼:
// 服務端
public class Server {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 新增用於處理粘包和拆包問題的處理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 新增自定義協議訊息的編碼和解碼處理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 新增具體的訊息處理器
pipeline.addLast(new ServerMessageHandler());
}
});
ChannelFuture future = bootstrap.bind(8585).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class Client {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 新增用於解決粘包和拆包問題的處理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 新增用於進行心跳檢測的處理器
pipeline.addLast(new IdleStateHandler(1, 2, 0));
// 新增用於根據自定義協議將訊息與位元組流進行相互轉換的處理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 新增客戶端訊息處理器
pipeline.addLast(new ClientMessageHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8585).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
執行上述程式碼之後,我們可以看到客戶端和伺服器分別列印瞭如下資料:
// 客戶端
receive pong message: 1555123429356
[trx: d05024d2]1. receive response: nice to meet you too!
[trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng}
[trx: 66ee1438]2. receive response: nice to meet you too!
// 伺服器
receive ping message: 1555123432279
[trx: f582444f]4. receive request: this is my 4 message.
[trx: f582444f]4. attachments: {name=xufeng}
3. 小結
本文首先將自定義協議與HTTP協議進行了對比,闡述了自定義協議的一些優點。然後定義了一份自定義協議,並且講解了協議中各個位元組的含義。最後通過Netty對自定義協議進行了實現,並且實現了基於自定義協