Netty 系列九(支持UDP協議).
一、基礎知識
UDP 協議相較於 TCP 協議的特點:
1、無連接協議,沒有持久化連接;
2、每個 UDP 數據報都是一個單獨的傳輸單元;
3、一定的數據報丟失;
4、沒有重傳機制,也不管數據報是否可達;
5、速度比TCP快很多,可用來高效處理大量數據 —— 犧牲了握手以及消息管理機制。
6、常用於音頻、視頻場景,可以忍受一定的數據包丟失,追求速度上的提升。
TCP 協議采用的是一種叫做單播的傳輸形式,UDP 協議提供了向多個接收者發送消息的額外傳輸形式(多播、廣播):
單播(TCP 和 UDP):發送消息給一個由唯一的地址所標識的單一的網絡目的地。
多播(UDP):傳輸給一個預定義的主機組。
二、功能說明
廣播方:打開一個文件,通過 UDP 使用特殊的受限廣播地址或者零網絡地址 255.255.255.255,把每一行作為一個消息廣播到一個指定的端口。
接收方:通過 UDP 廣播,只需簡單地通過在指定的端口上啟動一個監聽程序,便可以創建一個事件監視器來接收日誌消息。所有的在該 UDP 端口上監聽的事件監聽器都將會接收到廣播信息。
三、實現
下圖展示了怎麽將我們的 文件數據 廣播為 UDP消息:所有的將要被傳輸的數據都被封裝在了 LogEvent 消息中。 LogEventBroadcaster 將把這些寫入到 Channel 中,並通過 ChannelPipeline 發送它們,在那裏它們將會被轉換(編碼)為 DatagramPacket 消息。最後,他們都將通過 UDP 被廣播,並由遠程節點(監視器)所捕獲。
Netty 中支持 UDP 協議主要通過以下相關類:
DatagramPacket:使用 ByteBuf 作為數據源,是 UDP 協議傳輸的消息容器。
DatagramChannel:擴展了 Netty 的 Channel 抽象以支持 UDP 的多播組管理,它的實現類 NioDatagramChannnel 用來和遠程節點通信。
Bootstrap:UDP 協議的引導類,使用 bind() 方法綁定 Channel。
public class LogEvent { public static final byte SEPARATOR = ‘:‘;文件實體類 LogEvent.java/** * IP套接字地址(IP地址+端口號) */ private final InetSocketAddress inetSocketAddress; /** * 文件名 */ private final String logfile; /** * 消息內容 */ private final String msg; private final long received; /** * 用於傳入消息的構造函數 * * @param inetSocketAddress * @param logfile * @param msg * @param received */ public LogEvent(InetSocketAddress inetSocketAddress, String logfile, String msg, long received) { this.inetSocketAddress = inetSocketAddress; this.logfile = logfile; this.msg = msg; this.received = received; } /** * 用於傳出消息的構造函數 * * @param logfile * @param msg */ public LogEvent(String logfile, String msg) { this(null, logfile, msg, -1); } public InetSocketAddress getInetSocketAddress() { return inetSocketAddress; } public String getLogfile() { return logfile; } public String getMsg() { return msg; } public long getReceived() { return received; } }
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> { private final InetSocketAddress remoteAddress; public LogEventEncoder(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception { byte[] file = msg.getLogfile().getBytes(CharsetUtil.UTF_8); byte[] content = msg.getMsg().getBytes(CharsetUtil.UTF_8); ByteBuf byteBuf = ctx.alloc().buffer(file.length + content.length + 1); byteBuf.writeBytes(file); byteBuf.writeByte(LogEvent.SEPARATOR); byteBuf.writeBytes(content); out.add(new DatagramPacket(byteBuf, remoteAddress)); } }編碼器 LogEventEncoder.java
該編碼器實現了將 LogEvent 實體類內容轉換為 DatagramPacket UDP數據報。
public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress address, File file) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) //引導該 NioDatagramChannel(無連接的) .channel(NioDatagramChannel.class) // 設置 SO_BROADCAST 套接字選項 .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(address)); this.file = file; } public void run() throws InterruptedException, IOException { //綁定 Channel,UDP 協議的連接用 bind() 方法 Channel channel = bootstrap.bind(0).sync().channel(); long pointer = 0; //長輪詢 監聽是否有新的日誌文件生成 while (true) { long length = file.length(); if (length < pointer) { // 如果有必要,將文件指針設置到該文件的最後一個字節 pointer = length; } else { RandomAccessFile raf = new RandomAccessFile(file, "r"); // 確保當前的文件指針,以確保沒有任何的舊數據被發送 raf.seek(pointer); String line; while ((line = raf.readLine()) != null) { //對於每個日誌條目,寫入一個 LogEvent 到 Channel 中,最後加入一個換行符號 channel.writeAndFlush(new LogEvent(file.getAbsolutePath(), line + System.getProperty("line.separator"))); } pointer = raf.getFilePointer(); raf.close(); } try { // 休眠一秒,如果被中斷,則退出循環,否則重新處理它 Thread.sleep(1000); } catch (InterruptedException e) { while (!Thread.interrupted()) { break; } } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws IOException, InterruptedException { InetSocketAddress socketAddress = new InetSocketAddress("255.255.255.255", 8888); File file = new File("E:\\2018-09-12.log"); LogEventBroadcaster logEventBroadcaster = new LogEventBroadcaster(socketAddress, file); try { logEventBroadcaster.run(); } finally { logEventBroadcaster.stop(); } } }
現在,我們來測試一下這個 UDP 廣播類,首先我們需要一個工具 nmap ,用它來監聽 UDP 的 8888 端口,以接收我們廣播的日誌文件。下載地址: https://nmap.org/dist/nmap-7.70-win32.zip
下載完成後,命令行進入安裝目錄,執行命令:ncat.exe -l -u -p 8888 ,監聽 UDP 端口。
當然,也可以自己寫個測試類監聽 UDP 端口,打印日誌查看。這裏我沒有用 Netty 寫監聽類,直接用了 java 原生的 DatagramSocket 和 DatagramPacket 寫的監聽類,如下:
public class UDPServer { public static void main(String[] args) { DatagramSocket server = null; try { server = new DatagramSocket(8888); byte[] datas = new byte[1024]; //用一個字節數組接收UDP包,字節數組在傳遞給構造函數時是空的 while (true) { DatagramPacket datagramPacket = new DatagramPacket(datas, datas.length); server.receive(datagramPacket); System.out.println(new String(datas)); } } catch (SocketException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { server.close(); } } }UDPServer.java
基於 Netty 的監聽類實現可以參考我上傳 GitHub 上的源代碼。
參考資料:《Netty IN ACTION》
演示源代碼:https://github.com/JMCuixy/NettyDemo/tree/master/src/main/java/org/netty/demo/udp
Netty 系列九(支持UDP協議).