1. 程式人生 > >Netty入門基礎筆記

Netty入門基礎筆記

一、Netty 入門基礎

1.1、基礎概念

Netty是一個 NIO client-server(客戶端伺服器)框架,使用Netty可以快速開發網路應用,例如伺服器和客戶端協議。Netty提供了一種新的方式來開發網路應用程式,這種新的方式使得它很容易使用和有很強的擴充套件性。Netty的內部實現是很複雜的,但是Netty提供了簡單易用的api從網路處理程式碼中解耦業務邏輯。Netty是完全基於NIO實現的,所以整個Netty都是非同步的

網路應用程式通常需要有較高的可擴充套件性,無論是Netty還是其他的基於JavaNIO的框架,都會提供可擴充套件性的解決方案。Netty中 一個關鍵組成部分是它的非同步特性

Netty是最流行的NIO框架, 他的健壯性、功能、效能、可定製性和可擴充套件性在同類框架都是首屈一指的。 它已經得到成百,上千的商業/商用專案驗證,如HadoopRPC框架Avro、 強大的RocketMQ、還有主流的分散式通訊框架Dubbox等等

1.2 架構組成

在這裡插入圖片描述

二、入門案例

2.1 實現步驟

Netty實現通訊的步驟:

  • 建立兩個的NIO執行緒組,-一個專門用於網路事件處理(接受客戶端的連線),另一-個則進行網路通訊讀寫。
  • 建立一個ServerBootstrap物件, 配置Netty的一系列引數, 例如接受傳出資料的快取大小等等。
  • 建立一個實際處理資料的類Channellnitializer,
    進行初始化的準備工作,比如設定接受傳出資料的字符集、格式、已經實際處理資料的介面。
  • 繫結埠,執行同步阻塞方法等待伺服器端啟動即可。

2.2 入門案例

  • maven依賴
 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
 <dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>5.0.0.Alpha2</
version
>
</dependency>
  • 服務端程式碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Server {

	public static void main(String[] args) throws Exception {
		//1 建立線兩個程組 
		//一個是用於處理伺服器端接收客戶端連線的
		//一個是進行網路通訊的(網路讀寫的)
		EventLoopGroup pGroup = new NioEventLoopGroup();
		EventLoopGroup cGroup = new NioEventLoopGroup();
		
		//2 建立輔助工具類,用於伺服器通道的一系列配置
		ServerBootstrap b = new ServerBootstrap();
		b.group(pGroup, cGroup)		//繫結倆個執行緒組
		.channel(NioServerSocketChannel.class)		//指定NIO的模式
		.option(ChannelOption.SO_BACKLOG, 1024)		//設定tcp緩衝區
		.option(ChannelOption.SO_SNDBUF, 32*1024)	//設定傳送緩衝大小
		.option(ChannelOption.SO_RCVBUF, 32*1024)	//這是接收緩衝大小
		.option(ChannelOption.SO_KEEPALIVE, true)	//保持連線
		.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				//3 在這裡配置具體資料接收方法的處理
				sc.pipeline().addLast(new ServerHandler());
			}
		});
		
		//4 進行繫結 
		ChannelFuture cf1 = b.bind(8765).sync();
		//5 等待關閉
		cf1.channel().closeFuture().sync();

		pGroup.shutdownGracefully();
		cGroup.shutdownGracefully();
	}
}
  • 服務端的handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter {
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("server channel active... ");
	}
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg)
			throws Exception {
			ByteBuf buf = (ByteBuf) msg;
			byte[] req = new byte[buf.readableBytes()];
			buf.readBytes(req);
			String body = new String(req, "utf-8");
			System.out.println("Server :" + body );
			String response = "進行返回給客戶端的響應:" + body ;
			ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
			//.addListener(ChannelFutureListener.CLOSE);
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx)
			throws Exception {
		System.out.println("讀完了");
		ctx.flush();
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
			throws Exception {
		ctx.close();
	}
}
  • 客戶端程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		
		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//傳送訊息
		Thread.sleep(1000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		Thread.sleep(2000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}
  • 客戶端handler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
	}
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {
			ByteBuf buf = (ByteBuf) msg;
			
			byte[] req = new byte[buf.readableBytes()];
			buf.readBytes(req);
			
			String body = new String(req, "utf-8");
			System.out.println("Client :" + body );
			String response = "收到伺服器端的返回資訊:" + body;
		} finally {
			ReferenceCountUtil.release(msg);
		}
	}
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
	}
}

