Socket粘包問題終極解決方案—Netty版(2W字)!
阿新 • • 發佈:2021-01-13
上一篇我們講了《[Socket粘包問題的3種解決方案](https://mp.weixin.qq.com/s/ODxGlLrohCveH-2m-BSDWQ)》,但沒想到評論區竟然炸了。介於大家的熱情討論,以及不同的反饋意見,本文就來做一個擴充套件和延伸,試圖找到問題的最優解,以及訊息通訊的最優解決方案。
在正式開始之前,我們先對上篇評論中的幾個典型問題做一個簡單的回覆,不感興趣的朋友可直接劃過。
# 問題一:TCP存在粘包問題嗎?
先說答案:**TCP 本身並沒有粘包和半包一說,因為 TCP 本質上只是一個傳輸控制協議(Transmission Control Protocol,TCP),它是一種面向連線的、可靠的、基於位元組流的傳輸層通訊協議**,由 IETF 的 RFC 793 定義。
所謂的**協議本質上是一個約定**,就好比 Java 程式設計約定使用駝峰命名法一樣,約定的意義是為了讓通訊雙方,能夠正常的進行訊息互換的,那粘包和半包問題又是如何產生的呢?
這是因為在 TCP 的互動中,資料是以位元組流的形式進行傳輸的,而“流”的傳輸是沒有邊界的,因為沒有邊界所以就不能區分訊息的歸屬,從而就會產生粘包和半包問題(粘包和半包的定義,詳見[上一篇](https://mp.weixin.qq.com/s/ODxGlLrohCveH-2m-BSDWQ))。**所以說 TCP 協議本身並不存在粘包和半包問題,只是在使用中如果不能有效的確定流的邊界就會產生粘包和半包問題。**
# 問題二:分隔符是最優解決方案?
坦白的說,經過評論區大家的耐心“開導”,我也意識到了以結束符作為最終的解決方案存在一定的侷限性,比如當一條訊息中間如果出現了結束符就會造成半包的問題,所以如果是複雜的字串要對內容進行編碼和解碼處理,這樣才能保證結束符的正確性。
# 問題三:Socket 高效嗎?
這個問題的答案是否定的,其實上文在開頭已經描述了應用場景:「傳統的 Socket 程式設計」,學習它的意義就在於理解更早期更底層的一些知識,當然作為補充本文會提供更加高效的訊息通訊方案——Netty 通訊。
---
聊完了以上問題,接下來咱們先來補充一下上篇文章中提到的,將訊息分為訊息頭和訊息體的程式碼實現。
# 一、封裝訊息頭和訊息體
在開始寫伺服器端和客戶端之前,咱們先來編寫一個訊息的封裝類,使用它可以將訊息封裝成訊息頭和訊息體,如下圖所示:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610438454450-1af48acb-fb9a-48b1-bb7c-b9a14dfc5217.png#align=left&display=inline&height=148&margin=%5Bobject%20Object%5D&name=image.png&originHeight=296&originWidth=1096&size=22969&status=done&style=none&width=548)
訊息頭中儲存訊息體的長度,從而確定了訊息的邊界,便解決粘包和半包問題。
### 1.訊息封裝類
訊息的封裝類中提供了兩個方法:一個是將訊息轉換成訊息頭 + 訊息體的方法,另一個是讀取訊息頭的方法,具體實現程式碼如下:
```java
/**
* 訊息封裝類
*/
class SocketPacket {
// 訊息頭儲存的長度(佔 8 位元組)
static final int HEAD_SIZE = 8;
/**
* 將協議封裝為:協議頭 + 協議體
* @param context 訊息體(String 型別)
* @return byte[]
*/
public byte[] toBytes(String context) {
// 協議體 byte 陣列
byte[] bodyByte = context.getBytes();
int bodyByteLength = bodyByte.length;
// 最終封裝物件
byte[] result = new byte[HEAD_SIZE + bodyByteLength];
// 藉助 NumberFormat 將 int 轉換為 byte[]
NumberFormat numberFormat = NumberFormat.getNumberInstance();
numberFormat.setMinimumIntegerDigits(HEAD_SIZE);
numberFormat.setGroupingUsed(false);
// 協議頭 byte 陣列
byte[] headByte = numberFormat.format(bodyByteLength).getBytes();
// 封裝協議頭
System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);
// 封裝協議體
System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);
return result;
}
/**
* 獲取訊息頭的內容(也就是訊息體的長度)
* @param inputStream
* @return
*/
public int getHeader(InputStream inputStream) throws IOException {
int result = 0;
byte[] bytes = new byte[HEAD_SIZE];
inputStream.read(bytes, 0, HEAD_SIZE);
// 得到訊息體的位元組長度
result = Integer.valueOf(new String(bytes));
return result;
}
}
```
### 2.編寫客戶端
接下來我們來定義客戶端,在客戶端中我們新增一組待發送的訊息,隨機給伺服器端傳送一個訊息,實現程式碼如下:
```java
/**
* 客戶端
*/
class MySocketClient {
public static void main(String[] args) throws IOException {
// 啟動 Socket 並嘗試連線伺服器
Socket socket = new Socket("127.0.0.1", 9093);
// 傳送訊息合集(隨機發送一條訊息)
final String[] message = {"Hi,Java.", "Hi,SQL~", "關注公眾號|Java中文社群."};
// 建立協議封裝物件
SocketPacket socketPacket = new SocketPacket();
try (OutputStream outputStream = socket.getOutputStream()) {
// 給伺服器端傳送 10 次訊息
for (int i = 0; i < 10; i++) {
// 隨機發送一條訊息
String msg = message[new Random().nextInt(message.length)];
// 將內容封裝為:協議頭+協議體
byte[] bytes = socketPacket.toBytes(msg);
// 傳送訊息
outputStream.write(bytes, 0, bytes.length);
outputStream.flush();
}
}
}
}
```
### 3.編寫伺服器端
伺服器端我們使用執行緒池來處理每個客戶端的業務請求,實現程式碼如下:
```java
/**
* 伺服器端
*/
class MySocketServer {
public static void main(String[] args) throws IOException {
// 建立 Socket 伺服器端
ServerSocket serverSocket = new ServerSocket(9093);
// 獲取客戶端連線
Socket clientSocket = serverSocket.accept();
// 使用執行緒池處理更多的客戶端
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
threadPool.submit(() -> {
// 客戶端訊息處理
processMessage(clientSocket);
});
}
/**
* 客戶端訊息處理
* @param clientSocket
*/
private static void processMessage(Socket clientSocket) {
// Socket 封裝物件
SocketPacket socketPacket = new SocketPacket();
// 獲取客戶端傳送的訊息物件
try (InputStream inputStream = clientSocket.getInputStream()) {
while (true) {
// 獲取訊息頭(也就是訊息體的長度)
int bodyLength = socketPacket.getHeader(inputStream);
// 訊息體 byte 陣列
byte[] bodyByte = new byte[bodyLength];
// 每次實際讀取位元組數
int readCount = 0;
// 訊息體賦值下標
int bodyIndex = 0;
// 迴圈接收訊息頭中定義的長度
while (bodyIndex <= (bodyLength - 1) &&
(readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {
bodyIndex += readCount;
}
bodyIndex = 0;
// 成功接收到客戶端的訊息並列印
System.out.println("接收到客戶端的資訊:" + new String(bodyByte));
}
} catch (IOException ioException) {
System.out.println(ioException.getMessage());
}
}
}
```
以上程式的執行結果如下:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610000914205-fd2d7297-e8da-419a-a8f8-894e7aad9f9c.png#align=left&display=inline&height=320&margin=%5Bobject%20Object%5D&name=image.png&originHeight=640&originWidth=872&size=120304&status=done&style=none&width=436)
從上述結果可以看出,訊息通訊正常,客戶端和伺服器端的互動中並沒有出現粘包和半包的問題。
# 二、使用 Netty 實現高效通訊
以上的內容都是針對傳統 Socket 程式設計的,但要實現更加高效的通訊和連線物件的複用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,非同步非阻塞 IO)了。
傳統的 Socket 程式設計是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的區別如下:
- BIO 來自傳統的 java.io 包,它是基於流模型實現的,互動的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,執行緒會一直阻塞在那裡,它們之間的呼叫是可靠的線性順序。它的優點就是程式碼比較簡單、直觀;缺點就是 IO 的效率和擴充套件性很低,容易成為應用效能瓶頸。
- NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以構建多路複用的、同步非阻塞 IO 程式,同時提供了更接近作業系統底層高效能的資料操作方式。
- AIO 是 Java 1.7 之後引入的包,是 NIO 的升級版本,提供了非同步非堵塞的 IO 操作方式,因此人們叫它 AIO(Asynchronous IO),非同步 IO 是基於事件和回撥機制實現的,也就是應用操作之後會直接返回,不會堵塞在那裡,當後臺處理完成,作業系統會通知相應的執行緒進行後續的操作。
> PS:AIO 可以看作是 NIO 的升級,它也叫 NIO 2。
傳統 Socket 的通訊流程:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610438985023-0cb20bcd-5370-4a1c-974c-8e56edcf0402.png#align=left&display=inline&height=346&margin=%5Bobject%20Object%5D&name=image.png&originHeight=692&originWidth=1110&size=56670&status=done&style=none&width=555)
NIO 的通訊流程:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610439346239-5e6346a6-2c7b-4ca9-9d0d-b85cf62805c5.png#align=left&display=inline&height=448&margin=%5Bobject%20Object%5D&name=image.png&originHeight=896&originWidth=1158&size=67367&status=done&style=none&width=579)
### 使用 Netty 替代傳統 NIO 程式設計
NIO 的設計思路雖然很好,但它的程式碼編寫比較麻煩,比如 Buffer 的使用和 Selector 的編寫等。並且在面對斷線重連、包丟失和粘包等複雜問題時手動處理的成本都很大,因此我們通常會使用 Netty 框架來替代傳統的 NIO。
### Netty 是什麼?
Netty 是一個非同步、事件驅動的用來做高效能、高可靠性的網路應用框架,使用它可以快速輕鬆地開發網路應用程式,極大的簡化了網路程式設計的複雜度。
Netty 主要優點有以下幾個:
1. 框架設計優雅,底層模型隨意切換適應不同的網路協議要求;
1. 提供很多標準的協議、安全、編碼解碼的支援;
1. 簡化了 NIO 使用中的諸多不便;
1. 社群非常活躍,很多開源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。
Netty 主要包含以下 3 個部分,如下圖所示:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610348157672-f5587d2d-bfa9-42c1-86fc-758e33b1edd5.png#align=left&display=inline&height=280&margin=%5Bobject%20Object%5D&name=image.png&originHeight=560&originWidth=920&size=182524&status=done&style=none&width=460)
這 3 個部分的功能介紹如下。
#### 1. Core 核心層
Core 核心層是 Netty 最精華的內容,它提供了底層網路通訊的通用抽象和實現,包括可擴充套件的事件模型、通用的通訊 API、支援零拷貝的 ByteBuf 等。
#### 2. Protocol Support 協議支援層
協議支援層基本上覆蓋了主流協議的編解碼實現,如 HTTP、SSL、Protobuf、壓縮、大檔案傳輸、WebSocket、文字、二進位制等主流協議,此外 Netty 還支援自定義應用層協議。Netty 豐富的協議支援降低了使用者的開發成本,基於 Netty 我們可以快速開發 HTTP、WebSocket 等服務。
#### 3. Transport Service 傳輸服務層
傳輸服務層提供了網路傳輸能力的定義和實現方法。它支援 Socket、HTTP 隧道、虛擬機器管道等傳輸方式。Netty 對 TCP、UDP 等資料傳輸做了抽象和封裝,使用者可以更聚焦在業務邏輯實現上,而不必關係底層資料傳輸的細節。
### Netty 使用
對 Netty 有了大概的認識之後,接下來我們用 Netty 來編寫一個基礎的通訊伺服器,它包含兩個端:伺服器端和客戶端,客戶端負責傳送訊息,伺服器端負責接收並列印訊息,具體的實現步驟如下。
#### 1.新增 Netty 框架
首先我們需要先新增 Netty 框架的支援,如果是 Maven 專案新增如下配置即可:
```xml
io.netty
netty-all
4.1.56.Final
```
##### Netty 版本說明
Netty 的 3.x 和 4.x 為主流的穩定版本,而最新的 5.x 已經是放棄的測試版了,因此推薦使用 Netty 4.x 的最新穩定版。
#### 2. 伺服器端實現程式碼
按照官方的推薦,這裡將伺服器端的程式碼分為以下 3 個部分:
- MyNettyServer:伺服器端的核心業務程式碼;
- ServerInitializer:伺服器端通道(Channel)初始化;
- ServerHandler:伺服器端接收到資訊之後的處理邏輯。
> PS:Channel 字面意思為“通道”,它是網路通訊的載體。Channel 提供了基本的 API 用於網路 I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己實現的 Channel 是以 JDK NIO Channel 為基礎的,相比較於 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時遮蔽了底層 Socket 的複雜性,賦予了 Channel 更加強大的功能,你在使用 Netty 時基本不需要再與 Java Socket 類直接打交道。
伺服器端的實現程式碼如下:
```java
// 定義伺服器的埠號
static final int PORT = 8007;
/**
* 伺服器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 建立一個執行緒組,用來負責接收客戶端連線
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 建立另一個執行緒組,用來負責 I/O 的讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 建立一個 Server 例項(可理解為 Netty 的入門類)
ServerBootstrap b = new ServerBootstrap();
// 將兩個執行緒池設定到 Server 例項
b.group(bossGroup, workerGroup)
// 設定 Netty 通道的型別為 NioServerSocket(非阻塞 I/O Socket 伺服器)
.channel(NioServerSocketChannel.class)
// 設定建立連線之後的執行器(ServerInitializer 是我建立的一個自定義類)
.childHandler(new ServerInitializer());
// 繫結埠並且進行同步
ChannelFuture future = b.bind(PORT).sync();
// 對關閉通道進行監聽
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 資源關閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服務端通道初始化
*/
static class ServerInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 伺服器端連線之後的執行器(自定義的類)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具體執行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 設定
ChannelPipeline pipeline = ch.pipeline();
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 伺服器端連線之後的執行器,接收到訊息之後的業務處理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 伺服器端接收到訊息之後的業務處理類
*/
static class ServerHandler extends SimpleChannelInboundHandler {
/**
* 讀取到客戶端的訊息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客戶端的訊息:" + request);
}
}
/**
* 資料讀取完畢
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
```
#### 3.客戶端實現程式碼
客戶端的程式碼實現也是分為以下 3 個部分:
- MyNettyClient:客戶端核心業務程式碼;
- ClientInitializer:客戶端通道初始化;
- ClientHandler:接收到訊息之後的處理邏輯。
客戶端的實現程式碼如下:
```java
/**
* 客戶端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 建立事件迴圈執行緒組(客戶端的執行緒組只有一個)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客戶端啟動物件
Bootstrap b = new Bootstrap();
// 設定啟動引數
b.group(group)
// 設定通道型別
.channel(NioSocketChannel.class)
// 設定啟動執行器(負責啟動事件的業務執行,ClientInitializer 為自定義的類)
.handler(new ClientInitializer());
// 連線伺服器端並同步通道
Channel ch = b.connect("127.0.0.1", 8007).sync().channel();
// 傳送訊息
ChannelFuture lastWriteFuture = null;
// 給伺服器端傳送 10 條訊息
for (int i = 0; i < 10; i++) {
// 傳送給伺服器訊息
lastWriteFuture = ch.writeAndFlush("Hi,Java.");
}
// 在關閉通道之前,同步重新整理所有的訊息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放資源
group.shutdownGracefully();
}
}
}
/**
* 客戶端通道初始化類
*/
static class ClientInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客戶端連線成功之後業務處理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客戶端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客戶端連線成功之後的業務處理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客戶端連線成功之後的業務處理
*/
static class ClientHandler extends SimpleChannelInboundHandler {
/**
* 讀取到伺服器端的訊息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到伺服器的訊息:" + msg);
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
```
從以上程式碼可以看出,我們程式碼實現的功能是,客戶端給伺服器端傳送 10 條訊息。
編寫完上述程式碼之後,我們就可以啟動伺服器端和客戶端了,啟動之後,它們的執行結果如下:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610079100392-b43a282f-c01a-43f9-a271-acd9054eb431.png#align=left&display=inline&height=93&margin=%5Bobject%20Object%5D&name=image.png&originHeight=186&originWidth=2138&size=33425&status=done&style=none&width=1069)
從上述結果中可以看出,雖然客戶端和伺服器端實現了通訊,但在 Netty 的使用中依然存在粘包的問題,伺服器端一次收到了 10 條訊息,而不是每次只收到一條訊息,因此接下來我們要解決掉 Netty 中的粘包問題。
# 三、解決 Netty 粘包問題
在 Netty 中,解決粘包問題的常用方案有以下 3 種:
1. 設定固定大小的訊息長度,如果長度不足則使用空字元彌補,它的缺點比較明顯,比較消耗網路流量,因此不建議使用;
1. 使用分隔符來確定訊息的邊界,從而避免粘包和半包問題的產生;
1. 將訊息分為訊息頭和訊息體,在頭部中儲存有當前整個訊息的長度,只有在讀取到足夠長度的訊息之後才算是讀到了一個完整的訊息。
接下來我們分別來看後兩種推薦的解決方案。
## 1.使用分隔符解決粘包問題
在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號作為訊息的結束符,從而解決粘包和半包的問題。
它的核心實現程式碼是在初始化通道(Channel)時,通過設定 DelimiterBasedFrameDecoder 來分隔訊息,需要在客戶端和伺服器端都進行設定,具體實現程式碼如下。
伺服器端核心實現程式碼如下:
```java
/**
* 服務端通道初始化
*/
static class ServerInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 伺服器端連線之後的執行器(自定義的類)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具體執行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 設定
ChannelPipeline pipeline = ch.pipeline();
// 19 行:設定結尾分隔符【核心程式碼】(引數1:為訊息的最大長度,可自定義;引數2:分隔符[此處以換行符為分隔符])
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 伺服器端連線之後的執行器,接收到訊息之後的業務處理
pipeline.addLast(SERVER_HANDLER);
}
}
```
核心程式碼為第 19 行,程式碼中已經備註了方法的含義,這裡就不再贅述。
客戶端的核心實現程式碼如下:
```java
/**
* 客戶端通道初始化類
*/
static class ClientInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客戶端連線成功之後業務處理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客戶端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 17 行:設定結尾分隔符【核心程式碼】(引數1:為訊息的最大長度,可自定義;引數2:分隔符[此處以換行符為分隔符])
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客戶端連線成功之後的業務處理
pipeline.addLast(CLIENT_HANDLER);
}
}
```
完整的伺服器端和客戶端的實現程式碼如下:
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyExample {
// 定義伺服器的埠號
static final int PORT = 8007;
/**
* 伺服器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 建立一個執行緒組,用來負責接收客戶端連線
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 建立另一個執行緒組,用來負責 I/O 的讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 建立一個 Server 例項(可理解為 Netty 的入門類)
ServerBootstrap b = new ServerBootstrap();
// 將兩個執行緒池設定到 Server 例項
b.group(bossGroup, workerGroup)
// 設定 Netty 通道的型別為 NioServerSocket(非阻塞 I/O Socket 伺服器)
.channel(NioServerSocketChannel.class)
// 設定建立連線之後的執行器(ServerInitializer 是我建立的一個自定義類)
.childHandler(new ServerInitializer());
// 繫結埠並且進行同步
ChannelFuture future = b.bind(PORT).sync();
// 對關閉通道進行監聽
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 資源關閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服務端通道初始化
*/
static class ServerInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 伺服器端連線之後的執行器(自定義的類)
private static final ServerHandler SERVER_HANDLER = new ServerHandler();
/**
* 初始化通道的具體執行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 設定
ChannelPipeline pipeline = ch.pipeline();
// 設定結尾分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 伺服器端連線之後的執行器,接收到訊息之後的業務處理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 伺服器端接收到訊息之後的業務處理類
*/
static class ServerHandler extends SimpleChannelInboundHandler {
/**
* 讀取到客戶端的訊息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客戶端的訊息:" + request);
}
}
/**
* 資料讀取完畢
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
/**
* 客戶端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 建立事件迴圈執行緒組(客戶端的執行緒組只有一個)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客戶端啟動物件
Bootstrap b = new Bootstrap();
// 設定啟動引數
b.group(group)
// 設定通道型別
.channel(NioSocketChannel.class)
// 設定啟動執行器(負責啟動事件的業務執行,ClientInitializer 為自定義的類)
.handler(new ClientInitializer());
// 連線伺服器端並同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 傳送訊息
ChannelFuture lastWriteFuture = null;
// 給伺服器端傳送 10 條訊息
for (int i = 0; i < 10; i++) {
// 傳送給伺服器訊息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在關閉通道之前,同步重新整理所有的訊息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放資源
group.shutdownGracefully();
}
}
}
/**
* 客戶端通道初始化類
*/
static class ClientInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客戶端連線成功之後業務處理
private static final ClientHandler CLIENT_HANDLER = new ClientHandler();
/**
* 初始化客戶端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 設定結尾分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客戶端連線成功之後的業務處理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客戶端連線成功之後的業務處理
*/
static class ClientHandler extends SimpleChannelInboundHandler {
/**
* 讀取到伺服器端的訊息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到伺服器的訊息:" + msg);
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
```
最終的執行結果如下圖所示:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610078880035-219d2a25-91b2-476d-b099-194afdb54170.png#align=left&display=inline&height=299&margin=%5Bobject%20Object%5D&name=image.png&originHeight=598&originWidth=554&size=78570&status=done&style=none&width=277)
從上述結果中可以看出,Netty 可以正常使用了,它已經不存在粘包和半包問題了。
## 2.封裝訊息解決粘包問題
此解決方案的核心是將訊息分為訊息頭 + 訊息體,在訊息頭中儲存訊息體的長度,從而確定一條訊息的邊界,這樣就避免了粘包和半包問題了,它的實現過程如下圖所示:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610438170937-7cdbeee0-719b-4ebc-8beb-631a5cf23cd2.png#align=left&display=inline&height=469&margin=%5Bobject%20Object%5D&name=image.png&originHeight=938&originWidth=1204&size=82419&status=done&style=none&width=602)
在 Netty 中可以通過 LengthFieldPrepender(編碼)和 LengthFieldBasedFrameDecoder(解碼)兩個類實現訊息的封裝。和上一個解決方案類似,我們需要分別在伺服器端和客戶端通過設定通道(Channel)來解決粘包問題。
伺服器端的核心程式碼如下:
```java
/**
* 服務端通道初始化
*/
static class ServerInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 伺服器端連線之後的執行器(自定義的類)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
/**
* 初始化通道的具體執行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 設定
ChannelPipeline pipeline = ch.pipeline();
// 18 行:訊息解碼:讀取訊息頭和訊息體
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 20 行:訊息編碼:將訊息封裝為訊息頭和訊息體,在訊息前新增訊息體的長度
pipeline.addLast(new LengthFieldPrepender(4));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 伺服器端連線之後的執行器,接收到訊息之後的業務處理
pipeline.addLast(SERVER_HANDLER);
}
}
```
其中核心程式碼是 18 行和 20 行,通過 LengthFieldPrepender 實現編碼(將訊息打包成訊息頭 + 訊息體),通過 LengthFieldBasedFrameDecoder 實現解碼(從封裝的訊息中取出訊息的內容)。
LengthFieldBasedFrameDecoder 的引數說明如下:
- 引數 1:maxFrameLength - 傳送的資料包最大長度;
- 引數 2:lengthFieldOffset - 長度域偏移量,指的是長度域位於整個資料包位元組陣列中的下標;
- 引數 3:lengthFieldLength - 長度域自己的位元組數長度;
- 引數 4:lengthAdjustment – 長度域的偏移量矯正。 如果長度域的值,除了包含有效資料域的長度外,還包含了其他域(如長度域自身)長度,那麼,就需要進行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長;
- 引數 5:initialBytesToStrip – 丟棄的起始位元組數。丟棄處於有效資料前面的位元組數量。比如前面有 4 個節點的長度域,則它的值為 4。
LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:資料包最大長度為 1024,長度域佔首部的四個位元組,在讀資料的時候去掉首部四個位元組(即長度域)。
客戶端的核心實現程式碼如下:
```java
/**
* 客戶端通道初始化類
*/
static class ClientInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客戶端連線成功之後業務處理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
/**
* 初始化客戶端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 訊息解碼:讀取訊息頭和訊息體
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 訊息編碼:將訊息封裝為訊息頭和訊息體,在響應位元組資料前面新增訊息體長度
pipeline.addLast(new LengthFieldPrepender(4));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客戶端連線成功之後的業務處理
pipeline.addLast(CLIENT_HANDLER);
}
}
```
完整的伺服器端和客戶端的實現程式碼如下:
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 通過封裝 Netty 來解決粘包
*/
public class NettyExample {
// 定義伺服器的埠號
static final int PORT = 8007;
/**
* 伺服器端
*/
static class MyNettyServer {
public static void main(String[] args) {
// 建立一個執行緒組,用來負責接收客戶端連線
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 建立另一個執行緒組,用來負責 I/O 的讀寫
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 建立一個 Server 例項(可理解為 Netty 的入門類)
ServerBootstrap b = new ServerBootstrap();
// 將兩個執行緒池設定到 Server 例項
b.group(bossGroup, workerGroup)
// 設定 Netty 通道的型別為 NioServerSocket(非阻塞 I/O Socket 伺服器)
.channel(NioServerSocketChannel.class)
// 設定建立連線之後的執行器(ServerInitializer 是我建立的一個自定義類)
.childHandler(new NettyExample.ServerInitializer());
// 繫結埠並且進行同步
ChannelFuture future = b.bind(PORT).sync();
// 對關閉通道進行監聽
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 資源關閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服務端通道初始化
*/
static class ServerInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 伺服器端連線之後的執行器(自定義的類)
private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();
/**
* 初始化通道的具體執行方法
*/
@Override
public void initChannel(SocketChannel ch) {
// 通道 Channel 設定
ChannelPipeline pipeline = ch.pipeline();
// 訊息解碼:讀取訊息頭和訊息體
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 訊息編碼:將訊息封裝為訊息頭和訊息體,在響應位元組資料前面新增訊息體長度
pipeline.addLast(new LengthFieldPrepender(4));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 伺服器端連線之後的執行器,接收到訊息之後的業務處理
pipeline.addLast(SERVER_HANDLER);
}
}
/**
* 伺服器端接收到訊息之後的業務處理類
*/
static class ServerHandler extends SimpleChannelInboundHandler {
/**
* 讀取到客戶端的訊息
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String request) {
if (!request.isEmpty()) {
System.out.println("接到客戶端的訊息:" + request);
}
}
/**
* 資料讀取完畢
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
/**
* 客戶端
*/
static class MyNettyClient {
public static void main(String[] args) {
// 建立事件迴圈執行緒組(客戶端的執行緒組只有一個)
EventLoopGroup group = new NioEventLoopGroup();
try {
// Netty 客戶端啟動物件
Bootstrap b = new Bootstrap();
// 設定啟動引數
b.group(group)
// 設定通道型別
.channel(NioSocketChannel.class)
// 設定啟動執行器(負責啟動事件的業務執行,ClientInitializer 為自定義的類)
.handler(new NettyExample.ClientInitializer());
// 連線伺服器端並同步通道
Channel ch = b.connect("127.0.0.1", PORT).sync().channel();
// 傳送訊息
ChannelFuture lastWriteFuture = null;
// 給伺服器端傳送 10 條訊息
for (int i = 0; i < 10; i++) {
// 傳送給伺服器訊息
lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");
}
// 在關閉通道之前,同步重新整理所有的訊息
if (lastWriteFuture != null) {
lastWriteFuture.sync();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 釋放資源
group.shutdownGracefully();
}
}
}
/**
* 客戶端通道初始化類
*/
static class ClientInitializer extends ChannelInitializer {
// 字串編碼器和解碼器
private static final StringDecoder DECODER = new StringDecoder();
private static final StringEncoder ENCODER = new StringEncoder();
// 客戶端連線成功之後業務處理
private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();
/**
* 初始化客戶端通道
*/
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 訊息解碼:讀取訊息頭和訊息體
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
// 訊息編碼:將訊息封裝為訊息頭和訊息體,在響應位元組資料前面新增訊息體長度
pipeline.addLast(new LengthFieldPrepender(4));
// 設定(字串)編碼器和解碼器
pipeline.addLast(DECODER);
pipeline.addLast(ENCODER);
// 客戶端連線成功之後的業務處理
pipeline.addLast(CLIENT_HANDLER);
}
}
/**
* 客戶端連線成功之後的業務處理
*/
static class ClientHandler extends SimpleChannelInboundHandler {
/**
* 讀取到伺服器端的訊息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
System.err.println("接到伺服器的訊息:" + msg);
}
/**
* 異常處理,列印異常並關閉通道
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
}
```
以上程式的執行結果為:
![image.png](https://cdn.nlark.com/yuque/0/2021/png/92791/1610078880035-219d2a25-91b2-476d-b099-194afdb54170.png#align=left&display=inline&height=299&margin=%5Bobject%20Object%5D&name=image.png&originHeight=598&originWidth=554&size=78570&status=done&style=none&width=277)
# 四、總結
本文提供了傳統 Socket 通訊將訊息分為訊息頭和訊息體的具體程式碼實現,然而傳統的 Socket 在效能和複用性上表現一般,為了更加高效的實現通訊,我們可以使用 Netty 框架來替代傳統的 Socket 和 NIO 程式設計,但 Netty 在使用時依然會出現粘包的問題,於是我們提供了兩種最常見的解決方案:通過分隔符或將封裝訊息的解決方案,其中最後一種解決方案的使用更加廣泛。
#### 參考 & 鳴謝
《Netty 核心原理剖析與 RPC 實踐》