用SpringBoot整合Netty開發一個基於WebSocket的聊天室
阿新 • • 發佈:2018-12-14
前言
基於SpringBoot,藉助Netty管理長連線,使用WebSocket協議實作一個即時聊天室。
專案效果
專案統一登入路徑: http://localhost:8080/chat/netty (使用者名稱隨機生成;使用者離線時呼叫非同步方法,將聊天資料寫入資料庫;登入後顯示歷史聊天訊息)
GitHub
專案名:InChat 專案地址:https://github.com/UncleCatMy... 專案介紹:基於Netty4與SpringBoot,聊天室WebSocket(文字圖片)加API呼叫Netty長連結執行傳送訊息(線上數、使用者列表)、Iot物聯網-MQTT協議、TCP/IP協議微控制器通訊,非同步儲存聊天資料
程式碼實操講解
隨機命名工具類
/**
 * Generates random one-character user names.
 *
 * <p>Each name is a single character picked uniformly from the inclusive
 * range U+4E00..U+9FA5.
 */
public class RandomNameUtil {

    /** Shared random source; never reassigned. */
    private static final Random ran = new Random();

    /** Number of characters in the inclusive range U+4E00..U+9FA5. */
    private static final int delta = 0x9fa5 - 0x4e00 + 1;

    /** Static utility holder; not meant to be instantiated. */
    private RandomNameUtil() {
    }

    /**
     * Returns a random character in the range U+4E00..U+9FA5 (both inclusive).
     *
     * @return the generated character
     */
    public static char getName() {
        return (char) (0x4e00 + ran.nextInt(delta));
    }
}
配置檔案yml
# Datasource and JPA settings.
spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=false
  jpa:
    show-sql: true

# Netty server settings (read through the "netty" configuration prefix).
netty:
  port: 8090        # listening port
  bossThread: 2     # number of threads
  workerThread: 2   # number of threads
  keepalive: true   # keep connections alive
  backlog: 100
