網際網路架構(8):Socket網路通訊程式設計--Netty
三、Socket網路通訊程式設計–Netty
Netty是一個提供非同步事件驅動的網路應用框架,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。
換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發網路應用程式,比如客戶端和服務端的協議。Netty大大簡化了網路程式的開發過程比如TCP和UDP的 Socket的開發。
“快速和簡單”並不意味著應用程式會有難維護和效能低的問題,Netty是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗比如FTP、SMTP、HTTP、許多二進位制和基於文字的傳統協議,Netty在不降低開發效率、效能、穩定性、靈活性情況下,成功地找到了解決方案。
有一些使用者可能已經發現其他的一些網路框架也聲稱自己有同樣的優勢,所以你可能會問是Netty和它們的不同之處。答案就是Netty的哲學設計理念。Netty從第一天開始就為使用者提供了使用者體驗最好的API以及實現設計。正是因為Netty的設計理念,才讓我們得以輕鬆地閱讀本指南並使用Netty。
1、Netty簡單應用
Server.java
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { private int port; public Server(int port) { // TODO Auto-generated constructor stub this.port = port; } public void run(){ //用來接收連線事件組 EventLoopGroup bossGroup = new NioEventLoopGroup(); //用來處理接收到的連線的事件處理組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //server配置輔助類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將連線接收組與事件處理組連線,當server的bossGroup接收到連線之後就會交給workerGroup進行處理 serverBootstrap.group(bossGroup, workerGroup) //指定接收的Channel型別 .channel(NioServerSocketChannel.class) //handler在初始化時就會執行,而childHandler會在客戶端成功connect後才執行,這是兩者的區別。 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { // TODO Auto-generated method stub channel.pipeline().addLast(new ServerHandler()); } }) //設定TCP緩衝區的大小 .option(ChannelOption.SO_BACKLOG, 128) //設定傳送緩衝大小 .option(ChannelOption.SO_SNDBUF, 32*1024) //設定接收緩衝大小 .option(ChannelOption.SO_RCVBUF, 32*1024) //設定是否保持連線 .childOption(ChannelOption.SO_KEEPALIVE, true); //注意,此處option()是提供給NioServerSocketChannel用來接收進來的連線,也就是boss執行緒。 //childOption()是提供給由父管道ServerChannel接收到的連線,也就是worker執行緒,在這個例子中也是NioServerSocketChannel。 //非同步繫結埠,可以繫結多個埠 ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync(); ChannelFuture channelFuture2 = serverBootstrap.bind(8766).sync(); System.out.println("Server服務已經啟動."); //非同步檢查管道是否關閉 channelFuture.channel().closeFuture().sync(); channelFuture2.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { Server server = new Server(8765); server.run(); } }
ServerHandler.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.PromiseAggregator; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Active..."); super.channelActive(ctx); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Read..."); try{ ByteBuf buf = (ByteBuf)msg; byte[] msgByte = new byte[buf.readableBytes()]; buf.readBytes(msgByte); System.out.println("Server Handler received message : " + new String(msgByte , "utf-8")); ChannelFuture writeFlush = ctx.writeAndFlush(Unpooled.copiedBuffer(("hi, client.").getBytes())); //給writeFlush新增監聽,當資料傳送完畢之後,呼叫該方法 writeFlush.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0); ctx.close(); } }); writeFlush.addListener(ChannelFutureListener.CLOSE); }finally{ ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Read Complete..."); super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub System.out.println("Exception Caught..."); super.exceptionCaught(ctx, cause); } }
Client.java
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class Client { private String ip; private int port; public Client(String ip, int port) { // TODO Auto-generated constructor stub this.ip = ip; this.port = port; } public void run(){ //客戶端用來連線服務端的連線組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sChannel) throws Exception { // TODO Auto-generated method stub sChannel.pipeline().addLast(new ClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); //可以進多個埠同時連線 ChannelFuture future = bootstrap.connect(this.ip, this.port).sync(); ChannelFuture future2 = bootstrap.connect(this.ip, 8766).sync(); future.channel().writeAndFlush(Unpooled.copiedBuffer(("Hello Server.").getBytes())); future2.channel().writeAndFlush(Unpooled.copiedBuffer(("Hello Server8766.").getBytes())); future.channel().closeFuture().sync(); future2.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workerGroup.shutdownGracefully(); } } public static void main(String[] args) { Client client = new Client("127.0.0.1", 8765); client.run(); } }
ClinentHandler.java
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub try{ ByteBuf buf = (ByteBuf)msg; byte[] msgByte = new byte[buf.readableBytes()]; buf.readBytes(msgByte); System.out.println("ClientHandler received message : " + new String(msgByte, "utf-8")); }finally{ } } }
建立Netty通訊服務的四個步驟:
- 1 建立2個NIO執行緒組,一個專門用於網路事件處理(接受客戶端的連線),另一個則進行網路通訊讀寫
- 2 建立一個ServerBootstrap物件,配置Netty的一系列引數,例如接受傳出資料的快取大小等等。
- 3 建立一個實際處理資料的類Channellnitializer,進行初始化的準備工作,比如設定接收或傳出資料的字符集、格式、以及實際處理資料的介面
- 4 繫結埠,執行同步阻塞方法等待伺服器端啟動即可。
注意:在Netty中,預設傳輸都是以ByteBuf進行傳輸的,如果要使用字串或者其他的格式,需要在配置ChannelInitializer的時候,需要配置相應的編碼器和解碼器,例如如果是要直接傳輸String型別,那麼則需要使用String型別的編碼器和解碼器(Netty API已經提供)。注意,配置的時候客戶端和伺服器端最好都配置,否則沒有配置的一方獲取的仍然還是ByteBuf型別的結果
//3 建立一個輔助類Bootstrap,就是對我們的Server進行一系列的配置
ServerBootstrap b = new ServerBootstrap();
//把倆個工作執行緒組加入進來
b.group(bossGroup, workerGroup)
//我要指定使用NioServerSocketChannel這種型別的通道
.channel(NioServerSocketChannel.class)
//一定要使用 childHandler 去繫結具體的 事件處理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//設定自定義分隔符
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
//配置分割符解析器,並設定最大幀的長度與自定義分割符
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
//設定字串形式的解析器
sc.pipeline().addLast(new StringDecoder());
//設定字串形式的編碼器
sc.pipeline().addLast(new StringEncoder());
sc.pipeline().addLast(new ServerHandler());
}
});
2、Netty拆包粘包
在基於流的傳輸裡比如TCP/IP,接收到的資料會先被儲存到一個socket接收緩衝裡。不幸的是,基於流的傳輸並不是一個數據包佇列,而是一個位元組佇列。即使你傳送了2個獨立的資料包,作業系統也不會作為2個訊息處理而僅僅是作為一連串的位元組而言。因此這是不能保證你遠端寫入的資料就會準確地讀取。
參考資料:http://ifeve.com/netty5-user-guide
常用的拆包粘包主要有3種方式:
- 1、訊息定長,例如每個報文的大小固定為200個位元組,如果不夠,空位補空格。
- 2、在包尾部增加特殊字串進行分割,例如加回車等
- 3、 將訊息分文訊息頭和訊息體,在訊息頭中包含表示訊息總長度的欄位,然後進行業務邏輯的處理
(1) 在包尾部增加特殊字串進行分割
Server.java
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server { public static void main(String[] args) throws Exception { //1 第一個執行緒組 是用於接收Client端連線的 EventLoopGroup bossGroup = new NioEventLoopGroup(); //2 第二個執行緒組 是用於實際的業務處理操作的 EventLoopGroup workerGroup = new NioEventLoopGroup(); //3 建立一個輔助類Bootstrap,就是對我們的Server進行一系列的配置 ServerBootstrap b = new ServerBootstrap(); //把倆個工作執行緒組加入進來 b.group(bossGroup, workerGroup) //我要指定使用NioServerSocketChannel這種型別的通道 .channel(NioServerSocketChannel.class) //一定要使用 childHandler 去繫結具體的 事件處理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設定自定義分隔符 ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //配置分割符解析器,並設定最大幀的長度與自定義分割符 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設定字串形式的解析器 sc.pipeline().addLast(new StringDecoder()); //設定字串形式的編碼器 sc.pipeline().addLast(new StringEncoder()); sc.pipeline().addLast(new ServerHandler()); } }) //設定TCP緩衝區 .option(ChannelOption.SO_BACKLOG, 128) //設定傳送緩衝大小 .option(ChannelOption.SO_SNDBUF, 32*1024) //設定接收緩衝大小 .option(ChannelOption.SO_RCVBUF, 32*1024) //保持連線 .option(ChannelOption.SO_KEEPALIVE, true); //繫結指定的埠 進行監聽 ChannelFuture f = b.bind(8765).sync(); //Thread.sleep(1000000); //非同步監聽管道關閉 f.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
ServerHandler.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.GenericFutureListener; public class ServerHandler extends ChannelHandlerAdapter { /** * @param ctx 連線上下文 * @msg 傳輸的訊息物件 */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { try { //do something msg String request = (String)msg; System.out.println("Server: " + request); //寫給客戶端 String response = "我是響應訊息$_"; //需要注意,在netty中預設是使用ByteBuf進行傳輸的,所以在回寫訊息的時候必須要轉成ByteBuf //呼叫write方法的時候,netty會自動釋放msg,所以下面的ReferenceCountUtil.release可以省略 ChannelFuture future = ctx.writeAndFlush(response); // ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); //新增監聽,當資料回寫完畢之後,呼叫該監聽方法 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture arg0) throws Exception { // TODO Auto-generated method stub System.out.println("Server訊息會送完畢,回撥該方法。"); //關閉連線 // ctx.close(); } }); //當資料回寫完畢之後,關閉與客戶端的連線 // future.addListener(ChannelFutureListener.CLOSE); } catch (Exception e) { // TODO: handle exception }finally{ // ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
Client.java
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Client { public static void main(String[] args) throws Exception { EventLoopGroup workgroup = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(workgroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes()); //配置分割符解析器,並設定最大幀的長度與自定義分割符 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設定字串形式的解析器 sc.pipeline().addLast(new StringDecoder()); //設定字串形式的編碼器 sc.pipeline().addLast(new StringEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); // cf1.channel().writeAndFlush(Unpooled.copiedBuffer("我是訊息1;$_".getBytes())); // cf1.channel().writeAndFlush(Unpooled.copiedBuffer("我是訊息2;$_".getBytes())); // cf1.channel().writeAndFlush(Unpooled.copiedBuffer("我是訊息3;$_".getBytes())); cf1.channel().writeAndFlush("我是訊息1;$_"); cf1.channel().writeAndFlush("我是訊息2;$_"); cf1.channel().writeAndFlush("我是訊息3;$_"); cf1.channel().closeFuture().sync(); workgroup.shutdownGracefully(); } }
ClientHandle.java
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { //do something msg String response = (String)msg; System.out.println("Client:" + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
(2) 訊息定長
只要修改訊息解析器就好
Server.java
//3 建立一個輔助類Bootstrap,就是對我們的Server進行一系列的配置 ServerBootstrap b = new ServerBootstrap(); //把倆個工作執行緒組加入進來 b.group(bossGroup, workerGroup) //我要指定使用NioServerSocketChannel這種型別的通道 .channel(NioServerSocketChannel.class) //一定要使用 childHandler 去繫結具體的 事件處理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //配置固定長度解析器 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設定字串形式的解析器 sc.pipeline().addLast(new StringDecoder()); //設定字串形式的編碼器 sc.pipeline().addLast(new StringEncoder()); sc.pipeline().addLast(new ServerHandler()); } }) //設定TCP緩衝區 .option(ChannelOption.SO_BACKLOG, 128) //保持連線 .option(ChannelOption.SO_KEEPALIVE, true);
Client.java
Bootstrap b = new Bootstrap(); b.group(workgroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { //配置固定長度解析器 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設定字串形式的解析器 sc.pipeline().addLast(new StringDecoder()); //設定字串形式的編碼器 sc.pipeline().addLast(new StringEncoder()); sc.pipeline().addLast(new ClientHandler()); } }); //上面設定了定長為5個位元組,那麼如果不足5個位元組訊息不會發送,如果超過五個直接,那麼會將前面的5個位元組當做一個訊息發出,剩下的部分如果不足5個,那麼則不會發送,如果夠5個就當做一個新的訊息發出 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("aaaaa".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("bbbbb".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc ".getBytes())); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("ddd".getBytes()));
3、Netty服務部署
常用的部署方式有2中,一種是耦合在Web應用中(以Tomcat為例),使其伴隨Tomcat的啟動而啟動,伴隨Tomcat的關閉而關閉。另外一種則是將Netty獨立打包部署,然後由單獨的程序啟動執行(可以使用shell或其他指令碼進行啟動),然後以資料庫或者其他快取為承接點,實現資料互動。Netty與其他程式進行互動,然後將獲取到的資料進行處理插入資料庫或者快取,然後其他服務從中獲取。獲取在Netty中呼叫web應用的一些對外介面。
4、Netty編解碼技術
編解碼技術,說白了就是java序列化技術,序列化目的就兩個,第一個進行網路傳輸,第二物件持久化。雖然我們可以使用java進行物件序列化,netty去傳輸,但是java序列化的硬傷太多,比如:java序列化沒法跨語言、序列化後碼流太大、序列化效能太低等等。。
主流的編解碼框架:
- JBoss的Marshalling包
- google的Protobuf
- 基於Protobuf的Kyro (效能高於Protobuf,可以與Marshalling媲美)
- MessagePack框架
(1)Netty結合JBoss Marshalling
JBoss Marshalling是一個java物件序列化包,對JDK預設的序列化框架進行了優化,但又保持跟java.io.Seriallzable介面相容,同時增加了一些可調的引數和附加特性。
類庫:jboss-marshalling1.3.0、jboss-marshalling-serial-1.3.0
應用例項:
1、構造Marshalling的編解碼工廠類
MarshallingCodeFactory.java
import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; public class MarshallingCodeFactory { /** *解碼器 */ public static MarshallingDecoder buildMarshallingDecode(){ //首先通過Marshlling工具類的方法獲取Marshalling例項物件,引數serial標識建立的是java序列化工廠物件 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立MarshallingConfiguration物件 final MarshallingConfiguration configuration = new MarshallingConfiguration(); //設定Marshalling的版本號 configuration.setVersion(5); //根據MarshallerFactory和configuration建立provider(用於編解碼操作) UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder物件,兩個引數分別為provider和單個訊息序列化後的最大長度,大於該長度的訊息會被拒絕處理 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024); return decoder; } /** *編碼器 */ public static MarshallingEncoder buildMarshallingEncoder(){ //首先通過Marshlling工具類的方法獲取Marshalling例項物件,引數serial標識建立的是java序列化工廠物件 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立MarshallingConfiguration物件 final MarshallingConfiguration configuration = new MarshallingConfiguration(); //設定Marshalling的版本號 configuration.setVersion(5); //根據MarshallerFactory和configuration建立provider MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
Server.java
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; public class Server { private int port; public Server(int port) { // TODO Auto-generated constructor stub this.port = port; } public void run(){ //用來接收連線事件組 EventLoopGroup bossGroup = new NioEventLoopGroup(); //用來處理接收到的連線的事件處理組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //server配置輔助類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將連線接收組與事件處理組連線,當server的bossGroup接收到連線之後就會交給workerGroup進行處理 serverBootstrap.group(bossGroup, workerGroup) //指定接收的Channel型別 .channel(NioServerSocketChannel.class) //handler在初始化時就會執行,而childHandler會在客戶端成功connect後才執行,這是兩者的區別。 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { // TODO Auto-generated method stub channel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()); channel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecode()); channel.pipeline().addLast(new ServerHandler()); } }) //設定TCP緩衝區的大小 .option(ChannelOption.SO_BACKLOG, 128) //設定是否保持連線 .childOption(ChannelOption.SO_KEEPALIVE, true); //注意,此處option()是提供給NioServerSocketChannel用來接收進來的連線,也就是boss執行緒。 //childOption()是提供給由父管道ServerChannel接收到的連線,也就是worker執行緒,在這個例子中也是NioServerSocketChannel。 //非同步繫結埠,可以多次呼叫繫結多個埠 ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync(); // ChannelFuture channelFuture2 = serverBootstrap.bind(8764).sync(); System.out.println("Server服務已經啟動."); //非同步檢查管道是否關閉 channelFuture.channel().closeFuture().sync(); // channelFuture2.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) { Server server = new Server(8765); server.run(); } }
ServerHandler.java
import java.io.File; import java.io.FileOutputStream; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Active..."); super.channelActive(ctx); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Read..."); try{ Request request = (Request)msg; byte[] attachment = request.getAttachment(); byte[] unGzipData = GzipUtils.unGzip(attachment); String outPath = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "006_" + request.getId() + ".jpg"; FileOutputStream out = new FileOutputStream(outPath); out.write(unGzipData); out.flush(); out.close(); System.out.println("Server Handler received message : " + request.toString() ); Response response = new Response(); response.setId(request.getId()); response.setName("response_name_" + request.getId()); response.setResponseMessage("響應內容:" + request.getId()); ctx.writeAndFlush(response); // writeFlush.addListener(ChannelFutureListener.CLOSE); }finally{ ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub System.out.println("Channel Read Complete..."); super.channelReadComplete(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub System.out.println("Exception Caught..."); super.exceptionCaught(ctx, cause); } }
Client.java
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.io.File; import java.io.FileInputStream; public class Client { private String ip; private int port; public Client(String ip, int port) { // TODO Auto-generated constructor stub this.ip = ip; this.port = port; } public void run() throws Exception{ //客戶端用來連線服務端的連線組 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sChannel) throws Exception { // TODO Auto-generated method stub sChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()); sChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecode()); sChannel.pipeline().addLast(new ClientHandler()); } }) .option(ChannelOption.SO_KEEPALIVE, true); //可以多次呼叫繫結多個埠 ChannelFuture channelFuture = bootstrap.connect(this.ip, this.port).sync(); for(int i = 0; i < 2; i++){ Request req = new Request(); req.setId("id_" + i); req.setName("name_" + i); req.setRequestMessage("資料訊息_" + i); String path = System.getProperty("user.dir") + File.separatorChar + "source" + File.separatorChar + "006.jpg"; File file = new File(path); if( file.exists() ){ FileInputStream in = new FileInputStream(file); byte[] fileData = new byte[in.available()]; in.read(fileData, 0, fileData.length); req.setAttachment(GzipUtils.gzip(fileData)); } channelFuture.channel().writeAndFlush(req); } channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ workerGroup.shutdownGracefully(); } } public static void main(String[] args) { Client client = new Client("127.0.0.1", 8765); try { client.run(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
ClientHandler.java
import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub try{ Response response = (Response)msg; System.out.println("ClientHandler received message : " + response.toString() ); }finally{ } } }
Request.java
import java.io.Serializable; /** * 請求實體,注意需要實現Serializable介面,因為Marshalling相當於對Serializable的一個補充,在傳輸的時候仍然是利用Serializable進行序列化的 * @author jliu10 * */ public class Request implements Serializable{ private static final long serialVersionUID = 5538517398618869423L; private String id; private String name; private String requestMessage; private byte[] attachment; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } public byte[] getAttachment() { return attachment; } public void setAttachment(byte[] attachment) { this.attachment = attachment; } @Override public String toString() { // TODO Auto-generated method stub return " id : " + this.id + "; name : " + this.name + "; requestMessage : " + this.requestMessage; } }
Response.java
import java.io.Serializable; /** * 請求實體,注意需要實現Serializable介面,因為Marshalling相當於對Serializable的一個補充,在傳輸的時候仍然是利用Serializable進行序列化的 * @author jliu10 * */ public class Response implements Serializable{ private static final long serialVersionUID = 4157472697937211837L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } @Override public String toString() { // TODO Auto-generated method stub return " id : " + this.id + "; name : " + this.name + "; responseMessage : " + this.responseMessage; } }
一般我們可能會需要傳輸附件檔案,但是原始附件檔案可能會很大或者是資料夾之類的,所以我們通常會選擇先將附件進行壓縮,然後再進行傳輸。JDK已經給我們提供了一個壓縮類GZIP
GzipUtils.java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; public class GzipUtils { public static byte[] gzip(byte[] data) throws Exception{ //構造一個byte輸出流,用來快取資料,然後提取成byte[](將資料從流中提取到記憶體中) ByteArrayOutputStream bos = new ByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream(bos); gzip.write(data); gzip.finish(); gzip.close(); byte[] zipData = bos.toByteArray(); bos.flush(); bos.close(); return zipData; } public static byte[] unGzip(byte[] data) throws Exception{ //構造一個byte輸入流,將傳入的byte[]資料轉成輸入流(將資料從記憶體中寫入到流中) ByteArrayInputStream bis = new ByteArrayInputStream(data); GZIPInputStream gzip = new GZIPInputStream(bis); byte[] readBuf = new byte[1024]; int num = -1; //構造一個byte輸出流,用來快取資料,然後提取成byte[] ByteArrayOutputStream bos = new ByteArrayOutputStream(); while((num = gzip.read(readBuf, 0, readBuf.length)) != -1){ bos.write(readBuf, 0, num); } gzip.close(); bis.close(); byte[] unZipData = bos.toByteArray(); bos.flush(); bos.close(); return unZipData; } public static void main(String[] args) throws Exception { String path = System.getProperty("user.dir") + File.separatorChar + "source" + File.separatorChar + "20160107001.mp4"; File file = new File(path); if( !file.exists() ){ System.out.println("This file + (" + path + ") not exist!"); return; } FileInputStream in = new FileInputStream(file); byte[] fileData = new byte[in.available()]; in.read(fileData, 0, fileData.length); in.close(); System.out.println("原資料長度為:" + fileData.length); byte[] gZipData = GzipUtils.gzip(fileData); System.out.println("壓縮後的資料長度為:" + gZipData.length); byte[] unGzipData = GzipUtils.unGzip(gZipData); System.out.println("解壓後的資料長度為:" + unGzipData.length); String outPath1 = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "20160107001.zip"; FileOutputStream out1 = new FileOutputStream(outPath1); out1.write(gZipData); out1.flush(); out1.close(); String outPath2 = System.getProperty("user.dir") + File.separatorChar + "receive" + File.separatorChar + "20160107001.mp4"; FileOutputStream out2 = new FileOutputStream(outPath2); out2.write(unGzipData); out2.flush(); out2.close(); } }