
09 Netty: Serialization Algorithm Extension and Parameter Configuration

1 Serialization Algorithms

Full code

Requirements for the serialization algorithm: it must implement the conversion object => byte array => object.

  • When serializing, the Java object must be turned into the data to be transmitted (byte[], JSON, etc.; everything ultimately becomes a byte[])
  • When deserializing, the incoming body data must be restored to a Java object for further processing

How to make the serialization algorithm extensible

Define a Serializer interface that specifies the serialize and deserialize methods; inside that interface, an enum can implement it, exposing the different serialization algorithms to callers.

Requirement: the implementation must support the serialization algorithms below and remain open to further extension.

Serialization implementations:
  • the JDK's built-in serialization
  • JSON (via Gson)

  • Testing shows that the byte array produced by JSON serialization is noticeably shorter than the one produced by the default JDK serialization (see the round-trip comparison after the interface code below)

Interface implementation

package extension.rpcFramework.protocol;

import com.google.gson.*;

import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

public interface Serializer {
    // Deserialization: byte array => object
    /*
        Some deserialization algorithms must know the class type in advance.
        JDK deserialization does not, because the type information is embedded
        in the byte array itself.
        Class<T> is the generic form of the Class object;
        <T> tells the compiler that this is a generic method.
     */
    <T> T deserialize(Class<T> clazz, byte[] bytes);
    // Serialization: object => byte array
    <T> byte[] serialize(T object);

    int ordinal();

    // enum constants implementing the Serializer interface
    enum Algorithm implements Serializer{
        Java{
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                // try-with-resources closes the stream; JDK deserialization needs no class info up front
                try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    return clazz.cast(ois.readObject());
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("deserialization error", e);
                }
            }

            @Override
            public <T> byte[] serialize(T object) {
                // ObjectOutputStream turns an object into bytes; writing the object out is serialization
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(object);
                    oos.flush();   // make sure buffered data reaches the underlying byte array
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("serialization error", e);
                }
            }
        },
        Json{
            @Override
            public <T> T deserialize(Class<T> clazz, byte[] bytes) {
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = new String(bytes, StandardCharsets.UTF_8);  // byte array => JSON string
                return gson.fromJson(json, clazz);
            }

            @Override
            public <T> byte[] serialize(T object) {
                Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new Serializer.ClassCodec()).create();
                String json = gson.toJson(object);            // object => JSON string
                return json.getBytes(StandardCharsets.UTF_8); // note: serialize and deserialize must use the same charset
            }
        }
    }
    // Teach Gson to serialize/deserialize java.lang.Class values; FastJson does not have this problem
    class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
        @Override
        public Class<?> deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
            String str = jsonElement.getAsString();   // class name string
            try {
                return Class.forName(str);
            } catch (ClassNotFoundException e) {
                throw new JsonParseException(e);
            }
        }

        @Override
        public JsonElement serialize(Class<?> aClass, Type type, JsonSerializationContext jsonSerializationContext) {
            return new JsonPrimitive(aClass.getName());
        }
    }
}
  • The approach above supports both serialization algorithms through a single enum
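
A quick round-trip check of both algorithms, as a minimal sketch assuming a Serializable test class of our own (the Ping type below is hypothetical; exact byte counts depend on the object, but the JSON output is typically much shorter):

package extension.rpcFramework.protocol;

import java.io.Serializable;

public class SerializerCompareTest {
    // hypothetical test type; any Serializable class will do
    static class Ping implements Serializable {
        int sequenceId = 1;
        String payload = "hello";
    }

    public static void main(String[] args) {
        Ping ping = new Ping();
        for (Serializer.Algorithm algorithm : Serializer.Algorithm.values()) {
            byte[] bytes = algorithm.serialize(ping);
            Ping back = algorithm.deserialize(Ping.class, bytes);
            // JDK serialization emits class metadata, so its output is much larger than the JSON form
            System.out.println(algorithm + ": " + bytes.length + " bytes, payload = " + back.payload);
        }
    }
}

The sharable codec below writes the custom protocol header and delegates the message body to the configured Serializer: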
package extension.rpcFramework.protocol;

import extension.rpcFramework.config.Config;
import extension.rpcFramework.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;

/*
    The handler in front of this one must be a LengthFieldBasedFrameDecoder
 */
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    static byte[] magicNum = {'l','u','c','k'};
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) {
        try{
            ByteBuf out = ctx.alloc().buffer();
            // 1-byte protocol version
            byte version = 1;
            // 1-byte serialization type: 0 = JDK, 1 = JSON, chosen via the config class
            Serializer algorithm = Config.getSerializerAlgorithm();
            int serialWay = algorithm.ordinal();  // the enum ordinal goes into the protocol header

            // total header bytes = 16 (pad if the total is not a power of 2)
            out.writeBytes(magicNum);              // 4-byte magic number
            out.writeByte(version);                // 1-byte protocol version
            out.writeByte(serialWay);              // 1-byte serialization type: 0 = JDK, 1 = JSON
            out.writeByte(msg.getMessageType());   // 1-byte message type
            out.writeInt(msg.getSequenceId());     // 4-byte sequence id

            byte[] content = algorithm.serialize(msg);  // serialize the message body
            out.writeInt(content.length);              // length of the serialized body
            out.writeByte(0xff);                       // padding byte, bringing the header to 16 (a power of 2)
            out.writeBytes(content);                   // the serialized body itself
            outList.add(out);
        } catch (Exception e) {
            log.error("encode error", e);
        }

    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out){
        try{
            int magicNum = in.readInt();
            byte version = in.readByte();
            // serialType and messageType determine, during decoding, the serialization algorithm and the target class
            byte serialType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            int length = in.readInt();
            byte padding = in.readByte();
            byte[] arr = new byte[length];
            in.readBytes(arr,0,length);
            Serializer algorithm = Serializer.Algorithm.values()[serialType];                  // pick the serialization algorithm
            Message message = algorithm.deserialize(Message.getMessageClass(messageType),arr);
            out.add(message);
        } catch (Exception e) {
            log.error("decode error", e);
        }

    }
}

  • During encoding, the serialization algorithm is chosen via the config class: Serializer algorithm = Config.getSerializerAlgorithm()
  • During decoding, it is chosen from the serialization-type field of the custom protocol
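
The Config class referenced above is not shown in this post. A minimal sketch of what it might look like, assuming an application.properties file on the classpath with a serializer.algorithm key (the file name and key are assumptions, not the original code):

package extension.rpcFramework.config;

import extension.rpcFramework.protocol.Serializer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public abstract class Config {
    static final Properties properties = new Properties();

    static {
        // load the classpath properties file once; an NPE here means the file is missing
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        // fall back to JDK serialization when the key is absent
        return value == null ? Serializer.Algorithm.Java : Serializer.Algorithm.valueOf(value);
    }
}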

2 Key Parameters for Network Connections

Understanding SocketChannel and ServerSocketChannel in Java: the server must first create a ServerSocketChannel and listen on a port; only then can a client ask the server to establish a SocketChannel. When the server accepts a client's request, it creates a SocketChannel for that client, and the two sides communicate through it. SocketChannel has multiple implementations; with NIO-based networking, Netty typically uses NioSocketChannel and NioServerSocketChannel.

Role of ServerSocketChannel: it listens for new TCP connections and creates SocketChannels; it cannot transfer data itself.
| Parameter              | Applies to                         | Notes                                                                              |
|------------------------|------------------------------------|------------------------------------------------------------------------------------|
| CONNECT_TIMEOUT_MILLIS | SocketChannel, ServerSocketChannel | timeout for establishing a connection                                              |
| SO_BACKLOG             | ServerSocketChannel                | size of the accept queue (see 2-2)                                                 |
| TCP_NODELAY            | SocketChannel                      | enables/disables Nagle's algorithm; Nagle is on by default                         |
| SO_SNDBUF              | SocketChannel                      | TCP send buffer size                                                               |
| SO_RCVBUF              | SocketChannel, ServerSocketChannel | TCP receive buffer size                                                            |
| ALLOCATOR              | SocketChannel                      | allocator for buffers the user requests explicitly via ctx.alloc()                 |
| RCVBUF_ALLOCATOR       | SocketChannel                      | allocator for inbound-data buffers; sizes adapt automatically; uses direct memory  |
  • SO_SNDBUF and SO_RCVBUF usually need no tuning; modern operating systems auto-tune transport-layer buffer sizes
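
For reference, a minimal sketch of where these options go. option() configures the ServerSocketChannel, childOption() each accepted SocketChannel; the ALLOCATOR and RCVBUF_ALLOCATOR values shown are just Netty's defaults, passed explicitly only for illustration:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class OptionDemoServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                // option() targets the ServerSocketChannel
                .option(ChannelOption.SO_BACKLOG, 128)
                // childOption() targets each accepted SocketChannel
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
                .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator())
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) { }
                })
                .bind(8080);
    }
}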

2-1 Connection Timeout

How to set it

Requirement: configure a timeout for the client establishing a channel connection with the server.

package extension.chatRoom.client;
import extension.chatRoom.protocol.ProtocolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestConnectionTimeOut {
    /*
       Client-side timeout configuration:
       1) configure the SocketChannel connect timeout via bootstrap.option()

       Server-side timeout configuration:
       1) new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,100)
       2) new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS,100)
       The server can set the timeout via option and childOption; they configure different
       channels - option the ServerSocketChannel, childOption the SocketChannel
     */
    public static void main(String[] args) {
        NioEventLoopGroup g = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            // set the connect timeout to 300 ms
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,300);
            bootstrap.group(g);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(LOGGING_HANDLER);
                }
            });
            Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
            channel.closeFuture().sync();
        }catch (Exception e){
            log.debug("client error",e);
        }finally {
            g.shutdownGracefully();
        }
    }
}

Without starting a server, run the code above. With the timeout set to 300 ms, the log output is:

16:09:38 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xfa647f23] REGISTERED
16:09:38 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xfa647f23] CONNECT: localhost/127.0.0.1:8080
16:09:38 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xfa647f23] CLOSE
16:09:38 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0xfa647f23] UNREGISTERED
16:09:38 [DEBUG] [main] e.c.c.TestConnectionTimeOut - client error
io.netty.channel.ConnectTimeoutException: connection timed out: localhost/127.0.0.1:8080
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:263)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

With the timeout set to 5 s:

16:11:05 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x99d830bc] REGISTERED
16:11:05 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x99d830bc] CONNECT: localhost/127.0.0.1:8080
16:11:07 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x99d830bc] CLOSE
16:11:07 [DEBUG] [nioEventLoopGroup-2-1] i.n.h.l.LoggingHandler - [id: 0x99d830bc] UNREGISTERED
16:11:07 [DEBUG] [main] e.c.c.TestConnectionTimeOut - client error
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Summary: what the code sets is Netty's own connect timeout. If it is set too long, the lower layer throws its own connection exception first when the connection cannot be established (here, Connection refused).

Source of the connect-timeout exception

Background: after the client's main thread calls connect, an NIO thread is launched to establish the connection with the server. The main thread can then either:

  • 1) wait for the connection result synchronously with sync()
  • 2) not wait, and instead have the NIO thread invoke a callback supplied by the main thread once the result is known (a minimal sketch follows this list)
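
A minimal sketch of option 2, reusing bootstrap and log from the TestConnectionTimeOut class above; the NIO thread invokes the listener once the result is known:

ChannelFuture future = bootstrap.connect("localhost", 8080);
future.addListener((ChannelFutureListener) f -> {
    if (f.isSuccess()) {
        log.debug("connected: {}", f.channel());
    } else {
        // on timeout, f.cause() is the ConnectTimeoutException produced by Netty's scheduled task
        log.debug("connect failed", f.cause());
    }
});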

Here we take case 1 and trace how the main thread obtains the NIO thread's connection-timeout result:

step1: the client executes the connect method

ChannelFuture cfu =  bootstrap.connect("localhost", 8080).sync();

step2: the connect-timeout exception is thrown; look at the block at AbstractNioChannel.java:263

io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:263)
	at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
	at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;
                    // Schedule connect timeout.
                    // read the configured connect timeout
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        // submit a scheduled task to the event loop, delayed by exactly the configured timeout
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

The source above tells us the following:

  • with a timeout configured, connect-timeout detection works by submitting a scheduled task to the EventLoop: eventLoop().schedule(new Runnable() ...)
  • when the scheduled task fires, the timeout exception is handed between threads through the Promise: connectPromise.tryFailure(cause) marks the promise as failed and notifies all listeners of the cause
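
The same hand-off can be reproduced in isolation. A minimal sketch using Netty's DefaultPromise: one thread fails the promise after a delay, and the waiting thread observes the cause, just as the main thread does for a connect timeout:

import io.netty.channel.ConnectTimeoutException;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.TimeUnit;

public class PromiseTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        Promise<Void> promise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);

        // stand-in for the scheduled timeout task in AbstractNioChannel
        GlobalEventExecutor.INSTANCE.schedule(() -> {
            promise.tryFailure(new ConnectTimeoutException("connection timed out"));
        }, 300, TimeUnit.MILLISECONDS);

        promise.await();                      // stand-in for sync() in the main thread
        System.out.println(promise.cause());  // the ConnectTimeoutException set by tryFailure
    }
}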

2-2 The Meaning of the backlog Parameter

In Netty, backlog is a parameter set when the server starts listening; it sets the size of the accept (full-connection) queue.

backlog in Linux

Background

  1. First handshake: the client sends SYN to the server and moves to SYN_SENT; the server receives it, moves to SYN_RCVD, and puts the request into the sync queue (half-open connection queue)
  2. Second handshake: the server replies SYN + ACK; the client receives it, moves to ESTABLISHED, and sends ACK to the server
  3. Third handshake: the server receives the ACK, moves to ESTABLISHED, and moves the request from the sync queue into the accept queue (full connection queue)

Every connection request passes through these two queues in turn; what distinguishes the requests in them is simply which stage of the three-way handshake they have reached, i.e. their state. In principle a single queue could hold requests in both states, but Linux (since 2.2) uses two queues.

Setting the sizes in Linux: the backlog values below control the size of each queue

| Queue                        | Linux setting                                                    | Netty setting            |
|------------------------------|------------------------------------------------------------------|--------------------------|
| sync queue (half-open queue) | /proc/sys/net/ipv4/tcp_max_syn_backlog                           | -                        |
| accept queue (full queue)    | min(backlog, somaxconn), the smaller of backlog and somaxconn    | ChannelOption.SO_BACKLOG |

  • somaxconn is a Linux kernel parameter, default 128, configurable via /proc/sys/net/core/somaxconn; backlog is the backlog argument of listen(int sockfd, int backlog)
  • If the accept queue is full, the server sends a connection-refused error back to the client

Netty code that sets the accept queue size

package extension.chatRoom.client;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class TestBackLogServer {
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class);
            // set the accept queue size of Netty's ServerSocketChannel to 2
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 2);
            serverBootstrap.group(boss, worker);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {

                }
            });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (Exception e) {
            log.error("server error", e);
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
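
To actually observe the limit, the server must be prevented from accepting (for example, pause the boss EventLoop with a debugger breakpoint inside NioEventLoop); a plain blocking client like the hypothetical sketch below should then see only the first few connections succeed (roughly backlog plus the in-flight one; the exact number is OS-dependent) before further attempts fail:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TestBackLogClient {
    public static void main(String[] args) {
        // try to open more connections than a backlog of 2 allows
        for (int i = 1; i <= 5; i++) {
            try {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress("localhost", 8080), 1000);
                System.out.println("connection " + i + " established");
            } catch (IOException e) {
                System.out.println("connection " + i + " failed: " + e);
            }
        }
    }
}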

Default backlog configuration in Netty

a) In the JDK source, the abstract class ServerSocketChannel declares a bind method with a backlog parameter:

    public abstract ServerSocketChannel bind(SocketAddress local, int backlog)
        throws IOException;

b) Search for the places that call this bind method

  • both java.nio.channels and io.netty.channel.socket.nio contain calls to bind

c) Netty's call site (NioServerSocketChannel.doBind):

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

d) The default backlog value

public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    protected final ServerSocket javaSocket;
    private volatile int backlog = NetUtil.SOMAXCONN;    

    @Override
    public int getBacklog() {
        return backlog;
    }
}

The default, NetUtil.SOMAXCONN, is computed in the static initializer of NetUtil:

        // As a SecurityManager may prevent reading the somaxconn file we wrap this in a privileged block.
        //
        // See https://github.com/netty/netty/issues/3680
        SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
            @Override
            public Integer run() {
                // Determine the default somaxconn (server socket backlog) value of the platform.
                // The known defaults:
                // - Windows NT Server 4.0+: 200
                // - Linux and Mac OS X: 128
                int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
                File file = new File("/proc/sys/net/core/somaxconn");
                BufferedReader in = null;
                try {
                    // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
                    // try / catch block.
                    // See https://github.com/netty/netty/issues/4936
                    if (file.exists()) {
                        in = new BufferedReader(new FileReader(file));
                        somaxconn = Integer.parseInt(in.readLine());
                        if (logger.isDebugEnabled()) {
                            logger.debug("{}: {}", file, somaxconn);
                        }
                    } else {
                        // Try to get from sysctl
                        Integer tmp = null;
                        if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
                            tmp = sysctlGetInt("kern.ipc.somaxconn");
                            if (tmp == null) {
                                tmp = sysctlGetInt("kern.ipc.soacceptqueue");
                                if (tmp != null) {
                                    somaxconn = tmp;
                                }
                            } else {
                                somaxconn = tmp;
                            }
                        }

                        if (tmp == null) {
                            logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file,
                                         somaxconn);
                        }
                    }
                } catch (Exception e) {
                    logger.debug("Failed to get SOMAXCONN from sysctl and file {}. Default: {}", file, somaxconn, e);
                } finally {
                    if (in != null) {
                        try {
                            in.close();
                        } catch (Exception e) {
                            // Ignored.
                        }
                    }
                }
                return somaxconn;
            }
        });
  • The source above is where the default backlog size is determined

2-3 Open File Descriptor Limit

ulimit -n      // set in the Linux server's startup script; caps the number of file descriptors each process may open; an OS-level parameter

2-4 ByteBuf Configuration

 serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){

                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buffer = ctx.alloc().buffer();
                        }
                    });
                }
            });

Requirement: make the buffer allocated in the code above (ByteBuf buffer = ctx.alloc().buffer();) an unpooled heap buffer.

| Setting                     | VM option                  | Values          |
|-----------------------------|----------------------------|-----------------|
| enable/disable pooling      | -Dio.netty.allocator.type= | unpooled/pooled |
| prefer direct memory or not | -Dio.netty.noPreferDirect= | true/false      |

With -Dio.netty.allocator.type=unpooled and -Dio.netty.noPreferDirect=true, the ByteBufs Netty uses are unpooled and heap-based.
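
A quick way to verify the flags took effect, assuming the JVM is started with -Dio.netty.allocator.type=unpooled -Dio.netty.noPreferDirect=true (the printed class name should be an unpooled heap ByteBuf variant):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class AllocatorCheck {
    public static void main(String[] args) {
        // ctx.alloc() falls back to ByteBufAllocator.DEFAULT unless overridden via ChannelOption.ALLOCATOR
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        System.out.println(buffer.getClass());  // e.g. an Unpooled...HeapByteBuf with the flags above
        buffer.release();
    }
}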

Netty source that decides between pooled and unpooled allocators

        ByteBufAllocator alloc;
        if ("unpooled".equals(allocType)) {
            alloc = UnpooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else if ("pooled".equals(allocType)) {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: {}", allocType);
        } else {
            alloc = PooledByteBufAllocator.DEFAULT;
            logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
        }

        DEFAULT_ALLOCATOR = alloc;

Netty source that decides whether direct memory is preferred

        // We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
        DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
                                  && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
        }