netty實現UDP多播
阿新 • • 發佈:2018-11-10
廣播方
/** * 類說明:廣播 */ public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventBroadcaster(InetSocketAddress remoteAddress) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //引導該 NioDatagramChannel(無連線的) bootstrap.group(group).channel(NioDatagramChannel.class) //設定 SO_BROADCAST 套接字選項 .option(ChannelOption.SO_BROADCAST,true) .handler(new LogEventEncoder(remoteAddress)); } public void run() throws Exception { //繫結 Channel Channel ch = bootstrap.bind(0).sync().channel(); long count = 0; //啟動主處理迴圈,模擬日誌傳送 for (;;) { ch.writeAndFlush(new LogMsg(null, ++count, LogConst.getLogInfo())); try { //休眠 2 秒,如果被中斷,則退出迴圈; Thread.sleep(2000); } catch (InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { //建立並啟動一個新的 UdpQuestionSide 的例項 LogEventBroadcaster broadcaster = new LogEventBroadcaster( //表明本應用傳送的報文並沒有一個確定的目的地,也就是進行廣播 new InetSocketAddress("255.255.255.255", LogConst.MONITOR_SIDE_PORT)); try { broadcaster.run(); } finally { broadcaster.stop(); } } }
編碼器
/** * 類說明:編碼,將實際的日誌實體類編碼為DatagramPacket */ public class LogEventEncoder extends MessageToMessageEncoder<LogMsg> { private final InetSocketAddress remoteAddress; //LogEventEncoder 建立了即將被髮送到指定的 InetSocketAddress // 的 DatagramPacket 訊息 public LogEventEncoder(InetSocketAddress remoteAddress) { this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, LogMsg logMsg, List<Object> out) throws Exception { byte[] msg = logMsg.getMsg().getBytes(CharsetUtil.UTF_8); //容量的計算:兩個long型+訊息的內容+分割符 ByteBuf buf = channelHandlerContext.alloc() .buffer(8*2 + msg.length + 1); //將傳送時間寫入到 ByteBuf中 buf.writeLong(logMsg.getTime()); //將訊息id寫入到 ByteBuf中 buf.writeLong(logMsg.getMsgId()); //新增一個 SEPARATOR buf.writeByte(LogMsg.SEPARATOR); //將日誌訊息寫入 ByteBuf中 buf.writeBytes(msg); //將一個擁有資料和目的地地址的新 DatagramPacket 新增到出站的訊息列表中 out.add(new DatagramPacket(buf, remoteAddress)); } }
接收方
/** * 類說明:接收方 */ public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); //引導該 NioDatagramChannel bootstrap.group(group) .channel(NioDatagramChannel.class) //設定套接字選項 SO_BROADCAST .option(ChannelOption.SO_BROADCAST, true) //允許埠重用,可開啟多個接收方 .option(ChannelOption.SO_REUSEADDR,true) .handler( new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } } ) .localAddress(address); } public Channel bind() { //繫結 Channel。注意,DatagramChannel 是無連線的 return bootstrap.bind().syncUninterruptibly().channel(); } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { //構造一個新的 UdpAnswerSide並指明監聽埠 LogEventMonitor monitor = new LogEventMonitor( new InetSocketAddress(LogConst.MONITOR_SIDE_PORT)); try { //繫結本地監聽埠 Channel channel = monitor.bind(); System.out.println("UdpAnswerSide running"); channel.closeFuture().sync(); } finally { monitor.stop(); } } }
解碼器
/**
* 類說明:解碼,將DatagramPacket解碼為實際的日誌實體類
*/
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx,
DatagramPacket datagramPacket, List<Object> out)
throws Exception {
//獲取對 DatagramPacket 中的資料(ByteBuf)的引用
ByteBuf data = datagramPacket.content();
//獲得傳送時間
long sendTime = data.readLong();
System.out.println("接受到"+sendTime+"傳送的訊息");
//獲得訊息的id
long msgId = data.readLong();
//獲得分隔符SEPARATOR
byte sepa = data.readByte();
//獲取讀索引的當前位置,就是分隔符的索引+1
int idx = data.readerIndex();
//提取日誌訊息,從讀索引開始,到最後為日誌的資訊
String sendMsg = data.slice(idx ,
data.readableBytes()).toString(CharsetUtil.UTF_8);
//構建一個新的 LogMsg 物件,並且將它新增到(已經解碼的訊息的)列表中
LogMsg event = new LogMsg(datagramPacket.sender(),
msgId, sendMsg);
//作為本handler的處理結果,交給後面的handler進行處理
out.add(event);
}
}
接收方處理器
/**
* 類說明:日誌的業務處理類,實際的業務處理,接受日誌資訊
*/
public class LogEventHandler
extends SimpleChannelInboundHandler<LogMsg> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) throws Exception {
//當異常發生時,列印棧跟蹤資訊,並關閉對應的 Channel
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext ctx,
LogMsg event) throws Exception {
//建立 StringBuilder,並且構建輸出的字串
StringBuilder builder = new StringBuilder();
builder.append(event.getTime());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] :[");
builder.append(event.getMsgId());
builder.append("] :");
builder.append(event.getMsg());
//列印 LogMsg 的資料
System.out.println(builder.toString());
}
}
訊息實體
/**
* 類說明:日誌實體類
*/
public final class LogMsg {
public static final byte SEPARATOR = (byte) ':';
/*源的 InetSocketAddress*/
private final InetSocketAddress source;
/*訊息內容*/
private final String msg;
/*訊息id*/
private final long msgId;
/*訊息傳送或者接受的時間*/
private final long time;
//用於傳入訊息的建構函式
public LogMsg(String msg) {
this(null, msg,-1,System.currentTimeMillis());
}
//用於傳出訊息的建構函式
public LogMsg(InetSocketAddress source, long msgId,
String msg) {
this(source,msg,msgId,System.currentTimeMillis());
}
public LogMsg(InetSocketAddress source, String msg, long msgId, long time) {
this.source = source;
this.msg = msg;
this.msgId = msgId;
this.time = time;
}
//返回傳送 LogMsg 的源的 InetSocketAddress
public InetSocketAddress getSource() {
return source;
}
//返回訊息內容
public String getMsg() {
return msg;
}
//返回訊息id
public long getMsgId() {
return msgId;
}
//返回訊息中的時間
public long getTime() {
return time;
}
}
訊息工具類
/**
* 類說明:日誌資訊,用String陣列代替
*/
public class LogConst {
public final static int MONITOR_SIDE_PORT = 9998;
private static final String[] LOG_INFOS = {
"20180912:mark-machine:Send sms to 10001",
"20180912:lison-machine:Send email to [email protected]",
"20180912:james-machine:Happen Exception",
"20180912:peter-machine:人生不能象做菜,把所有的料都準備好了才下鍋",
"20180912:deer-machine:牽著你的手,就象左手牽右手沒感覺,但砍下去也會痛!",
"20180912:king-machine:我聽別人說這世界上有一種鳥是沒有腳的," +
"它只能一直飛呀飛呀,飛累了就在風裡面睡覺,這種鳥一輩子只能下地一次," +
"那一次就是它死亡的時候.",
"20180912:mark-machine:多年以後我有個綽號叫西毒,任何人都可以變得狠毒," +
"只要你嘗試過什麼叫妒嫉.我不介意其他人怎麼看我," +
"我只不過不想別人比我更開心.我以為有一些人永遠不會妒嫉," +
"因為他太驕傲 . 在我出道的時候,我認識了一個人," +
"因為他喜歡在東邊出沒,所以很多年以後,他有個綽號叫東邪.",
"20180912:lison-machine:做人如果沒有夢想,那和鹹魚有什麼區別",
"20180912:james-machine:恐懼讓你淪為囚犯,希望讓你重獲自由," +
"堅強的人只能救贖自己,偉大的人才能拯救別人." +
"記著,希望是件好東西,而且從沒有一樣好東西會消逝." +
"忙活,或者等死.",
"20180912:peter-machine:世界上最遠的距離不是生和死," +
"而是我站在你的面前卻不能說:我愛你",
"20180912:deer-machine:成功的含義不在於得到什麼," +
"而是在於你從那個奮鬥的起點走了多遠.",
"20180912:king-machine:一個人殺了一個人,他是殺人犯.是壞人," +
"當一個人殺了成千上萬人後,他是英雄,是大好人",
"20180912:mark-machine:世界在我掌握中,我卻掌握不住對你的感情",
"20180912:lison-machine:我害怕前面的路,但是一想到你,就有能力向前走了。",
"20180912:james-machine:早就勸你別吸菸,可是煙霧中的你是那麼的美," +
"叫我怎麼勸得下口。",
"20180912:peter-machine:如果你只做自己能力範圍之內的事情,就永遠無法進步。" +
"昨天已成為歷史,明天是未知的,而今天是上天賜予我們的禮物," +
"這就是為什麼我們把它叫做現在!",
"20180912:deer-machine:年輕的時候有賊心沒賊膽,等到了老了吧," +
"賊心賊膽都有了,可賊又沒了。",
"20180912:king-machine:別看現在鬧得歡,小心將來拉清單。"};
private final static Random r = new Random();
public static String getLogInfo(){
return LOG_INFOS[r.nextInt(LOG_INFOS.length-1)];
}
}
效果
開啟一個廣播方,3個接收方: