Netty實戰手冊(三)
阿新 • • 發佈:2018-12-15
上篇已經講述瞭如何搭建基礎的服務結構,現在瞭解下如何完成與客戶端的通訊模型。
首先,在HandlerService中,處理接收來自客戶端的訊息:
@Override public void receive( ChannelHandlerContext _ctx , Object _obj ) { ISupportCommand< ChannelHandlerContext , ByteBuf > cmd = CommonContextHolder.getBean( ISupportCommand.CMD_SERVICE ); cmd.docommand( _ctx , ( ByteBuf ) _obj ); }
然後在CommandService中處理訊息,這裡的訊息協議是事先與客戶端約定的結構、順序等。當然你可以把ByteBuf替換為Protobuff來使用。這裡不詳細描述Protobuff的作用和用法了,想了解的人可以自行了解下Protobuff或者Protostuff在Netty中的用法。
基本上,針對協議的處理形式就是這樣了。實際應用時,會定義不同的訊息內容來完成對應的客戶端請求。由於ByteBuf的讀寫比較繁瑣,且不直觀,我們會使用上述的Protobuff來代替它來作為訊息傳遞的物件。@Override public void docommand( ChannelHandlerContext _ctx , ByteBuf _buf ) { int cmd = _buf.readInt();//假設我們只約定了一個int型別的客戶端引數 System.out.println( "CommandService do command: CMD=[" + cmd + "]" ); //什麼事情都沒有做,將收到的內容返回客戶端。 ByteBuf buf = _ctx.alloc().buffer(); buf.writeInt( cmd ); _ctx.writeAndFlush( buf ); }
在使用JSTS是,會要求你使用自己的解碼類來實現針對資料協議的解碼粘碼工作,這裡我沒有做相關操作,僅提供了一個訊息解碼類來完成解碼的基本工作:
package com.jees.demo.protos; import java.nio.ByteOrder; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.jees.jsts.netty.support.*; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @Component( value = INettyHandler.NETTY_DECODER ) // 這裡會注入自己的解碼類到上層的服務中去 @Scope( value = INettyHandler.NETTY_CREATOR ) // 這裡是因為Netty的Decoder物件要求每次都必須使用新的物件 public class MessageDecoder extends AbsNettyDecoder { // 網路位元組序,預設為大端位元組序 public static final int MAX_FRAME_LENGTH = 1024 * 4; // 訊息中長度欄位佔用的位元組數 public static final int LENGTH_FIELD_LENGTH = 4; // 訊息中長度欄位偏移的位元組數 private static final int LENGTH_FIELD_OFFSET = 0; // 該欄位加長度欄位等於資料幀的長度 private static final int LENGTH_ADJUSTMENT = 0; // 從資料幀中跳過的位元組數 private static final int INITIAL_BYTES_TO_STRIP = 0; public MessageDecoder() { super( ByteOrder.LITTLE_ENDIAN , MAX_FRAME_LENGTH , LENGTH_FIELD_OFFSET , LENGTH_FIELD_LENGTH , LENGTH_ADJUSTMENT , INITIAL_BYTES_TO_STRIP , true ); } @Override protected ByteBuf decode( ChannelHandlerContext _ctx , ByteBuf _buf ) throws Exception { return _buf;//這裡什麼解析工作都沒有,直接通知給HandlerService處理。其實應該做解碼的工作的,否則粘包不能正確處理。 } }
下面是完整的客戶端程式碼:
package com.jees.demo.client ;
import java.util.Random;
import io.netty.bootstrap.Bootstrap ;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture ;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption ;
import io.netty.channel.EventLoopGroup ;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup ;
import io.netty.channel.socket.nio.NioSocketChannel ;
public class TestNettyClient {
private EventLoopGroup worker ;
private Bootstrap booter ;
private ChannelFuture future ;
public TestNettyClient( int _cmd ) {
try {
String host = "localhost" ;
int port = 1000 ;
worker = new NioEventLoopGroup() ;
booter = new Bootstrap() ;
booter.group( worker ).channel( NioSocketChannel.class ).option( ChannelOption.SO_KEEPALIVE , true ).handler( new SimpleChannelInboundHandler< ByteBuf >() {
@Override
protected void channelRead0( ChannelHandlerContext ctx , ByteBuf buf ) throws Exception {
System.out.println( "TestNettyClient read sever msg: MSG=[" + buf.readInt() + "]" );
}
} );
future = booter.connect( host , port ).sync() ;
Channel chl = future.channel();
ByteBuf buf = chl.alloc().buffer();
buf.writeInt( _cmd );
chl.writeAndFlush( buf );
future.channel().closeFuture().sync() ;
} catch ( Exception e ) {
e.printStackTrace() ;
} finally {
worker.shutdownGracefully() ;
}
}
public static void main( String[] args ) {
new TestNettyClient( new Random().nextInt() );
}
}
最後,我們將jees-core-dispatcher.xml中的最後一行改為:
<context:component-scan base-package="com.jees.demo.*" /> <!-- 讓Spring可以掃描到相關類 -->
下面是伺服器和客戶端的訊息輸出截圖:
想了解更多訊息的朋友,可以加入QQ群8802330,參與討論。我會不定期更新相關內容的原始碼,供各位學習和使用。
下期我將加入JDBS來進行資料的處理,喜歡的人請關注我的個人部落格或者碼雲:https://gitee.com/aiyoyoyo/或者Github:https://github.com/aiyoyoyo。