1. 程式人生 > >【netty這點事兒】ByteBuf 的使用模式

【netty這點事兒】ByteBuf 的使用模式

hello listening apache -c oev direct eap 因此 ati

堆緩沖區

最常用的 ByteBuf 模式是將數據存儲在 JVM 的堆空間中。 這種模式被稱為支撐數組
(backing array), 它能在沒有使用池化的情況下提供快速的分配和釋放。

直接緩沖區

直接緩沖區的內容將駐留在常規的會被垃圾回收的堆之外。直接緩沖區對於網絡數據傳輸是理想的選擇。因為如果你的數據包含在一個在堆上分配的緩沖區中,那麽事實上,在通過套接字發送它之前,JVM將會在內部把你的緩沖區復制到一個直接緩沖區中。
直接緩沖區的主要缺點是,相對於基於堆的緩沖區,它們的分配和釋放都較為昂貴。所以該類型內存只在收發數據包內存使用效率較高,在netty應用層中不合適。

復合緩沖區

讓我們考慮一下一個由兩部分——頭部和主體——組成的將通過 HTTP 協議傳輸的消息。這兩部分由應用程序的不同模塊產生, 將會在消息被發送的時候組裝。該應用程序可以選擇為多個消息重用相同的消息主體。當這種情況發生時,對於每個消息都將會創建一個新的頭部。
因為我們不想為每個消息都重新分配這兩個緩沖區,所以使用 CompositeByteBuf 是一個
完美的選擇。
需要註意的是, Netty使用了CompositeByteBuf來優化套接字的I/O操作,盡可能地消除了
由JDK的緩沖區實現所導致的性能以及內存使用率的懲罰。這種優化發生在Netty的核心代碼中,因此不會被暴露出來,但是你應該知道它所帶來的影響。

ByteBuf 的分配方式

池化的分配 PooledByteBufAllocator是ByteBufAllocator的默認方式

可以通過 Channel(每個都可以有一個不同的 ByteBufAllocator 實例)或者綁定到
ChannelHandler 的 ChannelHandlerContext 獲取一個到 ByteBufAllocator 的引用。
池化了ByteBuf的實例以提高性能並最大限度地減少內存碎片。此實現使用了一種稱為jemalloc的已被大量現代操作系統所采用的高效方法來分配內存。
該方式在netty中是默認方式。

非池化的分配 UnpooledByteBufAllocator

可能某些情況下,你未能獲取一個到 ByteBufAllocator 的引用。對於這種情況,Netty 提供了一個簡單的稱為 Unpooled 的工具類, 它提供了靜態的輔助方法來創建未池化的 ByteBuf實例。

依托http進行性能測試

netty http 代碼

HttpServer.java
package http.server;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; public class HttpServer { private static Log log = LogFactory.getLog(HttpServer.class); public void start(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // server端發送的是httpResponse,所以要使用HttpResponseEncoder進行編碼 ch.pipeline().addLast(new HttpResponseEncoder()); // server端接收到的是httpRequest,所以要使用HttpRequestDecoder進行解碼 ch.pipeline().addLast(new HttpRequestDecoder()); ch.pipeline().addLast(new HttpServerInboundHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { HttpServer server = new HttpServer(); log.info("Http Server listening on 5656 ..."); server.start(5656); } }

HttpServerInboundHandler.java

package http.server;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpVersion;

public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {
    private static Log log = LogFactory.getLog(HttpServerInboundHandler.class);

    // private HttpRequest request;

    // static ByteBuf buf = Unpooled.wrappedBuffer("hello world".getBytes());
    byte[] bs = "hello world".getBytes();

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // if (msg instanceof HttpRequest) {
        // request = (HttpRequest) msg;
        //
        // String uri = request.uri();
        // // System.out.println("Uri:" + uri);
        // }
        // if (msg instanceof HttpContent) {
        // HttpContent content = (HttpContent) msg;
        // ByteBuf buf = content.content();
        // // System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
        // buf.release();

        // String res = "hello world.";
        // ByteBuf buf = Unpooled.wrappedBuffer(bs);
        // ByteBuf buf = Unpooled.directBuffer();
        // ByteBuf buf = Unpooled.buffer();
        // ByteBuf buf = ctx.alloc().heapBuffer();// 池化堆內存
        // ByteBuf buf = ctx.alloc().directBuffer(); // 池化直接內存
        // ByteBuf buf = Unpooled.buffer();// 非池化堆內存
        ByteBuf buf = Unpooled.directBuffer();// 非池化堆內存
        buf.writeBytes(bs);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, OK, buf);

        // response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        /*
         * if (HttpHeaders.isKeepAlive(request)) { response.headers().set(CONNECTION, Values.KEEP_ALIVE); }
         */
        ctx.write(response);
        // ctx.flush();
        // }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause.getMessage());
        ctx.close();
    }

}

池化堆內存 ctx.alloc().heapBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.76ms    9.31ms 180.34ms   92.96%
    Req/Sec   138.20k    43.16k  210.22k    66.50%
  10957832 requests in 20.09s, 522.51MB read
Requests/sec: 545332.08
Transfer/sec:     26.00MB
real    0m20.104s
user    0m10.441s
sys 0m44.703s

池化直接內存 ctx.alloc().directBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.47ms    9.99ms 149.70ms   91.76%
    Req/Sec   138.51k    41.31k  209.94k    63.38%
  10981466 requests in 20.09s, 523.64MB read
Requests/sec: 546684.37
Transfer/sec:     26.07MB
real    0m20.098s
user    0m10.890s
sys 0m45.081s

非池化堆內存 Unpooled.buffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     4.00ms    8.72ms 150.05ms   91.52%
    Req/Sec   138.84k    42.05k  209.72k    63.81%
  11017442 requests in 20.09s, 525.35MB read
Requests/sec: 548379.99
Transfer/sec:     26.15MB
real    0m20.101s
user    0m10.639s
sys 0m45.191s

非池化直接內存 Unpooled.directBuffer()

Running 20s test @ http://127.0.0.1:5656/
  4 threads and 100 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.64ms    9.36ms 156.79ms   92.71%
    Req/Sec   124.55k    33.90k  191.90k    71.61%
  9890536 requests in 20.07s, 471.62MB read
Requests/sec: 492854.62
Transfer/sec:     23.50MB
real    0m20.076s
user    0m9.774s
sys 0m41.801s

【netty這點事兒】ByteBuf 的使用模式