1. 程式人生 > >Netty 系列九(支援UDP協議).

Netty 系列九(支援UDP協議).

一、基礎知識

    UDP 協議相較於 TCP 協議的特點:

1、無連線協議,沒有持久化連線;2、每個 UDP 資料報都是一個單獨的傳輸單元;3、一定的資料報丟失;4、沒有重傳機制,也不管資料報是否可達;5、速度比TCP快很多,可用來高效處理大量資料 —— 犧牲了握手以及訊息管理機制。6、常用於音訊、視訊場景,可以忍受一定的資料包丟失,追求速度上的提升。

    TCP 協議採用的是一種叫做單播的傳輸形式,UDP 協議提供了向多個接收者傳送訊息的額外傳輸形式(多播、廣播):

單播(TCP 和 UDP):傳送訊息給一個由唯一的地址所標識的單一的網路目的地。多播(UDP):傳輸給一個預定義的主機組。廣播(UDP):傳輸到網路(或者子網)上的所有主機。

二、功能說明

    廣播方:開啟一個檔案,通過 UDP 使用特殊的受限廣播地址或者零網路地址 255.255.255.255,把每一行作為一個訊息廣播到一個指定的埠。

    接收方:通過 UDP 廣播,只需簡單地通過在指定的埠上啟動一個監聽程式,便可以建立一個事件監視器來接收日誌訊息。所有的在該 UDP 埠上監聽的事件監聽器都將會接收到廣播資訊。

三、實現

    下圖展示了怎麼將我們的 檔案資料 廣播為 UDP訊息:所有的將要被傳輸的資料都被封裝在了 LogEvent 訊息中。 LogEventBroadcaster 將把這些寫入到 Channel 中,並通過 ChannelPipeline 傳送它們,在那裡它們將會被轉換(編碼)為 DatagramPacket 訊息。最後,他們都將通過 UDP 被廣播,並由遠端節點(監視器)所捕獲。

    Netty 中支援 UDP 協議主要通過以下相關類:

DatagramPacket:使用 ByteBuf 作為資料來源,是 UDP 協議傳輸的訊息容器。

DatagramChannel:擴充套件了 Netty 的 Channel 抽象以支援 UDP 的多播組管理,它的實現類 NioDatagramChannnel 用來和遠端節點通訊。

Bootstrap:UDP 協議的引導類,使用 bind() 方法繫結 Channel。    

public class LogEvent {
    public static final byte SEPARATOR = ':';
    
/** * IP套接字地址(IP地址+埠號) */ private final InetSocketAddress inetSocketAddress; /** * 檔名 */ private final String logfile; /** * 訊息內容 */ private final String msg; private final long received; /** * 用於傳入訊息的建構函式 * * @param inetSocketAddress * @param logfile * @param msg * @param received */ public LogEvent(InetSocketAddress inetSocketAddress, String logfile, String msg, long received) { this.inetSocketAddress = inetSocketAddress; this.logfile = logfile; this.msg = msg; this.received = received; } /** * 用於傳出訊息的建構函式 * * @param logfile * @param msg */ public LogEvent(String logfile, String msg) { this(null, logfile, msg, -1); } public InetSocketAddress getInetSocketAddress() { return inetSocketAddress; } public String getLogfile() { return logfile; } public String getMsg() { return msg; } public long getReceived() { return received; } }
檔案實體類 LogEvent.java
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }


    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception {
        byte[] file = msg.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] content = msg.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf byteBuf = ctx.alloc().buffer(file.length + content.length + 1);
        byteBuf.writeBytes(file);
        byteBuf.writeByte(LogEvent.SEPARATOR);
        byteBuf.writeBytes(content);
        out.add(new DatagramPacket(byteBuf, remoteAddress));
    }
}
編碼器 LogEventEncoder.java

    該編碼器實現了將 LogEvent 實體類內容轉換為 DatagramPacket UDP資料報。

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                //引導該 NioDatagramChannel(無連線的)
                .channel(NioDatagramChannel.class)
                // 設定 SO_BROADCAST 套接字選項
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }
    
    public void run() throws InterruptedException, IOException {
        //繫結 Channel,UDP 協議的連線用 bind() 方法
        Channel channel = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        //長輪詢 監聽是否有新的日誌檔案生成
        while (true) {
            long length = file.length();
            if (length < pointer) {
                // 如果有必要,將檔案指標設定到該檔案的最後一個位元組
                pointer = length;
            } else {
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                // 確保當前的檔案指標,以確保沒有任何的舊資料被髮送
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    //對於每個日誌條目,寫入一個 LogEvent 到 Channel 中,最後加入一個換行符號
                    channel.writeAndFlush(new LogEvent(file.getAbsolutePath(), line + System.getProperty("line.separator")));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                // 休眠一秒,如果被中斷,則退出迴圈,否則重新處理它
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                while (!Thread.interrupted()) {
                    break;
                }
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        InetSocketAddress socketAddress = new InetSocketAddress("255.255.255.255", 8888);
        File file = new File("E:\\2018-09-12.log");
        LogEventBroadcaster logEventBroadcaster = new LogEventBroadcaster(socketAddress, file);
        try {
            logEventBroadcaster.run();
        } finally {
            logEventBroadcaster.stop();
        }
    }
}

    現在,我們來測試一下這個 UDP 廣播類,首先我們需要一個工具 nmap ,用它來監聽 UDP 的 8888 埠,以接收我們廣播的日誌檔案。下載地址:

    下載完成後,命令列進入安裝目錄,執行命令:ncat.exe -l -u -p 8888 ,監聽 UDP 埠。

 

     當然,也可以自己寫個測試類監聽 UDP 埠,列印日誌檢視。這裡我沒有用 Netty 寫監聽類,直接用了 java 原生的 DatagramSocket 和 DatagramPacket 寫的監聽類,如下:

public class UDPServer {

    public static void main(String[] args) {
        DatagramSocket server = null;
        try {
            server = new DatagramSocket(8888);
            byte[] datas = new byte[1024];
            //用一個位元組陣列接收UDP包,位元組陣列在傳遞給建構函式時是空的
            while (true) {
                DatagramPacket datagramPacket = new DatagramPacket(datas, datas.length);
                server.receive(datagramPacket);
                System.out.println(new String(datas));
            }
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            server.close();
        }
    }
}
UDPServer.java

    基於 Netty 的監聽類實現可以參考我上傳 GitHub 上的原始碼。

參考資料:《Netty IN ACTION》