1. 程式人生 > >Netty - 粘包分包以及自定義資料包協議

Netty - 粘包分包以及自定義資料包協議

1.粘包和分包

這裡簡單介紹一下粘包和分包的概念,比如我們需要傳遞這串資料give me a coffee give me a tea,最後接收到的資料可能是give me a coffeegive me a tea(粘包現象),也可能是give me
a coffeegive me a tea(分包現象),造成這些現象的主要原因就是一點:沒有一個穩定的資料結構。
我們可以通過一些簡單的方法去避免這些問題,比如給資料新增分隔符:
give me a coffee|give me a tea|
但是當資料中本身存在這些分隔符的時候,也會造成錯誤地分割,這裡我常用的是利用資料長度+資料的方式去避免這些問題的(16give me a coffee13give me a tea),有些同學可能會說資料中也有數字不是會造成一樣的結果嗎?若我們已經讀取到後面需要讀取的資料的位數,就不會再去考慮後面固定長度資料內部內容,其實已經避免了這個問題的發生。

Server.java
package com.pk.server;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder; import org.jboss.netty.handler.codec.string.StringEncoder; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 粘包分包 * @author hzk * @date 2018/10/22 */ public class Server
{ public static void main(String[] args){ //服務類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //boos執行緒監聽埠 worker執行緒負責資料讀寫 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService worker = Executors.newCachedThreadPool(); //設定NioSocket工廠 serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,worker)); //設定管道工廠 serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline channelPipeline = Channels.pipeline(); //channelPipeline.addLast("decoder",new MyDecoderHandler()); channelPipeline.addLast("decoder",new StringDecoder()); channelPipeline.addLast("encoder",new StringEncoder()); channelPipeline.addLast("handlerOne",new MyHandlerOne()); return channelPipeline; } }); serverBootstrap.bind(new InetSocketAddress(8888)); System.out.println("Server Start..."); } }
MyHandlerOne.java
package com.pk.server;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;

/**
 * Sticky/split packet demo handler: prints every received message together
 * with a running sequence number.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class MyHandlerOne extends SimpleChannelHandler {

    /** Sequence number printed with the next message; the first message gets 1. */
    private int count = 1;

    /**
     * Prints the message carried by the event followed by {@code ":"} and the
     * current sequence number, then advances the sequence number.
     *
     * @param ctx handler context (unused)
     * @param e   event whose message is printed
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Object payload = e.getMessage();
        final String line = payload + ":" + count;
        count += 1;
        System.out.println(line);
    }
}

Client.java
package com.pk.client;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Test client for the sticky/split packet demo.
 *
 * <p>Connects to 127.0.0.1:8888 and writes the same length-prefixed frame
 * 999 times back to back over a plain blocking socket.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class Client {

    /**
     * Builds one frame (4-byte length followed by the payload) and sends it
     * repeatedly to the server.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if connecting or writing to the socket fails
     */
    public static void main(String[] args) throws IOException {
        // Explicit charset so the bytes on the wire do not depend on the platform default.
        byte[] payload = "cliengggg".getBytes(StandardCharsets.UTF_8);

        // The length prefix is an int, so the frame needs 4 extra bytes.
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length);
        frame.put(payload);
        byte[] array = frame.array();

        // try-with-resources closes the socket even when a write fails.
        try (Socket socket = new Socket("127.0.0.1", 8888);
             OutputStream out = socket.getOutputStream()) {
            for (int i = 1; i < 1000; i++) {
                out.write(array);
            }
            out.flush();
        }
    }
}

Server執行結果:
Server Start...
   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	clieng:1
ggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	 	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	:2
cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   		cli	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengg:3
gg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   		cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg:4
   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg   	cliengggg:5
   	cliengggg   	cliengggg   	cliengggg:6

這裡可以很明顯看出來粘包分包產生的結果, FrameDecoder 這個decoder可以幫助解決粘包分包問題。

MyDecoderHandler.java
package com.pk.server;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