資料庫準備
SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for user_msg
-- ----------------------------
DROP TABLE IF EXISTS `user_msg`;
CREATE TABLE `user_msg` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `msg` varchar(255) DEFAULT NULL,
  -- create_time is set once on insert; only update_time is refreshed on UPDATE.
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of user_msg
-- ----------------------------
INSERT INTO `user_msg` VALUES ('1', '亪', '今天不開心', '2018-08-14 14:26:02', '2018-08-14 14:26:02');
INSERT INTO `user_msg` VALUES ('2', '祐', '不錯呀', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('3', '搈', '開心 開心', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
INSERT INTO `user_msg` VALUES ('4', '凶', '可以的,後面再做個深入一點的', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('5', '倎', '開源這個專案', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
INSERT INTO `user_msg` VALUES ('6', '蝡', '1-someting', '2018-08-14 15:24:28', '2018-08-14 15:24:28');
INSERT INTO `user_msg` VALUES ('7', '弔', '不行呀', '2018-08-14 15:24:29', '2018-08-14 15:24:29');
INSERT INTO `user_msg` VALUES ('8', '習', '可以的', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
INSERT INTO `user_msg` VALUES ('9', '蔫', '開源這個專案', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
dataObject與JPA資料DAO
/**
 * JPA entity holding one chat message: the sender's name, the message text
 * and two timestamps.
 *
 * NOTE(review): presumably mapped to the user_msg table by the default
 * naming strategy; confirm against the JPA configuration.
 */
@Data
@Entity
@DynamicUpdate
public class UserMsg implements Serializable {
private static final long serialVersionUID = 4133316147283239759L;
// Primary key, generated by the database (IDENTITY strategy).
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Integer id;
// Name of the user who sent the message.
private String name;
// Message text.
private String msg;
// Creation timestamp.
private Date createTime;
// Last-update timestamp.
private Date updateTime;
}
/**
 * Spring Data JPA repository for {@link UserMsg} entities keyed by Integer id.
 */
public interface UserMsgRepository extends JpaRepository<UserMsg,Integer> {
// No custom query methods are needed here; the inherited JpaRepository methods are enough.
}
NoSQL模擬環境
我沒有去配置虛擬機器環境,就本地模擬了
儲存使用者名稱與連線隨機ID
/**
 * In-memory key/value store used as a local stand-in for Redis.
 *
 * <p>Entries live in a {@link ConcurrentHashMap}; nothing is persisted.
 */
@Component
public class LikeRedisTemplate {

    /** Backing map from id to the value stored for it. */
    private final Map<Object, Object> entries = new ConcurrentHashMap<>();

    /**
     * Looks up the value stored for an id.
     *
     * @param id key to look up
     * @return the stored value, or {@code null} when the id has no entry
     */
    public Object get(Object id) {
        return entries.get(id);
    }

    /**
     * Stores (or replaces) the value for an id.
     *
     * @param id   key to store under
     * @param name value to store
     */
    public void save(Object id, Object name) {
        entries.put(id, name);
    }

    /**
     * Removes the entry for an id, if there is one.
     *
     * @param id key to remove
     */
    public void delete(Object id) {
        entries.remove(id);
    }
}
聊天內容臨時儲存
/**
 * Temporary in-memory buffer of chat messages waiting to be persisted.
 *
 * <p>All access is synchronized on this instance because the buffer is written
 * by the chat handler and drained by the save task, which may run on
 * different threads.
 */
@Component
public class LikeSomeCacheTemplate {

    /** Insertion-ordered buffer; equal UserMsg instances are stored only once (Set semantics). */
    private final Set<UserMsg> someCache = new LinkedHashSet<>();

    /**
     * Buffers one chat message.
     *
     * @param user sender; stored as {@code String.valueOf(user)}
     * @param msg  message body; stored as {@code String.valueOf(msg)}
     */
    public synchronized void save(Object user, Object msg) {
        UserMsg userMsg = new UserMsg();
        userMsg.setName(String.valueOf(user));
        userMsg.setMsg(String.valueOf(msg));
        someCache.add(userMsg);
    }

    /**
     * Returns a snapshot of the buffered messages in insertion order.
     *
     * <p>The returned set is an independent copy, so callers can iterate it
     * while other threads keep calling {@link #save(Object, Object)}.
     *
     * @return a copy of the current buffer contents
     */
    public synchronized Set<UserMsg> cloneCacheMap() {
        return new LinkedHashSet<>(someCache);
    }

    /** Discards every buffered message. */
    public synchronized void clearCacheMap() {
        someCache.clear();
    }
}
非同步任務處理
/**
 * Task that writes the buffered chat messages to the database.
 */
@Component
public class MsgAsyncTesk {

    @Autowired
    private LikeSomeCacheTemplate cacheTemplate;

    @Autowired
    private UserMsgRepository userMsgRepository;

    /**
     * Saves every message currently held by the cache through the repository,
     * then clears the cache.
     *
     * <p>Marked {@code @Async}; NOTE(review): this only runs on a separate
     * thread if async support is enabled in the application - confirm.
     *
     * @return a completed future holding {@code true}
     * @throws Exception declared for callers; failures from the repository propagate
     */
    @Async
    public Future<Boolean> saveChatMsgTask() throws Exception {
        final Set<UserMsg> pending = cacheTemplate.cloneCacheMap();
        for (UserMsg message : pending) {
            // Persist each buffered user message.
            userMsgRepository.save(message);
        }
        // Everything has been handed to the repository; empty the temporary cache.
        cacheTemplate.clearCacheMap();
        return new AsyncResult<Boolean>(true);
    }
}
netty核心
配置類
/**
 * Holder for the Netty server settings bound from configuration properties
 * under the "netty" prefix.
 */
@Data
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyAccountConfig {
// Port the server socket address is created with.
private int port;
// Thread count passed to the boss event loop group.
private int bossThread;
// Thread count passed to the worker event loop group.
private int workerThread;
// Value used for the SO_KEEPALIVE channel option.
private boolean keepalive;
// Value used for the SO_BACKLOG channel option.
private int backlog;
}
核心訊息處理類
/**
 * Shared WebSocket text-frame handler that implements the chat room.
 *
 * <p>Every connected channel is tracked in {@link #channels}; each channel is
 * given a random user name that is kept in {@code redisTemplate} under the
 * channel id. Incoming text frames are buffered once in {@code cacheTemplate}
 * and relayed to every tracked channel.
 */
@Component
@Qualifier("textWebSocketFrameHandler")
@ChannelHandler.Sharable
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /** All channels currently attached to this handler. */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Autowired
    private LikeRedisTemplate redisTemplate;

    @Autowired
    private LikeSomeCacheTemplate cacheTemplate;

    @Autowired
    private MsgAsyncTesk msgAsyncTesk;

    /**
     * Records the incoming message once and relays it to every tracked channel.
     *
     * <p>The sender receives the text prefixed with {@code [you]}; every other
     * channel receives it prefixed with the sender's name in brackets.
     *
     * @param ctx context of the channel the frame arrived on
     * @param msg the received text frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception {
        Channel incoming = ctx.channel();
        String uName = String.valueOf(redisTemplate.get(incoming.id()));
        String text = msg.text();

        // Buffer the message exactly once, independent of how many clients are connected.
        System.out.println("儲存資料:"+uName+"-"+text);
        cacheTemplate.save(uName, text);

        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + text));
            } else {
                channel.writeAndFlush(new TextWebSocketFrame("[you]" + text));
            }
        }
    }

    /**
     * Assigns a random name to the new channel, announces it to the channels
     * already tracked, then starts tracking the new channel.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println(incoming.remoteAddress());
        // Random user name; any other naming scheme could be substituted here.
        String uName = String.valueOf(RandomNameUtil.getName());

        // Announce the newcomer before adding it, so it does not receive its own notice.
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[新使用者] - " + uName + " 加入"));
        }
        redisTemplate.save(incoming.id(), uName); // remember the user
        channels.add(incoming);
    }

    /**
     * Stops tracking the leaving channel, announces the departure to the
     * remaining channels and forgets the user's name.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        String uName = String.valueOf(redisTemplate.get(incoming.id()));

        // Drop the leaving channel first so the notice only goes to remaining users.
        channels.remove(incoming);
        for (Channel channel : channels) {
            channel.writeAndFlush(new TextWebSocketFrame("[使用者] - " + uName + " 離開"));
        }
        redisTemplate.delete(incoming.id()); // forget the user
    }

    /** Logs that the user bound to this channel is online. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("使用者:"+redisTemplate.get(incoming.id())+"線上");
    }

    /** Logs that the user went offline and triggers saving of the buffered messages. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("使用者:"+redisTemplate.get(incoming.id())+"掉線");
        msgAsyncTesk.saveChatMsgTask();
    }

    /** Logs the failure for the affected user and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("使用者:" + redisTemplate.get(incoming.id()) + "異常");
        cause.printStackTrace();
        ctx.close();
    }
}
定義Initializer
/**
 * Builds the handler pipeline for every accepted socket channel: HTTP codec,
 * HTTP object aggregation, chunked writes, the WebSocket protocol handler on
 * path "/ws", and finally the shared chat handler.
 */
@Component
@Qualifier("somethingChannelInitializer")
public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private TextWebSocketFrameHandler textWebSocketFrameHandler;

    /**
     * Appends the handlers to the new channel's pipeline, in order.
     *
     * @param ch the freshly accepted channel
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(65536))
                .addLast(new ChunkedWriteHandler())
                .addLast(new WebSocketServerProtocolHandler("/ws"))
                // Use the Spring-managed instance: a handler created with `new`
                // would not have its dependencies injected.
                .addLast(textWebSocketFrameHandler);
    }
}
啟動建立Netty基本元件
@Component
public class NettyConfig {
@Autowired
private NettyAccountConfig nettyAccountConfig;
@Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup bossGroup(){
return new NioEventLoopGroup(nettyAccountConfig.getBossThread());
}
@Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
public NioEventLoopGroup workerGroup(){
return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread());
}
@Bean(name = "tcpSocketAddress")
public InetSocketAddress tcpPost(){
return new InetSocketAddress(nettyAccountConfig.getPort());
}
@Bean(name = "tcpChannelOptions")
public Map<ChannelOption<?>, Object> tcpChannelOptions(){
Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
options.put(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive());
options.put(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog());
return options;
}
@Autowired
@Qualifier("somethingChannelInitializer")
private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;
@Bean(name = "serverBootstrap")
public ServerBootstrap bootstrap(){
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(nettyWebSocketChannelInitializer);
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
b.option(option, tcpChannelOptions.get(option));
}
return b;
}
}
服務啟動協助類
/**
 * Helper that binds the Netty server bootstrap to the configured address and
 * closes the server channel when the bean is destroyed.
 */
@Data
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap serverBootstrap;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /** Bound server channel; {@code null} until {@link #start()} has bound successfully. */
    private volatile Channel serverChannel;

    /**
     * Binds the server and then blocks the calling thread until the server
     * channel is closed.
     *
     * <p>The channel is stored as soon as the bind completes, so that
     * {@link #stop()} can close it while this method is still blocked.
     *
     * @throws Exception if binding fails or the wait is interrupted
     */
    public void start() throws Exception {
        serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
        serverChannel.closeFuture().sync();
    }

    /**
     * Closes the server channel if it was ever bound. Safe to call when
     * {@link #start()} never ran or failed before binding.
     */
    @PreDestroy
    public void stop() throws Exception {
        Channel channel = serverChannel;
        if (channel == null) {
            return;
        }
        channel.close();
        // A server channel has no parent; guard instead of dereferencing null.
        Channel parent = channel.parent();
        if (parent != null) {
            parent.close();
        }
    }
}
專案啟動
/**
 * Application entry point: boots the Spring context and then starts the Netty
 * WebSocket server.
 */
@SpringBootApplication
@EnableScheduling
// Required for the @Async save task to run off the calling thread
// (org.springframework.scheduling.annotation.EnableAsync, same package as @EnableScheduling).
@EnableAsync
public class NettychatApplication {

    /**
     * Starts Spring Boot, fetches the {@link TCPServer} bean and starts it.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception if the WebSocket server fails to start
     */
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context =
                SpringApplication.run(NettychatApplication.class, args);
        // Fetch the TCPServer bean and start the WebSocket service.
        TCPServer tcpServer = context.getBean(TCPServer.class);
        tcpServer.start();
    }
}
GitHub
專案名:InChat 專案地址:https://github.com/UncleCatMy... 專案介紹:基於Netty4與SpringBoot,聊天室WebSocket(文字圖片)加API呼叫Netty長連結執行傳送訊息(線上數、使用者列表)、Iot物聯網-MQTT協議、TCP/IP協議微控制器通訊,非同步儲存聊天資料
最後文末送波福利。現在加群即可獲取Java工程化、高效能及分散式、高效能、高架構。效能調優、Spring,MyBatis,Netty原始碼分析和大資料等多個知識點高階進階乾貨的直播免費學習許可權及領取相關資料,群號:835638062 點選連結加入群聊【Java高階架構】:https://jq.qq.com/?_wv=1027&k=5S3kL3v