1. 程式人生 > 程式設計 >SpringBoot+Netty+WebSocket實現訊息傳送的示例程式碼

SpringBoot+Netty+WebSocket實現訊息傳送的示例程式碼

一.匯入Netty依賴

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.25.Final</version>
  </dependency>

二.搭建websocket伺服器

@Component
public class WebSocketServer {

 /**
  * 主執行緒池
  */
 private EventLoopGroup bossGroup;
 /**
  * 工作執行緒池
  */
 private EventLoopGroup workerGroup;
 /**
  * 伺服器
  */
 private ServerBootstrap server;
 /**
  * 回撥
  */
 private ChannelFuture future;

 public void start() {
  future = server.bind(9001);
  System.out.println("netty server - 啟動成功");
 }

 public WebSocketServer() {
  bossGroup = new NioEventLoopGroup();
  workerGroup = new NioEventLoopGroup();

  server = new ServerBootstrap();
  server.group(bossGroup,workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new WebsocketInitializer());
 }
}

三.初始化Websocket

public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
 
 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
  // ------------------
  // 用於支援Http協議
  // ------------------
  // websocket基於http協議,需要有http的編解碼器
  pipeline.addLast(new HttpServerCodec());
  // 對寫大資料流的支援
  pipeline.addLast(new ChunkedWriteHandler());
  // 新增對HTTP請求和響應的聚合器:只要使用Netty進行Http程式設計都需要使用
  //設定單次請求的檔案的大小
  pipeline.addLast(new HttpObjectAggregator(1024 * 64));
  //webSocket 伺服器處理的協議,用於指定給客戶端連線訪問的路由 :/ws
  pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
  // 新增Netty空閒超時檢查的支援
  // 1. 讀空閒超時(超過一定的時間會發送對應的事件訊息)
  // 2. 寫空閒超時
  // 3. 讀寫空閒超時
  pipeline.addLast(new IdleStateHandler(4,8,12));
  //新增心跳處理
  pipeline.addLast(new HearBeatHandler());
  // 新增自定義的handler
  pipeline.addLast(new ChatHandler());

 }
}

四.建立Netty監聽器

