1. 程式人生 > >netty實現UDP多播

netty實現UDP多播

廣播方

/**
 * 類說明:廣播
 */
public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    public LogEventBroadcaster(InetSocketAddress remoteAddress) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引導該 NioDatagramChannel(無連線的)
        bootstrap.group(group).channel(NioDatagramChannel.class)

             //設定 SO_BROADCAST 套接字選項
             .option(ChannelOption.SO_BROADCAST,true)

             .handler(new LogEventEncoder(remoteAddress));
    }

    public void run() throws Exception {
        //繫結 Channel
        Channel ch = bootstrap.bind(0).sync().channel();
        long count = 0;

        //啟動主處理迴圈,模擬日誌傳送
        for (;;) {
            ch.writeAndFlush(new LogMsg(null, ++count,
                    LogConst.getLogInfo()));
            try {
                //休眠 2 秒,如果被中斷,則退出迴圈;
                Thread.sleep(2000);

            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {

        //建立並啟動一個新的 UdpQuestionSide 的例項
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                //表明本應用傳送的報文並沒有一個確定的目的地,也就是進行廣播
                new InetSocketAddress("255.255.255.255",
                        LogConst.MONITOR_SIDE_PORT));
        try {
            broadcaster.run();
        }
        finally {
            broadcaster.stop();
        }
    }
}

編碼器

/**
 * 類說明:編碼,將實際的日誌實體類編碼為DatagramPacket
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogMsg> {
    private final InetSocketAddress remoteAddress;

    //LogEventEncoder 建立了即將被髮送到指定的 InetSocketAddress
    // 的 DatagramPacket 訊息
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
                          LogMsg logMsg, List<Object> out) throws Exception {
        byte[] msg = logMsg.getMsg().getBytes(CharsetUtil.UTF_8);
        //容量的計算:兩個long型+訊息的內容+分割符
        ByteBuf buf = channelHandlerContext.alloc()
                .buffer(8*2 + msg.length + 1);
        //將傳送時間寫入到 ByteBuf中
        buf.writeLong(logMsg.getTime());
        //將訊息id寫入到 ByteBuf中
        buf.writeLong(logMsg.getMsgId());
        //新增一個 SEPARATOR
        buf.writeByte(LogMsg.SEPARATOR);
        //將日誌訊息寫入 ByteBuf中
        buf.writeBytes(msg);
        //將一個擁有資料和目的地地址的新 DatagramPacket 新增到出站的訊息列表中
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

接收方

/**
 * 類說明:接收方
 */
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;

    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        //引導該 NioDatagramChannel
        bootstrap.group(group)
            .channel(NioDatagramChannel.class)

            //設定套接字選項 SO_BROADCAST
            .option(ChannelOption.SO_BROADCAST, true)

            //允許埠重用,可開啟多個接收方
            .option(ChannelOption.SO_REUSEADDR,true)

            .handler( new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel)
                    throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast(new LogEventDecoder());

                    pipeline.addLast(new LogEventHandler());
                }
            } )
            .localAddress(address);
    }

    public Channel bind() {
        //繫結 Channel。注意,DatagramChannel 是無連線的
        return bootstrap.bind().syncUninterruptibly().channel();
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        //構造一個新的 UdpAnswerSide並指明監聽埠
        LogEventMonitor monitor = new LogEventMonitor(
            new InetSocketAddress(LogConst.MONITOR_SIDE_PORT));
        try {
            //繫結本地監聽埠
            Channel channel = monitor.bind();
            System.out.println("UdpAnswerSide running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}

解碼器

/**
 * 類說明:解碼,將DatagramPacket解碼為實際的日誌實體類
 */
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          DatagramPacket datagramPacket, List<Object> out)
        throws Exception {
        //獲取對 DatagramPacket 中的資料(ByteBuf)的引用
        ByteBuf data = datagramPacket.content();
        //獲得傳送時間
        long sendTime = data.readLong();
        System.out.println("接受到"+sendTime+"傳送的訊息");
        //獲得訊息的id
        long msgId = data.readLong();
        //獲得分隔符SEPARATOR
        byte sepa = data.readByte();
        //獲取讀索引的當前位置,就是分隔符的索引+1
        int idx = data.readerIndex();
        //提取日誌訊息,從讀索引開始,到最後為日誌的資訊
        String sendMsg = data.slice(idx ,
            data.readableBytes()).toString(CharsetUtil.UTF_8);
        //構建一個新的 LogMsg 物件,並且將它新增到(已經解碼的訊息的)列表中
        LogMsg event = new LogMsg(datagramPacket.sender(),
                msgId, sendMsg);
        //作為本handler的處理結果,交給後面的handler進行處理
        out.add(event);
    }
}

接收方處理器

/**
 * 類說明:日誌的業務處理類,實際的業務處理,接受日誌資訊
 */
public class LogEventHandler
    extends SimpleChannelInboundHandler<LogMsg> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) throws Exception {
        //當異常發生時,列印棧跟蹤資訊,並關閉對應的 Channel
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        LogMsg event) throws Exception {
        //建立 StringBuilder,並且構建輸出的字串
        StringBuilder builder = new StringBuilder();
        builder.append(event.getTime());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] :[");
        builder.append(event.getMsgId());
        builder.append("] :");
        builder.append(event.getMsg());
        //列印 LogMsg 的資料
        System.out.println(builder.toString());
    }
}

