netty快速入門3個例項
是一個netty快速入門的例子,也是我的學習筆記,比較簡單,翻譯於官方的文件整理後把所有程式碼註釋放在每一行程式碼中間,簡單明瞭地介紹一些基礎的用法。
首頁這是基於netty5的例子,如果需要使用請依賴netty5的包。maven引用方式
1 |
< dependency > |
2 |
< groupId >io.netty</ groupId > |
3 |
< artifactId >netty-all</ artifactId > |
4 |
< version >5.0.0.Alpha2</ version > |
5 |
</ dependency > |
或者去下載最新的jar下載頁面
1.DISCARD服務(丟棄服務,指的是會忽略所有接收的資料的一種協議)
001 |
import io.netty.bootstrap.ServerBootstrap; |
002 |
import io.netty.channel.ChannelFuture; |
003 |
import io.netty.channel.ChannelInitializer; |
004 |
import io.netty.channel.ChannelOption; |
005 |
import io.netty.channel.EventLoopGroup; |
006 |
import io.netty.channel.nio.NioEventLoopGroup; |
007 |
import io.netty.channel.socket.SocketChannel; |
008 |
import io.netty.channel.socket.nio.NioServerSocketChannel; |
009 |
010 |
/** |
011 |
* 處理資料 |
012 |
*/ |
013 |
public class NettyServer { |
014 |
private int port; |
015 |
public NettyServer( int port) { |
016 |
this .port = port; |
017 |
} |
018 |
public void run() throws Exception { |
019 |
/*** |
020 |
* NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器, |
021 |
* Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。 |
022 |
* 在這個例子中我們實現了一個服務端的應用, |
023 |
* 因此會有2個NioEventLoopGroup會被使用。 |
024 |
* 第一個經常被叫做‘boss’,用來接收進來的連線。 |
025 |
* 第二個經常被叫做‘worker’,用來處理已經被接收的連線, |
026 |
* 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。 |
027 |
* 如何知道多少個執行緒已經被使用,如何對映到已經建立的Channels上都需要依賴於EventLoopGroup的實現, |
028 |
* 並且可以通過建構函式來配置他們的關係。 |
029 |
*/ |
030 |
EventLoopGroup bossGroup = new NioEventLoopGroup(); |
031 |
EventLoopGroup workerGroup = new NioEventLoopGroup(); |
032 |
System.out.println( "準備執行埠:" + port); |
033 |
try { |
034 |
/** |
035 |
* ServerBootstrap 是一個啟動NIO服務的輔助啟動類 |
036 |
* 你可以在這個服務中直接使用Channel |
037 |
*/ |
038 |
ServerBootstrap b = new ServerBootstrap(); |
039 |
/** |
040 |
* 這一步是必須的,如果沒有設定group將會報java.lang.IllegalStateException: group not set異常 |
041 |
*/ |
042 |
b = b.group(bossGroup, workerGroup); |
043 |
/*** |
044 |
* ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連線 |
045 |
* 這裡告訴Channel如何獲取新的連線. |
046 |
*/ |
047 |
b = b.channel(NioServerSocketChannel. class ); |
048 |
/*** |
049 |
* 這裡的事件處理類經常會被用來處理一個最近的已經接收的Channel。 |
050 |
* ChannelInitializer是一個特殊的處理類, |
051 |
* 他的目的是幫助使用者配置一個新的Channel。 |
052 |
* 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel |
053 |
* 或者其對應的ChannelPipeline來實現你的網路程式。 |
054 |
* 當你的程式變的複雜時,可能你會增加更多的處理類到pipline上, |
055 |
* 然後提取這些匿名類到最頂層的類上。 |
056 |
*/ |
057 |
b = b.childHandler( new ChannelInitializer<SocketChannel>() { // (4) |
058 |
@Override |
059 |
public void initChannel(SocketChannel ch) throws Exception { |
060 |
ch.pipeline().addLast( new DiscardServerHandler()); |
061 |
//ch.pipeline().addLast(new ResponseServerHandler()); |
062 |
// ch.pipeline().addLast(new TimeServerHandler()); |
063 |
} |
064 |
}); |
065 |
/*** |
066 |
* 你可以設定這裡指定的通道實現的配置引數。 |
067 |
* 我們正在寫一個TCP/IP的服務端, |
068 |
* 因此我們被允許設定socket的引數選項比如tcpNoDelay和keepAlive。 |
069 |
* 請參考ChannelOption和詳細的ChannelConfig實現的介面文件以此可以對ChannelOptions的有一個大概的認識。 |
070 |
*/ |
071 |
b = b.option(ChannelOption.SO_BACKLOG, 128 ); |
072 |
/*** |
073 |
* option()是提供給NioServerSocketChannel用來接收進來的連線。 |
074 |
* childOption()是提供給由父管道ServerChannel接收到的連線, |
075 |
* 在這個例子中也是NioServerSocketChannel。 |
076 |
*/ |
077 |
b = b.childOption(ChannelOption.SO_KEEPALIVE, true ); |
078 |
/*** |
079 |
* 繫結埠並啟動去接收進來的連線 |
080 |
*/ |
081 |
ChannelFuture f = b.bind(port).sync(); |
082 |
/** |
083 |
* 這裡會一直等待,直到socket被關閉 |
084 |
*/ |
085 |
f.channel().closeFuture().sync(); |
086 |
} finally { |
087 |
/*** |
088 |
* 優雅關閉 |
089 |
*/ |
090 |
workerGroup.shutdownGracefully(); |
091 |
bossGroup.shutdownGracefully(); |
092 |
} |
093 |
} |
094 |
095 |
public static void main(String[] args) throws Exception { |
096 |
int port; |
097 |
if (args.length > 0 ) { |
098 |
port = Integer.parseInt(args[ 0 ]); |
099 |
} else { |
100 |
port = 8000 ; |
101 |
} |
102 |
new NettyServer(port).run(); |
103 |
} |
104 |
} |
01 |
import io.netty.buffer.ByteBuf; |
02 |
import io.netty.channel.ChannelHandlerAdapter; |
03 |
import io.netty.channel.ChannelHandlerContext; |
04 |
import io.netty.util.CharsetUtil; |
05 |
import io.netty.util.ReferenceCountUtil; |
06 |
07 |
/** |
08 |
* 服務端處理通道.這裡只是列印一下請求的內容,並不對請求進行任何的響應 |
09 |
* DiscardServerHandler 繼承自 ChannelHandlerAdapter, |
10 |
* 這個類實現了ChannelHandler介面, |
11 |
* ChannelHandler提供了許多事件處理的介面方法, |
12 |
* 然後你可以覆蓋這些方法。 |
13 |
* 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。 |
14 |
* |
15 |
*/ |
16 |
public class DiscardServerHandler extends ChannelHandlerAdapter { |
17 |
18 |
/*** |
19 |
* 這裡我們覆蓋了chanelRead()事件處理方法。 |
20 |
* 每當從客戶端收到新的資料時, |
21 |
* 這個方法會在收到訊息時被呼叫, |
22 |
* 這個例子中,收到的訊息的型別是ByteBuf |
23 |
* @param ctx 通道處理的上下文資訊 |
24 |
* @param msg 接收的訊息 |
25 |
*/ |
26 |
@Override |
27 |
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
28 |
try { |
29 |
ByteBuf in = (ByteBuf) msg; |
30 |
/* while (in.isReadable()) { |
31 |
System.out.print((char) in.readByte()); |
32 |
System.out.flush(); |
33 |
}*/ |
34 |
//這一句和上面註釋的的效果都是列印輸入的字元 |
35 |
System.out.println(in.toString(CharsetUtil.US_ASCII)); |
36 |
}finally { |
37 |
/** |
38 |
* ByteBuf是一個引用計數物件,這個物件必須顯示地呼叫release()方法來釋放。 |
39 |
* 請記住處理器的職責是釋放所有傳遞到處理器的引用計數物件。 |
40 |
*/ |
41 |
ReferenceCountUtil.release(msg); |
42 |
} |
43 |
} |
44 |
45 |
/*** |
46 |
* 這個方法會在發生異常時觸發 |
47 |
* @param ctx |
48 |
* @param cause |
49 |
*/ |
50 |
@Override |
51 |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
52 |
/*** |
53 |
* 發生異常後,關閉連線 |
54 |
*/ |
55 |
cause.printStackTrace(); |
56 |
ctx.close(); |
57 |
} |
58 |
59 |
} |
以上是一個丟棄服務的處理方式,你可以執行後通過telnet來發送訊息,來檢視是否正常執行,注意console裡會列印你的輸入內容。
2.ECHO服務(響應式協議)
到目前為止,我們雖然接收到了資料,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應訊息給客戶端,這個協議針對任何接收的資料都會返回一個響應。
和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的資料替代列印接收資料到控制檯上的邏輯。
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
1 |
//ch.pipeline().addLast(new DiscardServerHandler()); |
2 |
ch.pipeline().addLast( new ResponseServerHandler()); |
3 |
//ch.pipeline().addLast(new TimeServerHandler()); |
下面是處理類ResponseServerHandler的程式碼
01 |
import io.netty.channel.ChannelHandlerAdapter; |
02 |
import io.netty.channel.ChannelHandlerContext; |
03 |
04 |
/** |
05 |
* 服務端處理通道. |
06 |
* ResponseServerHandler 繼承自 ChannelHandlerAdapter, |
07 |
* 這個類實現了ChannelHandler介面, |
08 |
* ChannelHandler提供了許多事件處理的介面方法, |
09 |
* 然後你可以覆蓋這些方法。 |
10 |
* 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。 |
11 |
* 用來對請求響應 |
12 |
*/ |
13 |
public class ResponseServerHandler extends ChannelHandlerAdapter { |
14 |
15 |
/** |
16 |
* 這裡我們覆蓋了chanelRead()事件處理方法。 |
17 |
* 每當從客戶端收到新的資料時, |
18 |
* 這個方法會在收到訊息時被呼叫, |
19 |
*ChannelHandlerContext物件提供了許多操作, |
20 |
* 使你能夠觸發各種各樣的I/O事件和操作。 |
21 |
* 這裡我們呼叫了write(Object)方法來逐字地把接受到的訊息寫入 |
22 |
* @param ctx 通道處理的上下文資訊 |
23 |
* @param msg 接收的訊息 |
24 |
*/ |
25 |
@Override |
26 |
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
27 |
ctx.write(msg); |
28 |
//cxt.writeAndFlush(msg) |
29 |
30 |
//請注意,這裡我並不需要顯式的釋放,因為在定入的時候netty已經自動釋放 |
31 |
// ReferenceCountUtil.release(msg); |
32 |
} |
33 |
34 |
/** |
35 |
* ctx.write(Object)方法不會使訊息寫入到通道上, |
36 |
* 他被緩衝在了內部,你需要呼叫ctx.flush()方法來把緩衝區中資料強行輸出。 |
37 |
* 或者你可以在channelRead方法中用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的 |
38 |
* @param ctx |
39 |
* @throws Exception |
40 |
*/ |
41 |
@Override |
42 |
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
43 |
ctx.flush(); |
44 |
} |
45 |
46 |
/** |
47 |
* 這個方法會在發生異常時觸發 |
48 |
* |
49 |
* @param ctx |
50 |
* @param cause |
51 |
*/ |
52 |
@Override |
53 |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
54 |
/*** |
55 |
* 發生異常後,關閉連線 |
56 |
*/ |
57 |
cause.printStackTrace(); |
58 |
ctx.close(); |
59 |
} |
60 |
61 |
} |
同樣以上執行後,可以通過telnet傳送資料,console裡會打印出你傳送的資料,同時你的命令列介面裡應該也會接收到相同的資料。
3.TIME服務(時間協議的服務)
在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完成時主動關閉連線。
因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
1 |
//ch.pipeline().addLast(new DiscardServerHandler()); |
2 |
//ch.pipeline().addLast(new ResponseServerHandler()); |
3 |
ch.pipeline().addLast( new TimeServerHandler()); |
TimeServerHandler類的如下:
01 |
public class TimeServerHandler extends ChannelHandlerAdapter { |
02 |
03 |
/** |
04 |
* channelActive()方法將會在連線被建立並且準備進行通訊時被呼叫。 |
05 |
* 因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。 |
06 |
* |
07 |
* @param ctx |
08 |
*/ |
09 |
@Override |
10 |
public void channelActive( final ChannelHandlerContext ctx) { |
11 |
/** |
12 |
* 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。 |
13 |
* 因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。 |
14 |
* 通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator, |
15 |
* 然後分配一個新的緩衝。 |
16 |
*/ |
17 |
final ByteBuf time = ctx.alloc().buffer( 4 ); |
18 |
time.writeInt(( int ) (System.currentTimeMillis() / 1000L + 2208988800L)); |
19 |
/*** |
20 |
* 和往常一樣我們需要編寫一個構建好的訊息 |
21 |
* 。但是等一等,flip在哪?難道我們使用NIO傳送訊息時不是呼叫java.nio.ByteBuffer.flip()嗎? |
22 |
* ByteBuf之所以沒有這個方法因為有兩個指標, |
23 |
* 一個對應讀操作一個對應寫操作。 |
24 |
* 當你向ByteBuf裡寫入資料的時候寫指標的索引就會增加, |
25 |
* 同時讀指標的索引沒有變化。 |
26 |
* 讀指標索引和寫指標索引分別代表了訊息的開始和結束。 |
27 |
* 比較起來,NIO緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾, |
28 |
* 除非你呼叫flip方法。 |
29 |
* 當你忘記呼叫flip方法而引起沒有資料或者錯誤資料被髮送時, |
30 |
* 你會陷入困境。這樣的一個錯誤不會發生在Netty上, |
31 |
* 因為我們對於不同的操作型別有不同的指標。 |
32 |
* 你會發現這樣的使用方法會讓你過程變得更加的容易, |
33 |
* 因為你已經習慣一種沒有使用flip的方式。 |
34 |
* 另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture物件, |
35 |
* 一個ChannelFuture代表了一個還沒有發生的I/O操作。 |
36 |
* 這意味著任何一個請求操作都不會馬上被執行, |
37 |
* 因為在Netty裡所有的操作都是非同步的。 |
38 |
* 因此你需要在write()方法返回的ChannelFuture完成後呼叫close()方法, |
39 |
* 然後當他的寫操作已經完成他會通知他的監聽者。 |
40 |
*/ |
41 |
final ChannelFuture f = ctx.writeAndFlush(time); // (3) |
42 |
/** |
43 |
* 當一個寫請求已經完成是如何通知到我們? |
44 |
* 這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。 |
45 |
* 這裡我們構建了一個匿名的ChannelFutureListener類用來在操作完成時關閉Channel。 |
46 |
*/ |
47 |
f.addListener( new ChannelFutureListener() { |
48 |
@Override |
49 |
public void operationComplete(ChannelFuture future) { |
50 |
assert f == future; |
51 |
/*** |
52 |
* 請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。 |
53 |
*/ |
54 |
ctx.close(); |
55 |
} |
56 |
}); |
57 |
} |
58 |
@Override |
59 |
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
60 |
cause.printStackTrace(); |
61 |
ctx.close(); |
62 |
} |
63 |
} |
4.Time客戶端
不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。
在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap和Channel的實現。
01 |
public class TimeClient { |
02 |
public static void main(String[] args) throws Exception { |
03 |
String host = "127.0.0.1" ; |
04 |
int port = 8000 ; |
05 |
EventLoopGroup workerGroup = new NioEventLoopGroup(); |
06 |
try { |
07 |
/** |
08 |
* 如果你只指定了一個EventLoopGroup, |
09 |
* 那他就會即作為一個‘boss’執行緒, |
10 |
* 也會作為一個‘workder’執行緒, |
11 |
* 儘管客戶端不需要使用到‘boss’執行緒。 |
12 |
*/ |
13 |
Bootstrap b = new Bootstrap(); // (1) |
14 |
b.group(workerGroup); // (2) |
15 |
/** |
16 |
* 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被建立時使用 |
17 |
*/ |
18 |
b.channel(NioSocketChannel. class ); // (3) |
19 |
/** |
20 |
* 不像在使用ServerBootstrap時需要用childOption()方法, |
21 |
* 因為客戶端的SocketChannel沒有父channel的概念。 |
22 |
*/ |
23 |
b.option(ChannelOption.SO_KEEPALIVE, true ); // (4) |
24 |
b.handler( new ChannelInitializer<SocketChannel>() { |
25 |
@Override |
26 |
public void initChannel(SocketChannel ch) throws Exception { |
27 |
ch.pipeline().addLast( new TimeClientHandler()); |
28 |
} |
29 |
}); |
30 |
//用connect()方法代替了bind()方法 |
31 |
ChannelFuture f = b.connect(host, port).sync(); |
32 |
//等到執行結束,關閉 |