Java NIO框架Netty(二)netty5例子,程式碼詳解
阿新 • • 發佈:2019-02-07
這是一個netty快速入門的例子,也是我的學習筆記,比較簡單,翻譯於官方的文件整理後把所有程式碼註釋放在每一行程式碼中間,簡單明瞭地介紹一些基礎的用法。
首頁這是基於netty5的例子,如果需要使用請依賴netty5的包。maven引用方式
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency >
1.Netty Server
package com.tjbsl.netty.demo0.server;
import com.tjbsl.netty.demo3.time.TimeServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 處理資料
*/
public class NettyServer {
private int port;
public NettyServer(int port) {
this.port = port;
}
public void run() throws Exception {
/***
* NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器,
* Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。
* 在這個例子中我們實現了一個服務端的應用,
* 因此會有2個NioEventLoopGroup會被使用。
* 第一個經常被叫做‘boss’,用來接收進來的連線。
* 第二個經常被叫做‘worker’,用來處理已經被接收的連線,
* 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。
* 如何知道多少個執行緒已經被使用,如何對映到已經建立的Channels上都需要依賴於EventLoopGroup的實現,
* 並且可以通過建構函式來配置他們的關係。
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
System.out.println("準備執行埠:" + port);
try {
/**
* ServerBootstrap 是一個啟動NIO服務的輔助啟動類
* 你可以在這個服務中直接使用Channel
*/
ServerBootstrap b = new ServerBootstrap();
/**
* 這一步是必須的,如果沒有設定group將會報java.lang.IllegalStateException: group not set異常
*/
b = b.group(bossGroup, workerGroup);
/***
* ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連線
* 這裡告訴Channel如何獲取新的連線.
*/
b = b.channel(NioServerSocketChannel.class);
/***
* 這裡的事件處理類經常會被用來處理一個最近的已經接收的Channel。
* ChannelInitializer是一個特殊的處理類,
* 他的目的是幫助使用者配置一個新的Channel。
* 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel
* 或者其對應的ChannelPipeline來實現你的網路程式。
* 當你的程式變的複雜時,可能你會增加更多的處理類到pipline上,
* 然後提取這些匿名類到最頂層的類上。
*/
b = b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ch.pipeline().addLast(new DiscardServerHandler());//demo1.discard
//ch.pipeline().addLast(new ResponseServerHandler());//demo2.echo
ch.pipeline().addLast(new TimeServerHandler());//demo3.time
}
});
/***
* 你可以設定這裡指定的通道實現的配置引數。
* 我們正在寫一個TCP/IP的服務端,
* 因此我們被允許設定socket的引數選項比如tcpNoDelay和keepAlive。
* 請參考ChannelOption和詳細的ChannelConfig實現的介面文件以此可以對ChannelOptions的有一個大概的認識。
*/
b = b.option(ChannelOption.SO_BACKLOG, 128);
/***
* option()是提供給NioServerSocketChannel用來接收進來的連線。
* childOption()是提供給由父管道ServerChannel接收到的連線,
* 在這個例子中也是NioServerSocketChannel。
*/
b = b.childOption(ChannelOption.SO_KEEPALIVE, true);
/***
* 繫結埠並啟動去接收進來的連線
*/
ChannelFuture f = b.bind(port).sync();
/**
* 這裡會一直等待,直到socket被關閉
*/
f.channel().closeFuture().sync();
} finally {
/***
* 優雅關閉
*/
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8000;
}
new NettyServer(port).run();
//通過cmd視窗的telnet 127.0.0.1 8000執行
}
}
1.DISCARD服務(丟棄服務,指的是會忽略所有接收的資料的一種協議)
package com.tjbsl.netty.demo1.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
/**
* 服務端處理通道.這裡只是列印一下請求的內容,並不對請求進行任何的響應
* DiscardServerHandler 繼承自 ChannelHandlerAdapter,
* 這個類實現了ChannelHandler介面,
* ChannelHandler提供了許多事件處理的介面方法,
* 然後你可以覆蓋這些方法。
* 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。
*
*/
public class DiscardServerHandler extends ChannelHandlerAdapter {
/***
* 這裡我們覆蓋了chanelRead()事件處理方法。
* 每當從客戶端收到新的資料時,
* 這個方法會在收到訊息時被呼叫,
* 這個例子中,收到的訊息的型別是ByteBuf
* @param ctx 通道處理的上下文資訊
* @param msg 接收的訊息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
/* while (in.isReadable()) {
System.out.print((char) in.readByte());
System.out.flush();
}*/
//這一句和上面註釋的的效果都是列印輸入的字元
System.out.println(in.toString(CharsetUtil.US_ASCII));
}finally {
/**
* ByteBuf是一個引用計數物件,這個物件必須顯示地呼叫release()方法來釋放。
* 請記住處理器的職責是釋放所有傳遞到處理器的引用計數物件。
*/
ReferenceCountUtil.release(msg);
}
}
/***
* 這個方法會在發生異常時觸發
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
/***
* 發生異常後,關閉連線
*/
cause.printStackTrace();
ctx.close();
}
}
以上是一個丟棄服務的處理方式,你可以執行後通過telnet來發送訊息,來檢視是否正常執行,注意console裡會列印你的輸入內容。
2.ECHO服務(響應式協議)
到目前為止,我們雖然接收到了資料,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應訊息給客戶端,這個協議針對任何接收的資料都會返回一個響應。
和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的資料替代列印接收資料到控制檯上的邏輯。
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
package com.tjbsl.netty.demo2.echo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
/**
* 服務端處理通道.
* ResponseServerHandler 繼承自 ChannelHandlerAdapter,
* 這個類實現了ChannelHandler介面,
* ChannelHandler提供了許多事件處理的介面方法,
* 然後你可以覆蓋這些方法。
* 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。
* 用來對請求響應
*/
public class ResponseServerHandler extends ChannelHandlerAdapter {
/**
* 這裡我們覆蓋了chanelRead()事件處理方法。
* 每當從客戶端收到新的資料時,
* 這個方法會在收到訊息時被呼叫,
*ChannelHandlerContext物件提供了許多操作,
* 使你能夠觸發各種各樣的I/O事件和操作。
* 這裡我們呼叫了write(Object)方法來逐字地把接受到的訊息寫入
* @param ctx 通道處理的上下文資訊
* @param msg 接收的訊息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(in.toString(CharsetUtil.UTF_8));
ctx.write(msg);
//cxt.writeAndFlush(msg)
//請注意,這裡我並不需要顯式的釋放,因為在進入的時候netty已經自動釋放
// ReferenceCountUtil.release(msg);
}
/**
* ctx.write(Object)方法不會使訊息寫入到通道上,
* 他被緩衝在了內部,你需要呼叫ctx.flush()方法來把緩衝區中資料強行輸出。
* 或者你可以在channelRead方法中用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 這個方法會在發生異常時觸發
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
/***
* 發生異常後,關閉連線
*/
cause.printStackTrace();
ctx.close();
}
}
3.TIME服務(時間協議的服務)
在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完成時主動關閉連線。
因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:
說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。
//ch.pipeline().addLast(new DiscardServerHandler());
//ch.pipeline().addLast(new ResponseServerHandler());
ch.pipeline().addLast(new TimeServerHandler());
TimeServerHandler類的如下:
package com.tjbsl.netty.demo3.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.util.Scanner;
public class TimeServerHandler extends ChannelHandlerAdapter {
/**
* channelActive()方法將會在連線被建立並且準備進行通訊時被呼叫。
* 因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。
*
* @param ctx
*/
@Override
public void channelActive(final ChannelHandlerContext ctx) {
/*Scanner cin=new Scanner(System.in);
System.out.println("請輸入傳送資訊:");
String name=cin.nextLine();*/
String name="HelloWorld!";
/**
* 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。
* 因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。
* 通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator,
* 然後分配一個新的緩衝。
*/
final ByteBuf time = ctx.alloc().buffer(4);
time.writeBytes(name.getBytes());
/***
* 和往常一樣我們需要編寫一個構建好的訊息
* 。但是等一等,flip在哪?難道我們使用NIO傳送訊息時不是呼叫java.nio.ByteBuffer.flip()嗎?
* ByteBuf之所以沒有這個方法因為有兩個指標,
* 一個對應讀操作一個對應寫操作。
* 當你向ByteBuf裡寫入資料的時候寫指標的索引就會增加,
* 同時讀指標的索引沒有變化。
* 讀指標索引和寫指標索引分別代表了訊息的開始和結束。
* 比較起來,NIO緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾,
* 除非你呼叫flip方法。
* 當你忘記呼叫flip方法而引起沒有資料或者錯誤資料被髮送時,
* 你會陷入困境。這樣的一個錯誤不會發生在Netty上,
* 因為我們對於不同的操作型別有不同的指標。
* 你會發現這樣的使用方法會讓你過程變得更加的容易,
* 因為你已經習慣一種沒有使用flip的方式。
* 另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture物件,
* 一個ChannelFuture代表了一個還沒有發生的I/O操作。
* 這意味著任何一個請求操作都不會馬上被執行,
* 因為在Netty裡所有的操作都是非同步的。
* 因此你需要在write()方法返回的ChannelFuture完成後呼叫close()方法,
* 然後當他的寫操作已經完成他會通知他的監聽者。
*/
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
/**
* 當一個寫請求已經完成是如何通知到我們?
* 這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。
* 這裡我們構建了一個匿名的ChannelFutureListener類用來在操作完成時關閉Channel。
*/
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
/***
* 請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。
*/
ctx.close();
}
});
}
//接收結果
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("client:"+buf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4.Time客戶端
不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。
在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap和Channel的實現。
package com.tjbsl.netty.demo3.time.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = "127.0.0.1";
int port =8000;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
/**
* 如果你只指定了一個EventLoopGroup,
* 那他就會即作為一個‘boss’執行緒,
* 也會作為一個‘workder’執行緒,
* 儘管客戶端不需要使用到‘boss’執行緒。
*/
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
/**
* 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被建立時使用
*/
b.channel(NioSocketChannel.class); // (3)
/**
* 不像在使用ServerBootstrap時需要用childOption()方法,
* 因為客戶端的SocketChannel沒有父channel的概念。
*/
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
//用connect()方法代替了bind()方法
ChannelFuture f = b.connect(host, port).sync();
//等到執行結束,關閉
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}