Netty零拷貝技術在RocketMQ中的實踐
阿新 • • 發佈:2021-07-29
零拷貝技術
實現零拷貝有2種方式實現
1 mmap+write
系統呼叫函式會直接把核心緩衝區裡的資料「對映」到使用者空間,這樣,作業系統核心與使用者空間就不需要再進行任何的資料拷貝操作。
public static void mappedByteBufferTest() { try (RandomAccessFile randomAccessFile = new RandomAccessFile("netty/src/main/resources/1.txt", "rw");) { final FileChannel channel = randomAccessFile.getChannel(); /** * 引數1 FileChannel.MapMode.READ_WRITE 讀寫模式 * 引數2 0: 可以直接修改的起始位置 * 引數3 5: 是對映到記憶體的大小,即將1.txt的多少個位元組對映到記憶體 * 實際型別:directByteBuffer */ final MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5); map.put(0, (byte) 'H'); map.put(3, (byte) '9'); } catch (IOException e) { e.printStackTrace(); } }
2 sendfile
可以直接把核心緩衝區裡的資料拷貝到 socket 緩衝區裡,不再拷貝到使用者態。
FileChannel型別 public abstract long transferTo(long position, long count, WritableByteChannel target)throws IOException;
netty零拷貝實現
CompositeByteBuf
- 指在 Java 之上(user space)允許 CompositeByteBuf 使用單個 ByteBuf 一樣操作多個 ByteBuf 而不需要任何 copy。
- 以及允許使用
slice
package netty.zerocopy; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.charset.StandardCharsets; /** * netty 零拷貝 * @author huangyichun * @date 2020/12/9 */ public class CompositeDemo { public static void main(String[] args) { ByteBuf buf1 = Unpooled.copiedBuffer("hello, world", StandardCharsets.UTF_8); ByteBuf buf2 = Unpooled.copiedBuffer("let's go", StandardCharsets.UTF_8); ByteBuf compositeBuf = Unpooled.wrappedBuffer(buf1, buf2); compositeBuf.setBytes(1, "my name".getBytes()); System.out.println(buf1.toString(StandardCharsets.UTF_8)); System.out.println(buf2.toString(StandardCharsets.UTF_8)); System.out.println(compositeBuf.toString(StandardCharsets.UTF_8)); } }
FileRegion
如果你所在的系統支援 zero copy,則可以使用FileRegion來寫入 Channel,實際是就是呼叫上文Nio零拷貝中的transferTo方法進行傳輸。
public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { RandomAccessFile raf = null; long length = -1; try { raf = new RandomAccessFile(msg, "r"); length = raf.length(); } catch (Exception e) { ctx.writeAndFlush("ERR: " + e.getClass().getSimpleName() + ": " + e.getMessage() + '\n'); return; } finally { if (length < 0 && raf != null) { raf.close(); } } ctx.write("OK: " + raf.length() + '\n'); if (ctx.pipeline().get(SslHandler.class) == null) { // SSL not enabled - can use zero-copy file transfer. ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length)); } else { // SSL enabled - cannot use zero-copy file transfer. ctx.write(new ChunkedFile(raf)); } ctx.writeAndFlush("\n"); }
RocketMQ使用FileRegion實現零拷貝
在broker的拉取訊息處理器PullMessageProcessor中,如果不使用堆記憶體,則使用Netty提供的零拷貝方案:
FileRegion fileRegion = new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { getMessageResult.release(); if (!future.isSuccess()) { log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause()); } } });
建立ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion ,具體看一下transferTo方法
@Override public long transferTo(WritableByteChannel target, long position) throws IOException { if (this.byteBufferHeader.hasRemaining()) { transferred += target.write(this.byteBufferHeader); return transferred; } else { List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList(); for (ByteBuffer bb : messageBufferList) { if (bb.hasRemaining()) { transferred += target.write(bb); return transferred; } } } return 0; }
實際上,將多個緩衝區直接寫入到socketchannel裡面,避免了在記憶體中copy到一個緩衝區。
Ref:https://www.jianshu.com/p/3f9f56235d49