/**
 * Frame decoder for the "length + data" format used by the demo client:
 * a 4-byte int length followed by that many payload bytes.
 *
 * <p>Each complete frame is emitted as a {@link String}. Incomplete frames
 * are left in the buffer until more bytes arrive.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class MyDecoderHandler extends FrameDecoder {

    /** Size in bytes of the int length prefix. */
    private static final int LENGTH_FIELD_SIZE = 4;

    /** Largest declared payload length that is accepted. */
    private static final int MAX_FRAME_LENGTH = 2048;

    /** Charset used to turn the payload bytes into a String. */
    private static final java.nio.charset.Charset PAYLOAD_CHARSET =
            java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Tries to extract one frame from the buffered bytes.
     *
     * @param channelHandlerContext handler context (unused)
     * @param channel               channel the bytes were read from (unused)
     * @param channelBuffer         buffer holding the bytes received so far
     * @return the payload as a String, or {@code null} when no complete frame
     *         is available or the buffered bytes were discarded
     * @throws Exception declared by the overridden method
     */
    @Override
    protected Object decode(ChannelHandlerContext channelHandlerContext, Channel channel, ChannelBuffer channelBuffer) throws Exception {
        // Wait until the whole length prefix is available (a zero-length frame is exactly 4 bytes).
        if (channelBuffer.readableBytes() < LENGTH_FIELD_SIZE) {
            return null;
        }

        // Mark so the prefix can be un-read when the payload is incomplete.
        channelBuffer.markReaderIndex();
        int length = channelBuffer.readInt();

        // Validate the DECLARED length, not the amount of buffered data: many small
        // valid frames may legitimately pile up beyond the limit in one read.
        if (length < 0 || length > MAX_FRAME_LENGTH) {
            // Bogus or oversized length: drop what is buffered and stop decoding.
            // NOTE(review): the stream cannot be re-synchronised after this;
            // closing the channel may be the better policy - confirm with callers.
            channelBuffer.skipBytes(channelBuffer.readableBytes());
            return null;
        }

        if (channelBuffer.readableBytes() < length) {
            // Incomplete frame: rewind to the prefix and wait for more data.
            channelBuffer.resetReaderIndex();
            return null;
        }

        // Read the payload and pass it on.
        byte[] bytes = new byte[length];
        channelBuffer.readBytes(bytes);
        return new String(bytes, PAYLOAD_CHARSET);
    }
}

Server執行結果:
Server Start...
cliengggg:1
cliengggg:2
cliengggg:3
cliengggg:4
cliengggg:5
cliengggg:6
cliengggg:7
cliengggg:8
cliengggg:9
cliengggg:10
cliengggg:11
cliengggg:12
cliengggg:13
cliengggg:14

我們將Server中配置的decoder換成我們自定義的解碼器,可以看出來解決了我們的分包粘包問題。

2.自定義資料包協議

在自定義資料包協議之前,我們需要先了解幾個點。
Q:訊息如何在管道中流轉,當前的一個handler如何往下面的一個handler傳遞一個物件?
A:一個管道中會有多個handler,handler往下傳遞物件的方法是sendUpstream(event)。
Q:為什麼FrameDecoder return的物件就是往下傳遞的物件?
A:其實就是呼叫了sendUpstream方法。
Q:buffer裡面資料未被讀取完怎麼辦?為什麼return null就可以快取buffer?
A:都是由於cumulation快取。FrameDecoder裡面的cumulation其實就是一個快取的buffer物件。
Q:socket攻擊是什麼?
A:把長度定義的很大,這種資料包,通常被稱為socket攻擊,位元組流式攻擊。

我們這裡給出一個示例去確定handler是否通過sendUpstream傳遞物件的。