三、TCP粘包、拆包問題

3.1 概念引入

TCP是一個“流”協議,所謂流就是沒有界限的。大家可以想象下如果河裡的水就好比資料,他們是連成一片的, 沒有分界線,TCP底層並不瞭解上層的業務資料具體的含義,它會根據TCP緩衝區的實際情況進行包的劃分,也就是說,在業務上, 我們一個完整的包可能會被TCP分成多個包進行傳送, 也可能把多個小包封裝成一個大的資料包傳送出去,這就是所謂的TCP粘包、拆包問題。

3.2 問題原因與解決方案:

分析TCP粘包、拆包問題的產生原因:

  1. 應用程式write寫入的位元組大小大於套介面傳送緩衝區的大小
  2. 進行MSS大小的TCP分段
  3. 乙太網幀的payload大於MTU進行IP分片

粘包拆包問題的解決方案,常有三種方案:

  1. 訊息定長,例如每個報文的大小固定為200個位元組,如果不夠,空位補空格;
  2. 在包尾部增加特殊字元進行分割,例如加回車等
  3. 講訊息分為訊息頭和訊息體,在訊息頭中包含表示訊息總長度的欄位,然後進行業務邏輯的處理

3.3 Netty解決方法

  • 分隔符類DelimiterBasedFrameDecoder (自定義分隔符)
  • FixedLengthFrameDecoder(定長)

3.3.1 分隔符方式

  • 在服務端,修改輔助類
//伺服器輔助類
	ServerBootstrap b = new ServerBootstrap();
	b.group(pGroup, cGroup)
	 .channel(NioServerSocketChannel.class)
	 .option(ChannelOption.SO_BACKLOG, 1024)
	 .option(ChannelOption.SO_SNDBUF, 32*1024)
	 .option(ChannelOption.SO_RCVBUF, 32*1024)
	 .childHandler(new ChannelInitializer<SocketChannel>() {
		@Override
		protected void initChannel(SocketChannel sc) throws Exception {
			//設定特殊分隔符
			ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
			sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
			//設定字串形式的解碼
			sc.pipeline().addLast(new StringDecoder());
			sc.pipeline().addLast(new ServerHandler());
		}
	});
  • client端同時修改分隔符
Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .handler(new ChannelInitializer<SocketChannel>() {
	@Override
	protected void initChannel(SocketChannel sc) throws Exception {
		ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
		sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
		sc.pipeline().addLast(new StringDecoder());
		sc.pipeline().addLast(new ClientHandler());
	}
});

3.3.2 長連線和短連線

長短連線問題,通過新增監聽器,去監聽服務端訊息是否寫完

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
		throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] req = new byte[buf.readableBytes()];
		buf.readBytes(req);
		String body = new String(req, "utf-8");
		System.out.println("Server :" + body );
		String response = "進行返回給客戶端的響應:" + body ;
		//當確認服務端確認寫完,則斷開連線
		ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
		.addListener(ChannelFutureListener.CLOSE);
}

3.3.3 服務端多次write,一次flush

public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		
		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//傳送訊息  
		Thread.sleep(1000);
		//寫的快取區
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		cf1.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
		//需要flush。否則不會寫出
		cf1.channel().flush();
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}
  • 控制檯輸出情況:
 Client :進行返回給客戶端的響應:hello nettyhello nettyhello netty

3.3.4 停頓傳送

public class Client {
	public static void main(String[] args) throws Exception{
		EventLoopGroup group = new NioEventLoopGroup();
		Bootstrap b = new Bootstrap();
		b.group(group)
		.channel(NioSocketChannel.class)
		.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				sc.pipeline().addLast(new ClientHandler());
			}
		});
		
		ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();
		//傳送訊息
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
		//停頓,當一個間隔,則會發生一次輸出。
		Thread.sleep(2000);
		cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
		cf1.channel().closeFuture().sync();
		group.shutdownGracefully();
	}
}
  • 控制檯輸出情況
Client :進行返回給客戶端的響應:777666
Client :進行返回給客戶端的響應:888

3.3.5 繫結多個 埠

  • 服務端
public class Serv