「Netty實戰 02」手把手教你實現自己的第一個 Netty 應用!新手也能搞懂!
很多小夥伴搞不清楚為啥要學習 Netty ,今天這篇文章開始之前,簡單說一下自己的看法:
@目錄
覺得不錯的話,歡迎 star!ღ( ´・ᴗ・` )比心
- Netty 從入門到實戰系列文章地址:https://github.com/Snailclimb/netty-practical-tutorial 。
- RPC 框架原始碼地址:https://github.com/Snailclimb/guide-rpc-framework
下面,我會帶著大家搭建自己的第一個 Netty 版的 Hello World 小程式。
首先,讓我們來建立服務端。
服務端
我們可以通過 ServerBootstrap
來引導我們啟動一個簡單的 Netty 服務端,為此,你必須要為其指定下面三類屬性:
- 執行緒組(一般需要兩個執行緒組,一個負責接處理客戶端的連線,一個負責具體的 IO 處理)
- IO 模型(BIO/NIO)
- 自定義
ChannelHandler
(處理客戶端發過來的資料並返回資料給客戶端)
建立服務端
/** * @author shuang.kou * @createTime 2020年05月14日 20:28:00 */ public final class HelloServer { private final int port; public HelloServer(int port) { this.port = port; } private void start() throws InterruptedException { // 1.bossGroup 用於接收連線,workerGroup 用於具體的處理 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //2.建立服務端啟動引導/輔助類:ServerBootstrap ServerBootstrap b = new ServerBootstrap(); //3.給引導類配置兩大執行緒組,確定了執行緒模型 b.group(bossGroup, workerGroup) // (非必備)列印日誌 .handler(new LoggingHandler(LogLevel.INFO)) // 4.指定 IO 模型 .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); //5.可以自定義客戶端訊息的業務處理邏輯 p.addLast(new HelloServerHandler()); } }); // 6.繫結埠,呼叫 sync 方法阻塞知道繫結完成 ChannelFuture f = b.bind(port).sync(); // 7.阻塞等待直到伺服器Channel關閉(closeFuture()方法獲取Channel 的CloseFuture物件,然後呼叫sync()方法) f.channel().closeFuture().sync(); } finally { //8.優雅關閉相關執行緒組資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new HelloServer(8080).start(); } }
簡單解析一下服務端的建立過程具體是怎樣的:
1.建立了兩個 NioEventLoopGroup
物件例項:bossGroup
和 workerGroup
。
bossGroup
: 用於處理客戶端的 TCP 連線請求。workerGroup
: 負責每一條連線的具體讀寫資料的處理邏輯,真正負責 I/O 讀寫操作,交由對應的 Handler 處理。
舉個例子:我們把公司的老闆當做 bossGroup,員工當做 workerGroup,bossGroup 在外面接完活之後,扔給 workerGroup 去處理。一般情況下我們會指定 bossGroup 的 執行緒數為 1(併發連線量不大的時候) ,workGroup 的執行緒數量為 CPU 核心數 *2
NioEventLoopGroup
類的無參建構函式設定執行緒數量的預設值就是 CPU 核心數 *2 。
2.建立一個服務端啟動引導/輔助類: ServerBootstrap
,這個類將引導我們進行服務端的啟動工作。
3.通過 .group()
方法給引導類 ServerBootstrap
配置兩大執行緒組,確定了執行緒模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
4.通過channel()
方法給引導類 ServerBootstrap
指定了 IO 模型為NIO
NioServerSocketChannel
:指定服務端的 IO 模型為 NIO,與 BIO 程式設計模型中的ServerSocket
對應NioSocketChannel
: 指定客戶端的 IO 模型為 NIO, 與 BIO 程式設計模型中的Socket
對應
5.通過 .childHandler()
給引導類建立一個ChannelInitializer
,然後指定了服務端訊息的業務處理邏輯也就是自定義的ChannelHandler
物件
6.呼叫 ServerBootstrap
類的 bind()
方法繫結埠 。
//bind()是非同步的,但是,你可以通過 `sync()`方法將其變為同步。
ChannelFuture f = b.bind(port).sync();
自定義服務端 ChannelHandler 處理訊息
HelloServerHandler.java
/**
* @author shuang.kou
* @createTime 2020年05月14日 20:39:00
*/
@Sharable
public class HelloServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf in = (ByteBuf) msg;
System.out.println("message from client:" + in.toString(CharsetUtil.UTF_8));
// 傳送訊息給客戶端
ctx.writeAndFlush(Unpooled.copiedBuffer("你也好!", CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
這個邏輯處理器繼承自ChannelInboundHandlerAdapter
並重寫了下面 2 個方法:
channelRead()
:服務端接收客戶端傳送資料呼叫的方法exceptionCaught()
:處理客戶端訊息發生異常的時候被呼叫
客戶端
建立客戶端
public final class HelloClient {
private final String host;
private final int port;
private final String message;
public HelloClient(String host, int port, String message) {
this.host = host;
this.port = port;
this.message = message;
}
private void start() throws InterruptedException {
//1.建立一個 NioEventLoopGroup 物件例項
EventLoopGroup group = new NioEventLoopGroup();
try {
//2.建立客戶端啟動引導/輔助類:Bootstrap
Bootstrap b = new Bootstrap();
//3.指定執行緒組
b.group(group)
//4.指定 IO 模型
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 5.這裡可以自定義訊息的業務處理邏輯
p.addLast(new HelloClientHandler(message));
}
});
// 6.嘗試建立連線
ChannelFuture f = b.connect(host, port).sync();
// 7.等待連線關閉(阻塞,直到Channel關閉)
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new HelloClient("127.0.0.1",8080, "你好,你真帥啊!哥哥!").start();
}
}
繼續分析一下客戶端的建立流程:
1.建立一個 NioEventLoopGroup
物件例項 (服務端建立了兩個 NioEventLoopGroup
物件)
2.建立客戶端啟動的引導類是 Bootstrap
3.通過 .group()
方法給引導類 Bootstrap
配置一個執行緒組
4.通過channel()
方法給引導類 Bootstrap
指定了 IO 模型為NIO
5.通過 .childHandler()
給引導類建立一個ChannelInitializer
,然後指定了客戶端訊息的業務處理邏輯也就是自定義的ChannelHandler
物件
6.呼叫 Bootstrap
類的 connect()
方法連線服務端,這個方法需要指定兩個引數:
inetHost
: ip 地址inetPort
: 埠號
public ChannelFuture connect(String inetHost, int inetPort) {
return this.connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
public ChannelFuture connect(SocketAddress remoteAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
this.validate();
return this.doResolveAndConnect(remoteAddress, this.config.localAddress());
}
connect
方法返回的是一個 Future
型別的物件
public interface ChannelFuture extends Future<Void> {
......
}
也就是說這個方是非同步的,我們通過 addListener
方法可以監聽到連線是否成功,進而打印出連線資訊。具體做法很簡單,只需要對程式碼進行以下改動:
ChannelFuture f = b.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連線成功!");
} else {
System.err.println("連線失敗!");
}
}).sync();
自定義客戶端 ChannelHandler 處理訊息
HelloClientHandler.java
/**
* @author shuang.kou
* @createTime 2020年05月14日 20:46:00
*/
@Sharable
public class HelloClientHandler extends ChannelInboundHandlerAdapter {
private final String message;
public HelloClientHandler(String message) {
this.message = message;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("client sen msg to server " + message);
ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
System.out.println("client receive msg from server: " + in.toString(CharsetUtil.UTF_8));
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
這個邏輯處理器繼承自 ChannelInboundHandlerAdapter
,並且覆蓋了下面三個方法:
channelActive()
:客戶端和服務端的連線建立之後就會被呼叫channelRead
:客戶端接收服務端傳送資料呼叫的方法exceptionCaught
:處理訊息發生異常的時候被呼叫
執行程式
首先執行服務端 ,然後再執行客戶端。
如果你看到,服務端控制檯打印出:
message from client:你好,你真帥啊!哥哥!
客戶端控制檯打印出:
client sen msg to server 你好,你真帥啊!哥哥!
client receive msg from server: 你也好!
說明你的 Netty 版的 Hello World 已經完成了!
總結
這篇文章我們自己實現了一個 Netty 版的 Hello World,並且詳細介紹了服務端和客戶端的建立流程。客戶端和服務端這塊的建立流程,套路基本都差不多,差別可能就在相關配置方面。
文中涉及的程式碼,你可以在這裡找到:https://github.com/Snailclimb/guide-rpc-framework-learning/tree/master/src/main/java/github/javaguide/netty/echo 。