Server.java
package com.pipeline;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Pipeline demo server.
 *
 * <p>Binds to port 8888 with a pipeline made of {@code MyHandlerOne}
 * followed by {@code MyHandlerTwo}.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class Server {

    /**
     * Configures the bootstrap, binds it to port 8888 and prints a start-up message.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final ServerBootstrap bootstrap = new ServerBootstrap();

        // boss threads listen on the port; worker threads handle data reads and writes
        final ExecutorService bossPool = Executors.newCachedThreadPool();
        final ExecutorService workerPool = Executors.newCachedThreadPool();
        bootstrap.setFactory(new NioServerSocketChannelFactory(bossPool, workerPool));

        // Every pipeline created by this factory gets the two demo handlers, in this order.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                final ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("handlerOne", new MyHandlerOne());
                pipeline.addLast("handlerTwo", new MyHandlerTwo());
                return pipeline;
            }
        });

        final InetSocketAddress listenAddress = new InetSocketAddress(8888);
        bootstrap.bind(listenAddress);
        System.out.println("Server Start...");
    }
}

MyHandlerOne.java
package com.pipeline;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;

/**
 * First handler of the pipeline demo: prints the received bytes as text and
 * then sends two new upstream message events carrying {@code "abc"} and
 * {@code "efg"}.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class MyHandlerOne extends SimpleChannelHandler {

    /**
     * Prints the incoming buffer as text and emits two upstream messages.
     *
     * @param ctx handler context used to send the new events upstream
     * @param e   event whose message is cast to {@link ChannelBuffer}
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage();
        // Decode only the readable bytes. array() exposes the whole backing array
        // and throws for buffers that have no accessible backing array.
        String msg = channelBuffer.toString(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("Handler One:" + msg);

        // Send two new message events upstream
        ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "abc", e.getRemoteAddress()));
        ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(), "efg", e.getRemoteAddress()));
    }
}

MyHandlerTwo.java
package com.pipeline;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;

/**
 * Second handler of the pipeline demo: prints each received message, which
 * it casts to {@link String}.
 *
 * @author hzk
 * @date 2018/10/22
 */
public class MyHandlerTwo extends SimpleChannelHandler {

    /**
     * Casts the event's message to a String and prints it with a
     * {@code "Handler Two:"} prefix.
     *
     * @param ctx handler context (unused)
     * @param e   event carrying the message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Object payload = e.getMessage();
        final String text = (String) payload;
        System.out.println("Handler Two:" + text);
    }
}

Server執行結果:
Server Start...
Handler One:clientgogogo
Handler Two:abc
Handler Two:efg

通過這個示例我們可以確定handler是通過方法sendUpstream往下傳遞的,那麼瞭解完了這些以後,我們自己定義了一個資料包協議。
包頭(int-4)+模組號(short-2)+命令號(short-2)+資料長度(int-4)+資料
我們自定義採取的資料結構由這幾部分組成,這裡貼出專案結構以及程式碼。

專案結構

在這裡插入圖片描述

Client.java
package com.ithzk.client;

import com.ithzk.coder.RequestEncoder;
import com.ithzk.coder.ResponseDecoder;
import com.ithzk.constans.Constants;
import com.ithzk.model.Request;
import com.ithzk.module.customspass.request.FightRequest;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import java.net.InetSocketAddress;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Netty client for the custom packet protocol demo.
 *
 * <p>Connects to the configured server address, then repeatedly reads two
 * lines from standard input (a fight id and a count) and sends them to the
 * server wrapped in a {@link Request}. The client stops when standard input
 * reaches end of stream.
 *
 * @author hzk
 * @date 2018/10/8
 */
public class Client {

    /**
     * Starts the client, runs the interactive send loop and releases the
     * channel and bootstrap resources on exit.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    public static void main(String[] args) throws InterruptedException {
        // Service (bootstrap) object
        ClientBootstrap clientBootstrap = new ClientBootstrap();

        // boss and worker executors handed to the socket factory
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService worker = Executors.newCachedThreadPool();

        // Set the socket factory (previously the two executors were created but never used)
        clientBootstrap.setFactory(new NioClientSocketChannelFactory(boss, worker));

        // Set the pipeline factory
        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                //RequestEncoder -> ResponseDecoder ->ClientHandler
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new ResponseDecoder());
                pipeline.addLast("encoder", new RequestEncoder());
                pipeline.addLast("clientHandler", new ClientHandler());
                return pipeline;
            }
        });

        try {
            // Connect to the server
            ChannelFuture channelFuture = clientBootstrap.connect(new InetSocketAddress(Constants.AbstractNettyConfig.ADDRESS, Constants.AbstractNettyConfig.PORT));
            Channel channel = channelFuture.sync().getChannel();

            System.out.println("Client Start...");

            try {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    System.out.println("Please input:");
                    // Stop cleanly at end of input instead of failing in nextLine().
                    if (!scanner.hasNextLine()) {
                        break;
                    }
                    String fightIdLine = scanner.nextLine();
                    if (!scanner.hasNextLine()) {
                        break;
                    }
                    String countLine = scanner.nextLine();

                    int fightId;
                    int count;
                    try {
                        fightId = Integer.parseInt(fightIdLine.trim());
                        count = Integer.parseInt(countLine.trim());
                    } catch (NumberFormatException ex) {
                        // A typo must not kill the client; ask again.
                        System.out.println("Invalid number, please try again.");
                        continue;
                    }

                    FightRequest fightRequest = new FightRequest(fightId, count);
                    Request request = new Request(Constants.AbstractModule.ONE, Constants.AbstractCmd.ONE, fightRequest.getBytes());
                    // Send the request
                    channel.write(request);
                }
            } finally {
                channel.close().awaitUninterruptibly();
            }
        } finally {
            // Shuts down the factory's resources so the JVM can exit.
            clientBootstrap.releaseExternalResources();
        }
    }
}

ClientHandler.java
package com.ithzk.client;

import com.ithzk.constans.Constants;
import com.ithzk.model.Response;
import com.ithzk.module.customspass.response.FightResponse;
import org.jboss.netty.channel.*;

/**
 * 訊息接收處理類
 * @author hzk
 * @date 2018/10/8
 */
public class ClientHandler extends SimpleChannelHandler{

    /**
     * Handles a message received from the server.
     *
     * <p>The message is cast to {@link Response}. For module ONE with command
     * ONE the data is read into a {@link FightResponse} and printed. For
     * module TWO a fixed "CMD:" line is printed. Anything else is ignored.
     *
     * @param ctx handler context (unused)
     * @param e   event carrying the response
     * @throws Exception declared by the overridden method
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Response reply = (Response) e.getMessage();

        if (Constants.AbstractModule.ONE == reply.getModule()) {
            // Within module ONE only command ONE is handled.
            if (Constants.AbstractCmd.ONE != reply.getCmd()) {
                return;
            }
            final FightResponse fight = new FightResponse();
            fight.readFromBytes(reply.getData());
            System.out.println("ClientHandler->messageReceived:" + fight);
            return;
        }

        if (Constants.AbstractModule.TWO == reply.getModule()) {
            System.out.println("CMD:" + Constants.AbstractCmd.TWO);
        }
    }

    /**
     * 捕獲異常
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("exceptionCaught");