Netty 框架學習 —— 傳輸
阿新 • • 發佈:2021-06-12
概述
流經網路的資料總是具有相同的型別:位元組,這些位元組如何傳輸主要取決於我們所說的網路傳輸。使用者並不關心傳輸的細節,只在乎位元組是否被可靠地傳送和接收
如果使用 Java 網路程式設計,你會發現,某些時候當你需要支援高併發連線,隨後你嘗試將阻塞傳輸切換為非阻塞傳輸,那麼你會因為這兩種 API 的截然不同而遇到問題。Netty 提供了一個通用的 API,這使得轉換更加簡單。
傳統的傳輸方式
這裡介紹僅使用 JDK API 來實現應用程式的阻塞(OIO)和非阻塞版本(NIO)
阻塞網路程式設計如下:
public class PlainOioServer { public void server(int port) throws IOException { // 將伺服器繫結到指定埠 final ServerSocket socket = new ServerSocket(port); try { while (true) { // 接收連線 final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); // 建立一個新的執行緒來處理連線 new Thread(() -> { OutputStream out; try { out = clientSocket.getOutputStream(); // 將訊息寫給已連線的客戶端 out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8)); out.flush(); // 關閉連線x clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } catch (IOException e) { e.printStackTrace(); } } }
這段程式碼可以處理中等數量的併發客戶端,但隨著併發連線的增多,你決定改用非同步網路程式設計,但非同步的 API 是完全不同的
非阻塞版本如下:
public class PlainNioServer { public void server(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ssocket = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); // 將伺服器繫結到選定的埠 ssocket.bind(address); // 開啟 Selector 來處理 Channel Selector selector = Selector.open(); // 將 ServerSocket 註冊到 Selector 以接受連線 serverChannel.register(selector, SelectionKey.OP_ACCEPT); final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes()); while (true) { try { // 等待需要處理的新事件,阻塞將一直持續到下一個傳入事件 selector.select(); } catch (IOException e) { e.printStackTrace(); break; } Set<SelectionKey> readKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { // 檢查事件是否是一個新的已經就緒可以被接受的連線 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); // 接受客戶端,並將它註冊到選擇器 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); System.out.println("Accepted connection from " + client); } // 檢查套接字是否已經準備好寫資料 if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { // 將資料寫到已連線的客戶端 if (client.write(buffer) == 0) { break; } } client.close(); } } catch (IOException exception) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { cex.printStackTrace(); } } } } } }
可以看到,阻塞和非阻塞的程式碼是截然不同的。如果為了實現非阻塞而完全重寫程式,無疑十分困難
基於 Netty 的傳輸
使用 Netty 的阻塞網路處理如下:
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi\n\r", StandardCharsets.UTF_8)); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group) // 使用阻塞模式 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new SimpleChannelInboundHandler<>() { @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.writeAndFlush(buf.duplicate()) .addListener(ChannelFutureListener.CLOSE); } }); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
而非阻塞版本和阻塞版本幾乎一模一樣,只需要改動兩處地方
EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);
傳輸 API
傳輸 API 的核心是 interface Channel,它被用於所有的 IO 操作。每個 Channel 都將被分配一個 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了該 Channel 的所有配置設定,ChannelPipeline 持有所有將應用於入站和出站資料以及事件的 ChannelHandler 例項
除了訪問所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法
方法名 | 描述 |
---|---|
eventLoop | 返回分配給 Channel 的 EventLoop |
pipeline | 返回分配給 Channel 的 ChannelPipeline |
isActive | 如果 Channel 活動的,返回 true |
localAddress | 返回本地的 SocketAddress |
remoteAddress | 返回遠端的 SocketAddress |
write | 將資料寫到遠端節點 |
flush | 將之前已寫的資料沖刷到底層傳輸 |
writeAndFlush | 等同於呼叫 write() 並接著呼叫 flush() |
內建的傳輸
Netty 內建了一些可開箱即用的傳輸,但它們所支援的協議不盡相同,因此你必須選擇一個和你的應用程式所使用協議相容的傳輸
名稱 | 包 | 描述 |
---|---|---|
NIO | io.netty.channel.socket.nio | 使用 java.nio.channels 包作為基礎 |
Epoll | io.netty.channel.epoll | 由 JNI 驅動的 epoll() 和非阻塞 IO,可支援只有在 Linux 上可用的多種特性,比 NIO 傳輸更快,且完全非阻塞 |
OIO | io.netty.channel.socket.oio | 使用 java.net 包作為基礎 |
Local | io.netty.channel.local | 可以在 VM 內部通過管道進行通訊的本地傳輸 |
Embedded | io.netty.channel.embedded | Embedded 傳輸,允許使用 ChannelHandler 而不需要一個真正的基於網路的傳輸,主要用於測試 |