Netty-使用netty實現群聊-程式碼實現
阿新 • • 發佈:2021-02-03
1、功能介紹
和之前的NIO實現群聊機制一樣,這裡使用Netty實現:
- 1、當有新使用者上線時,通知其他使用者上線,下線同理
- 2、使用者x傳送一條訊息,伺服器端轉發給其他所有線上伺服器
2、程式碼結構
3、GroupChatServer
package groupchat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel. socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import java.beans.Encoder;
public class GroupChatServer {
//監聽埠
private int port;
public GroupChatServer(int port){
this.port = port;
}
//編寫run方法,處理客戶端的請求
public void run() throws Exception{
//建立兩個執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//其中NioEventLoppGroup預設八個執行緒
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//建立ServerBootstrap,伺服器啟動引導類
ServerBootstrap b = new ServerBootstrap();
//設定ServerBootstrap相關引數
b.group(bossGroup, workerGroup)//roup函式設定兩個EventLoop
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {//指定相關handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//獲取pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline中加入解碼器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline中加入編碼器
pipeline.addLast("encoder", new StringDecoder());
//加入自己的業務處理handler,需要自己寫!關鍵部分
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("Netty伺服器已啟動");
ChannelFuture channelFuture = b.bind(port).sync();
//監聽關閉事件
channelFuture.channel().closeFuture().sync();
}finally {
//關閉
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
//主函式,也需要丟擲異常
public static void main(String[] args) throws Exception{
//啟動一個伺服器
new GroupChatServer(7000);
}
}
4、GroupChatServerHandler
package groupchat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.SimpleTimeZone;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//定義一個channelle組,管理所有的channel
//GlobalEventExecutor.INSTANCE是全域性的時間執行器,是一個單例,幫助執行channelGroup
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//輸出接收到訊息的時間用
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//當連線建立之後,第一個被執行,將當前的channel加入到channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
Channel channel = ctx.channel();
//將該客戶端加入聊天的訊息 傳送給其他的線上客戶端
//writeAndFlush方法會自動遍歷所有的channel,併發送括號內訊息
//最後加入當前訊息傳送的時間
channelGroup.writeAndFlush("客戶端" + channel.remoteAddress() + "加入聊天\n"
+ sdf.format(new java.util.Date()) + "\n");
//channelGroup中加入該channel
channelGroup.add(channel);
}
//斷開連線,觸發該通知給當前線上的客戶
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("客戶端" + channel.remoteAddress() + "離開聊天了\n");
System.out.println("測試用:"+"channelGroup_size=" + channelGroup.size() );
}
//channel出於活動狀態,提示xxx已上線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception{
System.out.println(ctx.channel().remoteAddress() + "上線");
}
//提示xxx離線
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception{
System.out.println(ctx.channel().remoteAddress() + "離線了");
}
//讀取資料並轉發資料給當前線上的其他客戶端
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
//獲取當前channel
//給自己傳送的訊息是不一樣的
final Channel channel = channelHandlerContext.channel();
//根據不同的情況,需要會送不同的訊息
channelGroup.forEach(ch -> {
if(channel != ch)
ch.writeAndFlush("[客戶]" + channel.remoteAddress() + "傳送了訊息:" + s + "\n");
else
ch.writeAndFlush("[自己]" + "傳送了訊息:" + s + "\n");
});
}
//發生異常時
@Override
public void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception{
//關閉通道
var1.close();
}
}
5、GroupChatClient
package groupchat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import javax.naming.event.EventContext;
import java.awt.*;
import java.util.Scanner;
public class GroupChatClient {
//主機地址
private final String host;
//埠
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception{
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相關handler
pipeline.addLast("decoder",new StringDecoder());
pipeline.addLast("encoder",new StringEncoder());
//加入自定義handler
pipeline.addLast(new GroupChatServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
//得到當前channel
Channel channel = channelFuture.channel();
System.out.println("......" + channel.localAddress() + "......");
//客戶端需要輸入資訊,使用Scanner掃描器
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()){
String msg = scanner.nextLine();
//將訊息通過channel傳送給伺服器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {
eventExecutors.shutdownGracefully();
}
}
//客戶端主函式
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1",7000).run();
}
}
6、GroupChatClientHandler
package groupchat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg.trim());
}
}
7、測試方法
- 1)新建一個Maven工程,匯入netty依賴
- 2)按照2結構搭建程式碼
- 3)執行伺服器端GroupChatServer
- 4)執行一個客戶端GroupChatClient,再切換埠執行新的客戶端,進行測試,收發訊息/通知與否
8、遇到的問題
遇到Exception異常Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:53853
應該是IP或者埠設定錯了,嘗試設定一下本地計算機