@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

 @Resource
 private WebSocketServer websocketServer;

 @Override
 public void onApplicationEvent(ContextRefreshedEvent event) {
  if(event.getApplicationContext().getParent() == null) {
   try {
    websocketServer.start();
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
}

五.建立訊息通道

public class UserChannelMap {
 /**
  * 使用者儲存使用者id與通道的Map物件
  */
// private static Map<String,Channel> userChannelMap;

 /* static {
  userChannelMap = new HashMap<String,Channel>();
 }*/

 /**
  * 定義一個channel組,管理所有的channel
  * GlobalEventExecutor.INSTANCE 是全域性的事件執行器,是一個單例
  */
 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 /**
  * 存放使用者與Chanel的對應資訊,用於給指定使用者傳送訊息
  */
 private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

 private UserChannelMap(){}
 /**
  * 新增使用者id與channel的關聯
  * @param userNum
  * @param channel
  */
 public static void put(String userNum,Channel channel) {
  userChannelMap.put(userNum,channel);
 }

 /**
  * 根據使用者id移除使用者id與channel的關聯
  * @param userNum
  */
 public static void remove(String userNum) {
  userChannelMap.remove(userNum);
 }

 /**
  * 根據通道id移除使用者與channel的關聯
  * @param channelId 通道的id
  */
 public static void removeByChannelId(String channelId) {
  if(!StringUtils.isNotBlank(channelId)) {
   return;
  }
  for (String s : userChannelMap.keySet()) {
   Channel channel = userChannelMap.get(s);
   if(channelId.equals(channel.id().asLongText())) {
    System.out.println("客戶端連線斷開,取消使用者" + s + "與通道" + channelId + "的關聯");
    userChannelMap.remove(s);
    UserService userService = SpringUtil.getBean(UserService.class);
    userService.logout(s);
    break;
   }
  }
 }

 /**
  * 列印所有的使用者與通道的關聯資料
  */
 public static void print() {
  for (String s : userChannelMap.keySet()) {
   System.out.println("使用者id:" + s + " 通道:" + userChannelMap.get(s).id());
  }
 }

 /**
  * 根據好友id獲取對應的通道
  * @param receiverNum 接收人編號
  * @return Netty通道
  */
 public static Channel get(String receiverNum) {
  return userChannelMap.get(receiverNum);
 }

 /**
  * 獲取channel組
  * @return
  */
 public static ChannelGroup getChannelGroup() {
  return channelGroup;
 }

 /**
  * 獲取使用者channel map
  * @return
  */
 public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
  return userChannelMap;
 }
}

六.自定義訊息型別

public class Message {
 /**
  * 訊息型別
  */
 private Integer type;
 /**
  * 聊天訊息
  */
 private String message;
 /**
  * 擴充套件訊息欄位
  */
 private Object ext;
 public Integer getType() {
  return type;
 }

 public void setType(Integer type) {
  this.type = type;
 }

 public MarketChatRecord getChatRecord() {
  return marketChatRecord;
 }
 public void setChatRecord(MarketChatRecord chatRecord) {
  this.marketChatRecord = chatRecord;
 }

 public Object getExt() {
  return ext;
 }

 public void setExt(Object ext) {
  this.ext = ext;
 }

 @Override
 public String toString() {
  return "Message{" +
    "type=" + type +
    ",marketChatRecord=" + marketChatRecord +
    ",ext=" + ext +
    '}';
 }

}

七.建立處理訊息的handler

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
 private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);


 /**
  * 用來儲存所有的客戶端連線
  */
 private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

 /**
  *當Channel中有新的事件訊息會自動呼叫
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
  // 當接收到資料後會自動呼叫
  // 獲取客戶端傳送過來的文字訊息
  Gson gson = new Gson();
  log.info("伺服器收到訊息:{}",msg.text());
  System.out.println("接收到訊息資料為:" + msg.text());
  Message message = gson.fromJson(msg.text(),Message.class); 
//根據業務要求進行訊息處理
  switch (message.getType()) {
   // 處理客戶端連線的訊息
   case 0:
    // 建立使用者與通道的關聯
   // 處理客戶端傳送好友訊息
    break;
   case 1:
   // 處理客戶端的簽收訊息
    break;
   case 2:
    // 將訊息記錄設定為已讀
    break;
   case 3:
    // 接收心跳訊息
    break;
   default:
    break;
  }

 }

 // 當有新的客戶端連線伺服器之後,會自動呼叫這個方法
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerAdded 被呼叫"+ctx.channel().id().asLongText());
  // 新增到channelGroup 通道組
  UserChannelMap.getChannelGroup().add(ctx.channel());
//  clients.add(ctx.channel());
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {
  log.info("{異常:}"+cause.getMessage());
  // 刪除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  ctx.channel().close();
 }

 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  log.info("handlerRemoved 被呼叫"+ctx.channel().id().asLongText());
  //刪除通道
  UserChannelMap.getChannelGroup().remove(ctx.channel());
  UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
  UserChannelMap.print();
 }

}

八.處理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter {

 @Override
 public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {
  if(evt instanceof IdleStateEvent) {
   IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

   if(idleStateEvent.state() == IdleState.READER_IDLE) {
    System.out.println("讀空閒事件觸發...");
   }
   else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
    System.out.println("寫空閒事件觸發...");
   }
   else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
    System.out.println("---------------");
    System.out.println("讀寫空閒事件觸發");
    System.out.println("關閉通道資源");
    ctx.channel().close();
   }
  }
 }
}

搭建完成後呼叫測試

1.頁面訪問http://localhost:9001/ws
2.埠號9001和訪問路徑ws都是我們在上邊配置的,然後傳入我們自定義的訊息message型別。
3.大概流程:訊息傳送 :使用者1先連線通道,然後傳送訊息給使用者2,使用者2若是線上直接可以傳送給使用者,若沒線上可以將訊息暫存在redis或者通道里,使用者2連結通道的話,兩者可以直接通訊。
訊息推送 :使用者1連線通道,根據通道id查詢要推送的人是否線上,或者推送給所有人,這裡我只推送給指定的人。

到此這篇關於SpringBoot+Netty+WebSocket實現訊息傳送的示例程式碼的文章就介紹到這了,更多相關SpringBoot Netty WebSocket訊息傳送內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!