3個netty5的例子,簡單介紹netty的用法
這是一個netty快速入門的例子,也是我的學習筆記,比較簡單,翻譯於官方的文件整理後把所有程式碼註釋放在每一行程式碼中間,簡單明瞭地介紹一些基礎的用法。
首頁這是基於netty5的例子,如果需要使用請依賴netty5的包。maven引用方式
1 |
< dependency > |
2 |
< groupId >io.netty</ groupId > |
3 |
< artifactId >netty-all</ artifactId > |
4 |
< version >5.0.0.Alpha2</ version > |
5 |
</ dependency > |
0.Netty Server
package com.tjbsl.netty.demo0.server; import com.tjbsl.netty.demo3.time.TimeServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 處理資料 */ public class NettyServer { private int port; public NettyServer(int port) { this.port = port; } public void run() throws Exception { /*** * NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器, * Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。 * 在這個例子中我們實現了一個服務端的應用, * 因此會有2個NioEventLoopGroup會被使用。 * 第一個經常被叫做‘boss’,用來接收進來的連線。 * 第二個經常被叫做‘worker’,用來處理已經被接收的連線, * 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。 * 如何知道多少個執行緒已經被使用,如何對映到已經建立的Channels上都需要依賴於EventLoopGroup的實現, * 並且可以通過建構函式來配置他們的關係。 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); System.out.println("準備執行埠:" + port); try { /** * ServerBootstrap 是一個啟動NIO服務的輔助啟動類 * 你可以在這個服務中直接使用Channel */ ServerBootstrap b = new ServerBootstrap(); /** * 這一步是必須的,如果沒有設定group將會報java.lang.IllegalStateException: group not set異常 */ b = b.group(bossGroup, workerGroup); /*** * ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連線 * 這裡告訴Channel如何獲取新的連線. */ b = b.channel(NioServerSocketChannel.class); /*** * 這裡的事件處理類經常會被用來處理一個最近的已經接收的Channel。 * ChannelInitializer是一個特殊的處理類, * 他的目的是幫助使用者配置一個新的Channel。 * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel * 或者其對應的ChannelPipeline來實現你的網路程式。 * 當你的程式變的複雜時,可能你會增加更多的處理類到pipline上, * 然後提取這些匿名類到最頂層的類上。 */ b = b.childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { //ch.pipeline().addLast(new DiscardServerHandler());//demo1.discard //ch.pipeline().addLast(new ResponseServerHandler());//demo2.echo ch.pipeline().addLast(new TimeServerHandler());//demo3.time } }); /*** * 你可以設定這裡指定的通道實現的配置引數。 * 我們正在寫一個TCP/IP的服務端, * 因此我們被允許設定socket的引數選項比如tcpNoDelay和keepAlive。 * 請參考ChannelOption和詳細的ChannelConfig實現的介面文件以此可以對ChannelOptions的有一個大概的認識。 */ b = b.option(ChannelOption.SO_BACKLOG, 128); /*** * option()是提供給NioServerSocketChannel用來接收進來的連線。 * childOption()是提供給由父管道ServerChannel接收到的連線, * 在這個例子中也是NioServerSocketChannel。 */ b = b.childOption(ChannelOption.SO_KEEPALIVE, true); /*** * 繫結埠並啟動去接收進來的連線 */ ChannelFuture f = b.bind(port).sync(); /** * 這裡會一直等待,直到socket被關閉 */ f.channel().closeFuture().sync(); } finally { /*** * 優雅關閉 */ workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8000; } new NettyServer(port).run(); //通過cmd視窗的telnet 127.0.0.1 8000執行 } } |
1.DISCARD服務(丟棄服務,指的是會忽略所有接收的資料的一種協議)
package com.tjbsl.netty.demo1.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; /** * 服務端處理通道.這裡只是列印一下請求的內容,並不對請求進行任何的響應 * DiscardServerHandler 繼承自 ChannelHandlerAdapter, * 這個類實現了ChannelHandler介面, * ChannelHandler提供了許多事件處理的介面方法, * 然後你可以覆蓋這些方法。 * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。 * */ public class DiscardServerHandler extends ChannelHandlerAdapter { /*** * 這裡我們覆蓋了chanelRead()事件處理方法。 * 每當從客戶端收到新的資料時, * 這個方法會在收到訊息時被呼叫, * 這個例子中,收到的訊息的型別是ByteBuf * @param ctx 通道處理的上下文資訊 * @param msg 接收的訊息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { ByteBuf in = (ByteBuf) msg; /* while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); }*/ //這一句和上面註釋的的效果都是列印輸入的字元 System.out.println(in.toString(CharsetUtil.US_ASCII)); }finally { /** * ByteBuf是一個引用計數物件,這個物件必須顯示地呼叫release()方法來釋放。 * 請記住處理器的職責是釋放所有傳遞到處理器的引用計數物件。 */ ReferenceCountUtil.release(msg); } } /*** * 這個方法會在發生異常時觸發 * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { /*** * 發生異常後,關閉連線 */ cause.printStackTrace(); ctx.close(); } } |
以上是一個丟棄服務的處理方式,你可以執行後通過telnet來發送訊息,來檢視是否正常執行,注意console裡會列印你的輸入內容。
2.ECHO服務(響應式協議)
到目前為止,我們雖然接收到了資料,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應訊息給客戶端,這個協議針對任何接收的資料都會返回一個響應。
和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的資料替代列印接收資料到控制檯上的邏輯。
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
package com.tjbsl.netty.demo2.echo; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; /** * 服務端處理通道. * ResponseServerHandler 繼承自 ChannelHandlerAdapter, * 這個類實現了ChannelHandler介面, * ChannelHandler提供了許多事件處理的介面方法, * 然後你可以覆蓋這些方法。 * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。 * 用來對請求響應 */ public class ResponseServerHandler extends ChannelHandlerAdapter { /** * 這裡我們覆蓋了chanelRead()事件處理方法。 * 每當從客戶端收到新的資料時, * 這個方法會在收到訊息時被呼叫, *ChannelHandlerContext物件提供了許多操作, * 使你能夠觸發各種各樣的I/O事件和操作。 * 這裡我們呼叫了write(Object)方法來逐字地把接受到的訊息寫入 * @param ctx 通道處理的上下文資訊 * @param msg 接收的訊息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; System.out.println(in.toString(CharsetUtil.UTF_8)); ctx.write(msg); //cxt.writeAndFlush(msg) //請注意,這裡我並不需要顯式的釋放,因為在進入的時候netty已經自動釋放 // ReferenceCountUtil.release(msg); } /** * ctx.write(Object)方法不會使訊息寫入到通道上, * 他被緩衝在了內部,你需要呼叫ctx.flush()方法來把緩衝區中資料強行輸出。 * 或者你可以在channelRead方法中用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * 這個方法會在發生異常時觸發 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { /*** * 發生異常後,關閉連線 */ cause.printStackTrace(); ctx.close(); } } |
同樣以上執行後,可以通過telnet傳送資料,console裡會打印出你傳送的資料,同時你的命令列介面裡應該也會接收到相同的資料。
3.TIME服務(時間協議的服務)
在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完成時主動關閉連線。
因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
1 |
//ch.pipeline().addLast(new DiscardServerHandler()); |
2 |
//ch.pipeline().addLast(new ResponseServerHandler()); |
3 |
ch.pipeline().addLast( new TimeServerHandler()); |
TimeServerHandler類的如下:
package com.tjbsl.netty.demo3.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import java.util.Scanner; public class TimeServerHandler extends ChannelHandlerAdapter { /** * channelActive()方法將會在連線被建立並且準備進行通訊時被呼叫。 * 因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。 * * @param ctx */ @Override public void channelActive(final ChannelHandlerContext ctx) { /*Scanner cin=new Scanner(System.in); System.out.println("請輸入傳送資訊:"); String name=cin.nextLine();*/ String name="HelloWorld!"; /** * 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。 * 因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。 * 通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator, * 然後分配一個新的緩衝。 */ final ByteBuf time = ctx.alloc().buffer(4); time.writeBytes(name.getBytes()); /*** * 和往常一樣我們需要編寫一個構建好的訊息 * 。但是等一等,flip在哪?難道我們使用NIO傳送訊息時不是呼叫java.nio.ByteBuffer.flip()嗎? * ByteBuf之所以沒有這個方法因為有兩個指標, * 一個對應讀操作一個對應寫操作。 * 當你向ByteBuf裡寫入資料的時候寫指標的索引就會增加, * 同時讀指標的索引沒有變化。 * 讀指標索引和寫指標索引分別代表了訊息的開始和結束。 * 比較起來,NIO緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾, * 除非你呼叫flip方法。 * 當你忘記呼叫flip方法而引起沒有資料或者錯誤資料被髮送時, * 你會陷入困境。這樣的一個錯誤不會發生在Netty上, * 因為我們對於不同的操作型別有不同的指標。 * 你會發現這樣的使用方法會讓你過程變得更加的容易, * 因為你已經習慣一種沒有使用flip的方式。 * 另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture物件, * 一個ChannelFuture代表了一個還沒有發生的I/O操作。 * 這意味著任何一個請求操作都不會馬上被執行, * 因為在Netty裡所有的操作都是非同步的。 * 因此你需要在write()方法返回的ChannelFuture完成後呼叫close()方法, * 然後當他的寫操作已經完成他會通知他的監聽者。 */ final ChannelFuture f = ctx.writeAndFlush(time); // (3) /** * 當一個寫請求已經完成是如何通知到我們? * 這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。 * 這裡我們構建了一個匿名的ChannelFutureListener類用來在操作完成時關閉Channel。 */ f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { assert f == future; /*** * 請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。 */ ctx.close(); } }); } //接收結果 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("client:"+buf.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } |
4.Time客戶端
不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。
在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap和Channel的實現。
package com.tjbsl.netty.demo3.time.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public static void main(String[] args) throws Exception { String host = "127.0.0.1"; int port =8000; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * 如果你只指定了一個EventLoopGroup, * 那他就會即作為一個‘boss’執行緒, * 也會作為一個‘workder’執行緒, * 儘管客戶端不需要使用到‘boss’執行緒。 */ Bootstrap b = new Bootstrap(); // (1) b.group(workerGroup); // (2) /** * 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被建立時使用 */ b.channel(NioSocketChannel.class); // (3) /** * 不像在使用ServerBootstrap時需要用childOption()方法, * 因為客戶端的SocketChannel沒有父channel的概念。 */ b.option(ChannelOption.SO_KEEPALIVE, true); // (4) b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); //用connect()方法代替了bind()方法 ChannelFuture f = b.connect(host, port).sync(); //等到執行結束,關閉 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } } |