JAVA Netty實現聊天室+私聊功能的示例程式碼
阿新 • • 發佈:2020-08-20
功能介紹
使用Netty框架實現聊天室功能,伺服器可監控客戶端上下限狀態,訊息轉發。同時實現了點對點私聊功能。技術點我都在程式碼中做了備註,這裡不再重複寫了。希望能給想學習netty的同學一點參考。
伺服器程式碼
伺服器入口程式碼
package nio.test.netty.groupChat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; /** * netty群聊 伺服器端 * @author zhang * */ public class NettyChatServer { private int port; public NettyChatServer(int port){ this.port = port; } //初始化 netty伺服器 private void init() throws Exception{ EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(16); try { ServerBootstrap boot = new ServerBootstrap(); boot.group(boss,work); boot.channel(NioServerSocketChannel.class);//設定boss selector建立channel使用的物件 boot.option(ChannelOption.SO_BACKLOG,128);//boss 等待連線的 佇列長度 boot.childOption(ChannelOption.SO_KEEPALIVE,true); //讓客戶端保持長期活動狀態 boot.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //從channel中獲取pipeline 並往裡邊新增Handler ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder",new StringEncoder()); pipeline.addLast("decoder",new StringDecoder()); pipeline.addLast(new ServerMessageHandler());//自定義Handler來處理訊息 } }); System.out.println("伺服器開始啟動..."); //繫結埠 ChannelFuture channelFuture = boot.bind(port).sync(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()){ System.out.println("伺服器正在啟動..."); } if(future.isDone()){ System.out.println("伺服器啟動成功...OK"); } } }); //監聽channel關閉 channelFuture.channel().closeFuture().sync(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isCancelled()){ System.out.println("伺服器正在關閉.."); } if(future.isCancellable()){ System.out.println("伺服器已經關閉..OK"); } } }); }finally{ boss.shutdownGracefully(); work.shutdownGracefully(); } } /** * 啟動伺服器 main 函式 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { new NettyChatServer(9090).init(); } }
伺服器端訊息處理Handler
package nio.test.netty.groupChat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; /** * 自定義 伺服器端訊息處理Handler * @author zhang * */ public class ServerMessageHandler extends SimpleChannelInboundHandler<String>{ /** * 管理全域性的channel * GlobalEventExecutor.INSTANCE 全域性事件監聽器 * 一旦將channel 加入 ChannelGroup 就不要用手動去 * 管理channel的連線失效後移除操作,他會自己移除 */ private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 為了實現私聊功能,這裡key儲存使用者的唯一標識, * 我儲存 客戶端的埠號 * 當然 這個集合也需要自己去維護 使用者的上下線 不能像 ChannelGroup那樣自己去維護 */ private static Map<String,Channel> all = new HashMap<String,Channel>(); private SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); /** * 處理收到的訊息 */ @Override protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception { Channel channel = ctx.channel(); /** * 這裡簡單判斷 如果內容裡邊包含#那麼就是私聊 */ if(msg.contains("#")){ String id = msg.split("#")[0]; String body = msg.split("#")[1]; Channel userChannel = all.get(id); String key = channel.remoteAddress().toString().split(":")[1]; userChannel.writeAndFlush(sf.format(new Date())+"\n 【使用者】 "+key+" 說 : "+body); return; } //判斷當前訊息是不是自己傳送的 for(Channel c : channels){ String addr = c.remoteAddress().toString(); if(channel !=c){ c.writeAndFlush(sf.format(new Date())+"\n 【使用者】 "+addr+" 說 : "+msg); }else{ c.writeAndFlush(sf.format(new Date())+"\n 【自己】 "+addr+" 說 : "+msg); } } } /** * 建立連線以後第一個呼叫的方法 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String addr = channel.remoteAddress().toString(); /** * 這裡 ChannelGroup 底層封裝會遍歷給所有的channel傳送訊息 * */ channels.writeAndFlush(sf.format(new Date())+"\n 【使用者】 "+addr+" 加入聊天室 "); channels.add(channel); String key = channel.remoteAddress().toString().split(":")[1]; all.put(key,channel); } /** * channel連線狀態就緒以後呼叫 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String addr = ctx.channel().remoteAddress().toString(); System.out.println(sf.format(new Date())+" \n【使用者】 "+addr+" 上線 "); } /** * channel連線狀態斷開後觸發 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String addr = ctx.channel().remoteAddress().toString(); System.out.println(sf.format(new Date())+" \n【使用者】 "+addr+" 下線 "); //下線移除 String key = ctx.channel().remoteAddress().toString().split(":")[1]; all.remove(key); } /** * 連線發生異常時觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { //System.out.println("連線發生異常!"); ctx.close(); } /** * 斷開連線會觸發該訊息 * 同時當前channel 也會自動從ChannelGroup中被移除 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); String addr = channel.remoteAddress().toString(); /** * 這裡 ChannelGroup 底層封裝會遍歷給所有的channel傳送訊息 * */ channels.writeAndFlush(sf.format(new Date())+"\n 【使用者】 "+addr+" 離開了 "); //列印 ChannelGroup中的人數 System.out.println("當前線上人數是:"+channels.size()); System.out.println("all:"+all.size()); } }
客戶端主方法程式碼
package nio.test.netty.groupChat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import java.util.Scanner; public class NettyChatClient { private String ip; private int port; public NettyChatClient(String ip,int port){ this.ip = ip; this.port = port; } /** * 初始化客戶 */ private void init() throws Exception{ //建立監聽事件的監聽器 EventLoopGroup work = new NioEventLoopGroup(); try { Bootstrap boot = new Bootstrap(); boot.group(work); boot.channel(NioSocketChannel.class); boot.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder",new StringDecoder()); pipeline.addLast(new ClientMessageHandler()); } }); ChannelFuture channelFuture = boot.connect(ip,port).sync(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()){ System.out.println("客戶端啟動中..."); } if(future.isDone()){ System.out.println("客戶端啟動成功...OK!"); } } }); System.out.println(channelFuture.channel().localAddress().toString()); System.out.println("#################################################"); System.out.println("~~~~~~~~~~~~~~埠號#訊息內容~~這樣可以給單獨一個使用者發訊息~~~~~~~~~~~~~~~~~~"); System.out.println("#################################################"); /** * 這裡用控制檯輸入資料 */ Channel channel = channelFuture.channel(); //獲取channel Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()){ String str = scanner.nextLine(); channel.writeAndFlush(str+"\n"); } channelFuture.channel().closeFuture().sync(); scanner.close(); } finally { work.shutdownGracefully(); } } /** * 主方法入口 * @param args * @throws Exception */ public static void main(String[] args) throws Exception{ new NettyChatClient("127.0.0.1",9090).init(); } }
客戶端訊息處理Handler
package nio.test.netty.groupChat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客戶點訊息處理 Handler * @author zhang * */ public class ClientMessageHandler extends SimpleChannelInboundHandler<String> { /** * 處理收到的訊息 */ @Override protected void channelRead0(ChannelHandlerContext ctx,String msg) throws Exception { System.out.println(msg); } /** * 連線異常後觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { ctx.close(); } }
測試結果
啟動了四個客戶端 伺服器端日誌效果如下:
客戶端一端日誌:
客戶端二日誌:
客戶端三日誌:
客戶端四日誌:
現在在客戶端四傳送訊息:
每個客戶端都可以收到訊息:
軟化關閉客戶端客戶端三:
伺服器日誌:
其他客戶端日誌:
傳送私聊訊息:
這個客戶端收不到訊息
到此這篇關於JAVA Netty實現聊天室+私聊功能的示例程式碼的文章就介紹到這了,更多相關JAVA Netty聊天室內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!