三、Netty伺服器與多客戶連線利用廣播方式完全理解netty的讀寫機制
阿新 • • 發佈:2018-11-04
本篇採用 Netty 自帶的字串編解碼器。使用者 1 可以在控制檯輸入訊息傳送給伺服器(輸入完訊息要加換行符號,它是一則訊息結束的分隔符,後面我會在編解碼的部分詳細講解),伺服器再把訊息廣播給其他所有的客戶端。這個廣播機制在遊戲伺服器開發中應用很普遍:遊戲中的廣播、你在遊戲中看到其他玩家的操作……都是利用這個廣播機制實現的。
案例如下:
package com.zhurong.netty.test3; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Description: * User: zhurong * Date: 2018-09-24 23:17 */ public class NettyChatServer { public static void main(String[] args) { //接收連線 EventLoopGroup bossGroup = new NioEventLoopGroup(); //連線傳送給work EventLoopGroup workerGroup = new NioEventLoopGroup(); try { System.out.println("伺服器啟動成功!"); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class). childHandler(new NettyChatServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8000).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.zhurong.netty.test3; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * Description: 客戶端與伺服器端連線一旦建立,這個類中方法就會被回撥 * User: zhurong * Date: 2018-09-24 21:29 */ public class NettyChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //解碼器 pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new NettyChatServerHandler()); } }
package com.zhurong.netty.test3; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * 此處定義客戶端和伺服器端傳遞的是字串所以用了String * Description: * User: zhurong * Date: 2018-09-24 23:30 */ public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(ctx.channel().remoteAddress()+"------>"+msg); //返回資料給客戶端,是一個非同步的操作 channelGroup.forEach(channel ->{ System.out.println("channel:"+channel); if(ctx.channel() != channel){ channel.writeAndFlush(ctx.channel().remoteAddress()+"傳送的訊息" + msg+"\n"); System.out.println("傳送訊息給客戶端:"+ctx.channel().remoteAddress()+",msg:"+msg+"\n"); }else { channel.writeAndFlush("是自己的訊息\n"); System.out.println("是自己的訊息:"+ msg); } }); // channelGroup.writeAndFlush("伺服器已經收到你們客戶端訊息了!"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("server 推送的訊息:"+ ctx.channel().remoteAddress() + "註冊進了伺服器\n"); channelGroup.writeAndFlush("server 推送的訊息:"+ ctx.channel().remoteAddress() + "註冊進了伺服器\n"); channelGroup.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channelGroup.writeAndFlush("server 推送的訊息:"+ ctx.channel().remoteAddress() + "離開了伺服器\n"); System.out.println("server 推送的訊息:"+ ctx.channel().remoteAddress() + "離開了伺服器\n"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channelGroup.writeAndFlush(ctx.channel().remoteAddress() + "上線了\n"); 
System.out.println(ctx.channel().remoteAddress() + "上線了\n"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { channelGroup.writeAndFlush(ctx.channel().remoteAddress() + "下線了\n"); System.out.println(ctx.channel().remoteAddress() + "下線了\n"); } }
package com.zhurong.netty.test3;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
/**
 * Console chat client: connects to the chat server on localhost:8000 and sends every line typed
 * on standard input to it.
 *
 * User: zhurong
 * Date: 2018-09-24 23:37
 */
public class NettyChatClient {

    /**
     * Connects to the server and forwards console input until standard input reaches
     * end-of-stream, then closes the connection and shuts the event loop down.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the thread is interrupted while connecting or closing
     * @throws IOException if reading from standard input fails
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new NettyChatClientInitializer());

            ChannelFuture channelFuture = bootstrap.connect("localhost", 8000).sync();
            // Report success only once the connection has actually been established.
            System.out.println("客戶端啟動成功");

            Channel channel = channelFuture.channel();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end-of-stream; stop instead of sending "null" forever.
            while ((line = bufferedReader.readLine()) != null) {
                // The line terminator is the message delimiter of the protocol.
                channel.writeAndFlush(line + "\r\n");
            }
            // The close is queued after pending writes, then we wait for it to complete.
            channel.close().sync();
        } finally {
            eventExecutors.shutdownGracefully();
        }
    }
}
package com.zhurong.netty.test3;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
/**
 * Description: once the connection between the client and the server is established, the method
 * in this class is called back to set up the connection's pipeline.
 *
 * <p>Tip kept from the original notes: {@code netstat -ano | findstr}.
 *
 * User: zhurong
 * Date: 2018-09-24 21:29
 */
public class NettyChatClientInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the line-delimited string codec followed by the client handler.
     *
     * @param ch the channel connected to the server
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Splits the inbound byte stream into frames at line delimiters (max 4096 bytes).
                .addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()))
                // Outbound String -> bytes.
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                // Inbound frame -> String.
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                // Business logic.
                .addLast(new NettyChatClientHandler());
    }
}
package com.zhurong.netty.test3;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side chat handler: prints every line received from the server and sends a greeting as
 * soon as the connection becomes active.
 *
 * User: zhurong
 * Date: 2018-09-24 22:01
 */
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints a message received from the server, prefixed with the server's address.
     *
     * @param ctx context of the channel connected to the server
     * @param msg one decoded line of text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到伺服器的訊息:" + ctx.channel().remoteAddress() + msg);
    }

    /** Prints the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /** Sends the first message to the server once the connection is active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // The message must end with a line delimiter: the server frames inbound data with a
        // DelimiterBasedFrameDecoder, so an unterminated greeting would be held back and glued
        // onto the next message the user types.
        ctx.writeAndFlush("acitve客戶端第一條資訊" + "\r\n");
    }
}
現在是不是對 Netty 又進一步熟悉了!