netty框架 基於noi的(同步非阻塞io)長連線方案
Socket通訊(BIO/NIO/AIO)程式設計
BIO: 傳統阻塞IO
NIO: 同步非阻塞式IO
AIO: 非同步非阻塞IO,(非阻塞採用的是註冊通知的模式。)
為什麼會選擇Netty?因為它簡單!使用Netty不必編寫複雜的邏輯程式碼去實現通訊,再也不需要去考慮效能問題,不需要考慮編碼問題,半包讀寫等問題。強大的Netty已經幫我們實現好了,我們只需要使用即可。
原生NIO的三個最主要的類: Buffer(緩衝區),Channel(通道),Selector(多路複用器)
什麼是阻塞?
應用程式在獲取網路資料的時候,如果網路傳輸資料很慢,就會一直等待,直到傳輸完畢為止。
什麼是非阻塞?
應用程式直接可以獲取已經準備就緒好的資料,無需等待。
NIO(同步非阻塞式IO)
同步非阻塞式IO,伺服器實現模式為一個請求一個執行緒,即客戶端傳送的連線請求都會註冊到多路複用器上,多路複用器輪詢到連線有I/O請求時才啟動一個執行緒進行處理。
Netty簡介
Netty是基於Java NIO client-server的網路應用框架,使用Netty可以快速開發網路應用,例如伺服器和客戶端協議。Netty提供了一種新的方式來開發網路應用程式,這種新的方式使它很容易使用和具有很強的擴充套件性。Netty的內部實現是很複雜的,但是Netty提供了簡單易用的API從網路處理程式碼中解耦業務邏輯。Netty是完全基於NIO實現的,所以整個Netty都是非同步的。
Netty通訊的步驟:
①建立兩個NIO執行緒組,一個專門用於網路事件處理(接受客戶端的連線),另一個則進行網路通訊的讀寫。
②建立一個ServerBootstrap物件,配置Netty的一系列引數,例如接受傳出資料的快取大小等。
③建立一個用於實際處理資料的類ChannelInitializer,進行初始化的準備工作,比如設定接受傳出資料的字符集、格式以及實際處理資料的介面。
④繫結埠,執行同步阻塞方法等待伺服器端啟動即可。
詳細程式碼如下
服務端:
步驟1:服務端啟動類
package netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 程式的入口,負責啟動應用 * @author liuyazhuang * */ public class Main { static boolean isBrowser = false; public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new MyChannelInitializer(isBrowser)); b.option(ChannelOption.SO_BACKLOG, 1024); b.childOption(ChannelOption.SO_KEEPALIVE, true); System.out.println("服務端開啟等待客戶端連線...."+(isBrowser?"瀏覽器":"Nio客戶端")); Channel ch = b.bind(8888).sync().channel(); ch.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally{ //優雅的退出程式 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
步驟2:初始化通道處理器(新增各種處理器,例如編解碼,業務處理器)
package netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* 初始化連線時候的各個元件
* @author liuyazhuang
*
*/
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
private boolean isBrowser;
public MyChannelInitializer(boolean isBrowser){
this.isBrowser = isBrowser;
}
@Override
protected void initChannel(SocketChannel e) throws Exception {
ChannelPipeline pipeline = e.pipeline();
if(isBrowser) {
//用於瀏覽器的websocket場景
pipeline.addLast("http-codec", new HttpServerCodec());//HttpServerCodec:將請求和應答訊息解碼為HTTP訊息
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));//把Http訊息組成完整地HTTP訊息
pipeline.addLast("http-chunked", new ChunkedWriteHandler());//向客戶端傳送HTML5檔案
}
pipeline.addLast(new IdleStateHandler(7, 7, 7, TimeUnit.SECONDS));//設定7秒沒有讀到資料,則觸發一個READER_IDLE事件
if(isBrowser) {
pipeline.addLast("handler", new ServerSocketHandlerBrowser());
} else {
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new ServerSocketHandlerNIO());
}
//e.pipeline().addLast(new ReadTimeoutHandler(15));//設定連線最長時間,時間一到連線斷開。
}
}
步驟3:新增通道組全域性常量,方便操作通道的傳送訊息,(單發或者群發)
package netty.server;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* 儲存整個工程的全域性配置
* @author liuyazhuang
*
*/
public class NettyConfig {
/**
* 儲存每一個客戶端接入進來時的channel物件
*/
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
步驟四:實現業務處理器(瀏覽器websocket)
package netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import java.util.Date;
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
import static io.netty.handler.codec.http.HttpUtil.setContentLength;
public class ServerSocketHandlerBrowser extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
//處理客戶端向服務端發起http握手請求的業務
if (o instanceof FullHttpRequest) {
handHttpRequest(channelHandlerContext, (FullHttpRequest)o);
}else if (o instanceof WebSocketFrame) { //處理websocket連線業務
// ridx是readerIndex讀取資料索引,位置從0開始
// widx是writeIndex寫資料索引,位置從0開始
// cap是capacity緩衝區初始化的容量,預設256,可以通過Unpooled.buffer(8)設定,初始化緩衝區容量是8。
handWebsocketFrame(channelHandlerContext, (WebSocketFrame)o);
}
}
private int loss_connect_time = 0;
private int retryTime = 8;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
loss_connect_time++;
System.out.println("5 秒沒有接收到客戶端的資訊了");
if (loss_connect_time > retryTime-1) {
StringBuffer bf = new StringBuffer("關閉這個不活躍的channel").append("(因為連續檢測了").append(retryTime).append("次讀狀態都是空閒的)");
System.out.println(bf.toString());
ctx.channel().close();
}
}
}
super.userEventTriggered(ctx, evt);
}
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
//客戶端與服務端建立連線的時候呼叫
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.add(ctx.channel());
System.out.println("客戶端與服務端連線開啟...");
super.channelActive(ctx);
}
//客戶端與服務端斷開連線的時候呼叫
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.remove(ctx.channel());
System.out.println("客戶端與服務端連線關閉...");
}
//服務端接收客戶端傳送過來的資料結束之後呼叫
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//工程出現異常的時候呼叫
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
/**
* 處理客戶端與服務端之前的websocket業務
* @param ctx
* @param frame
*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判斷是否是關閉websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
}
//判斷是否是ping訊息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//判斷是否是二進位制訊息,如果是二進位制訊息,丟擲異常
if( ! (frame instanceof TextWebSocketFrame) ){
System.out.println("目前我們不支援二進位制訊息");
throw new RuntimeException("【"+this.getClass().getName()+"】不支援訊息");
}
//返回應答訊息
//獲取客戶端向服務端傳送的訊息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服務端收到客戶端的訊息====>>>" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ ctx.channel().id()
+ " ===>>> "
+ request);
//群發,服務端向每個連線上來的客戶端群發訊息
NettyConfig.group.writeAndFlush(tws);
}
/**
* 處理客戶端向服務端發起http握手請求的業務
* @param ctx
* @param req
*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
if (!req.decoderResult().isSuccess()
|| ! ("websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//構造握手響應返回,本機測試
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB_SOCKET_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
//把握手訊息返回給客戶端
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}else{
handshaker.handshake(ctx.channel(), req);
}
}
/**
* 服務端向客戶端響應訊息
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
DefaultFullHttpResponse res){
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res,res.content().readableBytes());
}
//如果是非Keep-Alive,關閉連線
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
步驟四:實現業務處理器(java客戶端)
package netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
/**
* 接收/處理/響應客戶端websocket請求的核心業務處理類
* @author liuyazhuang
*
*/
public class ServerSocketHandlerNIO extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
try {
System.out.println("Client say : " + o.toString());
//返回客戶端訊息 - 我已經接收到了你的訊息
channelHandlerContext.writeAndFlush("Received your message : " + o.toString());
} catch (Exception e) {
System.out.println("解析不出來啊哈哈");
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
System.out.println(msg);
}
private int loss_connect_time = 0;
private int retryTime = 8;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
loss_connect_time++;
System.out.println("5 秒沒有接收到客戶端的資訊了");
if (loss_connect_time > retryTime-1) {
StringBuffer bf = new StringBuffer("關閉這個不活躍的channel").append("(因為連續檢測了").append(retryTime).append("次讀狀態都是空閒的)");
System.out.println(bf.toString());
ctx.channel().close();
}
}
}
super.userEventTriggered(ctx, evt);
}
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://localhost:8888/websocket";
//客戶端與服務端建立連線的時候呼叫
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.add(ctx.channel());
System.out.println("客戶端與服務端連線開啟...");
super.channelActive(ctx);
}
//客戶端與服務端斷開連線的時候呼叫
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.remove(ctx.channel());
System.out.println("客戶端與服務端連線關閉...");
}
//服務端接收客戶端傳送過來的資料結束之後呼叫
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//工程出現異常的時候呼叫
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端程式碼
步驟一: 配置客戶端主程式,新增通道處理器
package netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
public class client {
static Channel channel = null;
public void connect(int port, String host) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
/*
* 解碼和編碼,應和服務端一致
* */
//字串解碼和編碼
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(new IdleStateHandler(3, 4, 5, TimeUnit.SECONDS));
p.addLast("ping",new HeartBeatClientHandler());
}
});
//連線客戶端
channel = b.connect(host, port).sync().channel();
//控制檯輸入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (line == null) {
continue;
}
//向服務端傳送資料
channel.writeAndFlush(line);
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8888;
new client().connect(port, "127.0.0.1");//NIO模式是(非阻塞同步io)執行緒一直會卡在這裡。
}
}
步驟二: 實現通道業務處理器
package netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import java.util.Date;
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
private static final int TRY_TIMES = 5;
private int currentTime = 0;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("啟用時間是:"+new Date());
if (ctx.channel().isWritable()) {
System.out.println("連結成功,我要傳送資料啦");
}
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止時間是:"+new Date());
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
if(currentTime <= TRY_TIMES){
currentTime++;
StringBuffer bf = new StringBuffer("寫空閒,開始傳送心跳:")
.append("(第").append(currentTime).append("次寫心跳)")
.append("(最多寫心跳次數").append(TRY_TIMES+1).append(")");
System.out.println(bf.toString());
ctx.writeAndFlush("Heartbeat");
}
}
}else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Received your message : Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
}