訊息實體

/**
 * 類說明:日誌實體類
 */
public final class LogMsg {
    public static final byte SEPARATOR = (byte) ':';
    /*源的 InetSocketAddress*/
    private final InetSocketAddress source;
    /*訊息內容*/
    private final String msg;
    /*訊息id*/
    private final long msgId;
    /*訊息傳送或者接受的時間*/
    private final long time;

    //用於傳入訊息的建構函式
    public LogMsg(String msg) {
        this(null, msg,-1,System.currentTimeMillis());
    }

    //用於傳出訊息的建構函式
    public LogMsg(InetSocketAddress source, long msgId,
                  String msg) {
        this(source,msg,msgId,System.currentTimeMillis());
    }

    public LogMsg(InetSocketAddress source, String msg, long msgId, long time) {
        this.source = source;
        this.msg = msg;
        this.msgId = msgId;
        this.time = time;
    }

    //返回傳送 LogMsg 的源的 InetSocketAddress
    public InetSocketAddress getSource() {
        return source;
    }

    //返回訊息內容
    public String getMsg() {
        return msg;
    }

    //返回訊息id
    public long getMsgId() {
        return msgId;
    }

    //返回訊息中的時間
    public long getTime() {
        return time;
    }
}

訊息工具類

/**
 * 類說明:日誌資訊,用String陣列代替
 */
public class LogConst {
    public final static int MONITOR_SIDE_PORT = 9998;
    private static final String[] LOG_INFOS = {
            "20180912:mark-machine:Send sms to 10001",
            "20180912:lison-machine:Send email to [email protected]",
            "20180912:james-machine:Happen Exception",
            "20180912:peter-machine:人生不能象做菜,把所有的料都準備好了才下鍋",
            "20180912:deer-machine:牽著你的手,就象左手牽右手沒感覺,但砍下去也會痛!",
            "20180912:king-machine:我聽別人說這世界上有一種鳥是沒有腳的," +
                    "它只能一直飛呀飛呀,飛累了就在風裡面睡覺,這種鳥一輩子只能下地一次," +
                    "那一次就是它死亡的時候.",
            "20180912:mark-machine:多年以後我有個綽號叫西毒,任何人都可以變得狠毒," +
                    "只要你嘗試過什麼叫妒嫉.我不介意其他人怎麼看我," +
                    "我只不過不想別人比我更開心.我以為有一些人永遠不會妒嫉," +
                    "因為他太驕傲 . 在我出道的時候,我認識了一個人," +
                    "因為他喜歡在東邊出沒,所以很多年以後,他有個綽號叫東邪.",
            "20180912:lison-machine:做人如果沒有夢想,那和鹹魚有什麼區別",
            "20180912:james-machine:恐懼讓你淪為囚犯,希望讓你重獲自由," +
                    "堅強的人只能救贖自己,偉大的人才能拯救別人." +
                    "記著,希望是件好東西,而且從沒有一樣好東西會消逝." +
                    "忙活,或者等死.",
            "20180912:peter-machine:世界上最遠的距離不是生和死," +
                    "而是我站在你的面前卻不能說:我愛你",
            "20180912:deer-machine:成功的含義不在於得到什麼," +
                    "而是在於你從那個奮鬥的起點走了多遠.",
            "20180912:king-machine:一個人殺了一個人,他是殺人犯.是壞人," +
                    "當一個人殺了成千上萬人後,他是英雄,是大好人",
            "20180912:mark-machine:世界在我掌握中,我卻掌握不住對你的感情",
            "20180912:lison-machine:我害怕前面的路,但是一想到你,就有能力向前走了。",
            "20180912:james-machine:早就勸你別吸菸,可是煙霧中的你是那麼的美," +
                    "叫我怎麼勸得下口。",
            "20180912:peter-machine:如果你只做自己能力範圍之內的事情,就永遠無法進步。" +
                    "昨天已成為歷史,明天是未知的,而今天是上天賜予我們的禮物," +
                    "這就是為什麼我們把它叫做現在!",
            "20180912:deer-machine:年輕的時候有賊心沒賊膽,等到了老了吧," +
                    "賊心賊膽都有了,可賊又沒了。",
            "20180912:king-machine:別看現在鬧得歡,小心將來拉清單。"};

    private final static Random r = new Random();
    public static String getLogInfo(){
        return LOG_INFOS[r.nextInt(LOG_INFOS.length-1)];
    }
}

效果

開啟一個廣播方,3個接收方:
在這裡插入圖片描述
在這裡插入圖片描述
在這裡插入圖片描述