用 Netty 實現 IM 聊天(一)
阿新 • • 發佈:2021-07-16
封裝的訊息體
package com.example.netty.im.common.message;
import lombok.Data;
/**
* Message body carried in every chat message.
* Lombok's {@code @Data} generates the getters, setters, {@code equals},
* {@code hashCode} and {@code toString} for the fields below.
*
* @Class MsgBody
* @Description Message body
* @Author
* @Date 2021/7/14
**/
@Data
public class MsgBody {
//Name of the user who sent the message
private String sendUserName;
//Text content of the message
private String msg;
}
NettyServerHandler
package com.example.netty.im.common.handler; import com.alibaba.fastjson.JSONObject; import com.example.netty.im.common.message.MsgBody; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.SimpleChannelInboundHandler; import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; /** * @Class NettyServerHandler * 問題1:服務端為何沒有轉發到目的客戶端,而是隻返回給傳送訊息的客戶端, * 原因是服務端回覆訊息時,將訊息轉發給了自己,沒有轉發給其他客戶端 * @Author * @Date 2021/7/14 **/ public class NettyServerHandler extends SimpleChannelInboundHandler { // 儲存id和容器的關係 private static Map<String, ChannelHandlerContext> map = new HashMap<>(); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { Channel channel = channelHandlerContext.channel(); ChannelId channelId = channel.id(); map.put(channelId.toString(), channelHandlerContext); ByteBuf byteBuf = (ByteBuf) o; String rev = getMessage(byteBuf); MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class); String format = String.format("伺服器接收到客戶端訊息,傳送人:%s, 傳送訊息:%s .", msgBody.getSendUserName(), msgBody.getMsg()); System.out.println(format); map.forEach((k, v) -> { try { // 出現問題1的原因:遇到不是自身的客戶端過濾掉了,但實際上返回訊息是需要自身的 if (channelId.toString().equals(k)) { return; } MsgBody sendMsgBody = new MsgBody(); sendMsgBody.setSendUserName(msgBody.getSendUserName()); sendMsgBody.setMsg(msgBody.getMsg()); v.writeAndFlush(getSendByteBuf(JSONObject.toJSONString(sendMsgBody))); System.out.println("服務端回覆訊息: " + JSONObject.toJSONString(sendMsgBody)); } catch (Exception e) { e.printStackTrace(); } }); } /** * 從ByteBuf 中獲取資訊,使用UTF-8編碼返回 * * @param buf * @return */ private String getMessage(ByteBuf buf) { byte[] con = new byte[buf.readableBytes()]; buf.readBytes(con); try { return new String(con, "UTF8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return 
null; } } private ByteBuf getSendByteBuf(String message) { byte[] req = message.getBytes(); ByteBuf pingMessage = Unpooled.buffer(); pingMessage.writeBytes(req); return pingMessage; } }
NettyClientHandler
package com.example.netty.im.common.handler; //import cn.hutool.json.JSONObject; import com.alibaba.fastjson.JSONObject; import com.example.netty.im.common.message.MsgBody; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.io.UnsupportedEncodingException; /** * @Class NettyClientHandler * @Description TODO * @Author * @Date 2021/7/16 **/ public class NettyClientHandler extends SimpleChannelInboundHandler { private ByteBuf firstMessage; private ChannelHandlerContext ctx; private String userName; public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public void sendMsg(String str) { byte[] data = str.getBytes(); firstMessage = Unpooled.buffer(); firstMessage.writeBytes(data); ctx.writeAndFlush(firstMessage); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; MsgBody msgBody = new MsgBody(); msgBody.setSendUserName(userName); msgBody.setMsg("進入聊天室"); byte[] data = JSONObject.toJSONString(msgBody).getBytes(); firstMessage = Unpooled.buffer(); firstMessage.writeBytes(data); ctx.writeAndFlush(firstMessage); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception { ByteBuf buf = (ByteBuf) o; String rev = getMessage(buf); MsgBody msgBody = JSONObject.parseObject(rev, MsgBody.class); String format = String.format("客戶端收到服務端的訊息,傳送人:%s , 傳送訊息:%s .", msgBody.getSendUserName(), msgBody.getMsg()); System.out.println(format); } private String getMessage(ByteBuf buf) { byte[] conn = new byte[buf.readableBytes()]; buf.readBytes(conn); try { return new String(conn, "UTF8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); return null; } } }
NettyServer
package com.example.netty.im.server; import com.example.netty.im.common.handler.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @Class NettyServer * @Description netty 的服務端 * @Author * @Date 2021/7/14 **/ public class NettyServer { private int port; public NettyServer(int port) { this.port = port; bind(); } private void bind() { EventLoopGroup parentEventLoopGroup = new NioEventLoopGroup(); EventLoopGroup childEventLoopGroup = new NioEventLoopGroup(); try { // 服務端引導類 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(parentEventLoopGroup, childEventLoopGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 連線數 .option(ChannelOption.TCP_NODELAY, true) // 不延遲,訊息立即傳送 .childOption(ChannelOption.SO_KEEPALIVE, true) // 長連線 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline channelPipeline = socketChannel.pipeline(); NettyServerHandler serverHandler = new NettyServerHandler(); // 新增NettyServerHandler,用來處理 channelPipeline.addLast(serverHandler); } }); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); if (channelFuture.isSuccess()) { System.out.println(" 啟動Netty 服務成功,埠號:" + this.port); } // 關閉連線 channelFuture.channel().closeFuture().sync(); } catch (Exception e) { System.out.println("啟動Netty服務異常,異常資訊: " + e.getMessage()); e.printStackTrace(); } finally { if (parentEventLoopGroup != null) { parentEventLoopGroup.shutdownGracefully(); } if (childEventLoopGroup != null) { childEventLoopGroup.shutdownGracefully(); } } } public static void main(String[] args) { final int port = 10086; new NettyServer(port); } }
NettyClient
package com.example.netty.im.client;
import com.alibaba.fastjson.JSONObject;
import com.example.netty.im.common.handler.NettyClientHandler;
import com.example.netty.im.common.message.MsgBody;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Scanner;
/**
 * Console chat client. Constructing an instance connects to the server, starts a
 * background thread that sends each line typed on standard input, and then blocks the
 * calling thread until the connection is closed.
 *
 * @Class NettyClient
 * @Author
 * @Date 2021/7/16
 **/
public class NettyClient {

    /** Assigned in {@code initChannel} once the channel has been created. */
    private NettyClientHandler nettyClientHandler;
    private final int port;
    private final String host;
    private final String sendUserName;

    /**
     * Creates the client and connects immediately.
     *
     * @param port         server port
     * @param host         server host name or address
     * @param sendUserName name attached to every message this client sends
     * @throws InterruptedException if the thread is interrupted while connecting or
     *                              while waiting for the connection to close
     */
    public NettyClient(int port, String host, String sendUserName) throws InterruptedException {
        this.port = port;
        this.host = host;
        this.sendUserName = sendUserName;
        start();
    }

    /**
     * Connects to the server, starts the console reader and waits for the channel to close.
     */
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            nettyClientHandler = new NettyClientHandler();
                            nettyClientHandler.setUserName(sendUserName);
                            socketChannel.pipeline().addLast(nettyClientHandler);
                        }
                    });

            // sync() rethrows a connect failure, so reaching the next line means success.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            startConsoleReader();
            System.err.println(sendUserName + "連線伺服器成功");

            channelFuture.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * Starts a daemon thread that sends every non-blank line typed on standard input as
     * one chat message. Reading whole lines keeps a message containing spaces in one
     * piece; the thread is a daemon so it cannot keep the JVM alive after the
     * connection has closed.
     */
    private void startConsoleReader() {
        Thread reader = new Thread(() -> {
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                String line = sc.nextLine();
                if (line.trim().isEmpty()) {
                    continue;
                }
                MsgBody msgBody = new MsgBody();
                msgBody.setSendUserName(sendUserName);
                msgBody.setMsg(line);
                nettyClientHandler.sendMsg(JSONObject.toJSONString(msgBody));
            }
        }, "im-console-reader");
        reader.setDaemon(true);
        reader.start();
    }
}
三個模擬客戶端
package com.example.netty.im.client;
/**
 * Simulated chat client that connects to localhost:10086 as user "Tom".
 *
 * @Class MockClient01
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient01 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Tom");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package com.example.netty.im.client;
/**
 * Simulated chat client that connects to localhost:10086 as user "Jerry".
 *
 * @Class MockClient02
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient02 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Jerry");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
package com.example.netty.im.client;
/**
 * Simulated chat client that connects to localhost:10086 as user "Jack".
 *
 * @Class MockClient03
 * @Author
 * @Date 2021/7/16
 **/
public class MockClient03 {
    public static void main(String[] args) {
        try {
            new NettyClient(10086, "localhost", "Jack");
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
先啟動服務端(NettyServer),再啟動 3 個模擬客戶端(MockClient01、MockClient02、MockClient03)