1. 程式人生 > >基於netty-socketio的web聊天--傳送純文字訊息

基於netty-socketio的web聊天--傳送純文字訊息

在WEB專案中,伺服器向WEB頁面推送訊息是一種常見的業務需求。PC端的推送技術可以使用socket建立一個長連線來實現。傳統的web服務都是客戶端發出請求,服務端給出響應。但是現在直觀的要求是允許特定時間內在沒有客戶端發起請求的情況下服務端主動推送訊息到客戶端。最近的預警系統中,需要服務端向預警系統推送商品行情和K線相關的資料,所以對常用的WEB端推送方式進行調研。常見的手段主要包括以下幾種:

  • 輪詢(俗稱“拉”,polling):Ajax 隔一段時間向伺服器傳送請求,詢問資料是否發生改變,從而進行增量式的更新。輪詢的時間間隔成了一個問題:間隔太短,會有大量的請求傳送到伺服器,會對伺服器負載造成影響;間隔太長業務資料的實時性得不到保證。連。使用輪詢的優點是實現邏輯簡單,缺點是無效請求的數量多,在使用者量較大的情況下,伺服器負載較高。因此輪詢的方式通常在併發數量較少、並且對訊息實時性要求不高的情況下使用。
  • 長輪詢技術(long-polling):客戶端向伺服器傳送Ajax請求,伺服器接到請求後hold住連線,直到有新訊息或超時(設定)才返回響應資訊並關閉連線,客戶端處理完響應資訊後再向伺服器傳送新的請求。長輪詢技術的優點是訊息實時性高,無訊息的情況下不會進行頻繁的請求;缺點是服務端維持和客戶端的連線會消耗掉一部分資源。
  • 外掛提供socket方式:比如利用Flash XMLSocket,Java Applet套介面,Activex包裝的socket。優點是對原生socket的支援,和PC端和移動端的實現方式相似;缺點是瀏覽器需要安裝相應的外掛。
  • WebSocket:是HTML5開始提供的一種瀏覽器與伺服器間進行全雙工通訊的網路技術。其優點是更好的節省伺服器資源和頻寬並達到實時通訊;缺點是目前還未普及,瀏覽器支援不好;

  綜上,考慮到瀏覽器相容性和效能問題,採用長輪詢(long-polling)是一種比較好的方式。netty-socketio是一個開源的Socket.io伺服器端的一個java的實現, 它基於Netty框架。 專案地址為: 

以下基於Netty-socketIO實現一個簡單的聊天室功能,首先引入依賴:

        <dependency>
			<groupId>com.corundumstudio.socketio</groupId>
			<artifactId>netty-socketio</artifactId>
			<version>1.7.7</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-simple</artifactId>
			<version>1.7.7</version>
		</dependency>

定義Listen,使用者監聽Oncennect、disconnect和OnMSG事件:

package com.ps.learn.socketio.service;

import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnConnect;
import com.corundumstudio.socketio.annotation.OnDisconnect;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.ps.learn.socketio.entity.MsgBean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Created by Administrator on 2018/7/21 0021.
 *  定義Listen,使用者監聽Oncennect、disconnect和OnMSG事件:
 */
@Service("eventListenner")
public class EventListenner {
    @Resource(name = "clientCache")
    private SocketIOClientCache clientCache;

    @Resource(name = "socketIOResponse")
    private SocketIOResponse socketIOResponse;

    @OnConnect
    public void onConnect(SocketIOClient client) {
        //根據使用者id 或Cookie 存放 此處為模擬場景
        String no0 = client.getHandshakeData().getSingleUrlParam("no");
        System.out.println("工號為no = "+no0+"的使用者建立WebSocket連線");
        clientCache.addClient(client,no0);
        System.out.println("建立連線");
    }

    @OnEvent("OnMSG")
    public void onSync(SocketIOClient client, MsgBean bean) {
        System.out.printf("收到訊息-from: %s to:%s\n", bean.getFrom(), bean.getTo());
        SocketIOClient ioClients = clientCache.getClient(bean.getTo());
        System.out.println("clientCache");
        if (ioClients == null) {
            System.out.println("你傳送訊息的使用者不線上");
            return;
        }
        socketIOResponse.sendEvent(ioClients,bean);
    }

    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        System.out.println("關閉連線");
    }
}

定義訊息傳送類:

package com.ps.learn.socketio.service;

import com.corundumstudio.socketio.SocketIOClient;
import com.ps.learn.socketio.entity.MsgBean;
import org.springframework.stereotype.Service;

/**
 * Created by Administrator on 2018/7/21 0021.
 * 定義訊息傳送類:
 */
@Service("socketIOResponse")
public class SocketIOResponse {
    public void sendEvent(SocketIOClient client, MsgBean bean) {
        System.out.println("推送訊息");
        client.sendEvent("OnMSG", bean);
    }
}

定義Cache用於儲存所有和Web端的連線:

package com.ps.learn.socketio.service;

import com.corundumstudio.socketio.SocketIOClient;
import com.ps.learn.socketio.entity.MsgBean;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by Administrator on 2018/7/21 0021.
 * 定義Cache用於儲存所有和Web端的連線:
 */
@Service("clientCache")
public class SocketIOClientCache {
    //String:EventType型別
    private Map<String,SocketIOClient> clients=new ConcurrentHashMap<String,SocketIOClient>();

    //使用者傳送訊息新增
    public void addClient(SocketIOClient client,MsgBean msgBean){
        clients.put(msgBean.getFrom(),client);
    }

    //使用者連線進來新增使用者
    public void addClient(SocketIOClient client,String userId){
        clients.put(userId,client);
    }

    //使用者退出時移除
    public void remove(MsgBean msgBean) {
        clients.remove(msgBean.getFrom());
    }

    //獲取所有
    public  SocketIOClient getClient(String to) {
        return clients.get(to);
    }
}

定義Server:

package com.ps.learn.socketio.service;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Created by Administrator on 2018/7/21 0021.
 * /繼承InitializingBean,使Spring載入完配置檔案,自動執行如下方法
 */
@Service("chatServer")
public class ChatServer  implements InitializingBean {
    @Resource
    private EventListenner eventListenner;
    public void afterPropertiesSet() throws Exception {
        Configuration config = new Configuration();
        config.setPort(9098);

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        config.setSocketConfig(socketConfig);
        config.setHostname("localhost");

        SocketIOServer server = new SocketIOServer(config);
        server.addListeners(eventListenner);
        server.start();
        System.out.println("啟動正常");
    }
}

定義MSGbean:

package com.ps.learn.socketio.entity;

/**
 * Created by Administrator on 2018/7/21 0021.
 */
public class MsgBean {
    private String from;
    private String to;
    private String content;

    public String getFrom() {
        return from;
    }
    public void setFrom(String from) {
        this.from = from;
    }
    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public String getContent() {
        return content;
    }
    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "MsgBean [from=" + from + ", to=" + to + ", content=" + content + "]";
    }
}

HTML頁面:

<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>Socketio chat</title>
    <script src="./jquery-1.7.2.min.js" type="text/javascript"></script>
    <script type="text/javascript" src="./socket.io/socket.io.js"></script>
    <style>
        body {
            padding: 20px;
        }
        #console {
            height: 400px;
            overflow: auto;
        }
        .username-msg {
            color: orange;
        }
        .connect-msg {
            color: green;
        }
        .disconnect-msg {
            color: red;
        }
        .send-msg {
            color: #888
        }
    </style>
</head>
<body>
<h1>Netty-socketio chat demo</h1>
<br />
<div id="console" class="well"></div>
<form class="well form-inline" onsubmit="return false;">
    <input id="to" class="input-xlarge" type="text" placeholder="to. . . " />
    <input id="content" class="input-xlarge" type="text" placeholder="content. . . " />
    <button type="button" onClick="sendMessage()" class="btn">Send</button>
    <button type="button" onClick="sendDisconnect()" class="btn">Disconnect</button>
</form>
</body>
<script type="text/javascript">
    var username = Math.random()+"22334";
    var socket = io.connect('http://localhost:9098?no=' + username);
    socket.on('connect',function() {
        output('<span class="connect-msg">Client has connected to the server!</span>');
    });

    socket.on('OnMSG', function(data) {
        outputData( data);
    });

    socket.on('disconnect',function() {
        output('<span class="disconnect-msg">The client has disconnected! </span>');
    });

    function sendDisconnect() {
        socket.disconnect();
    }

    function sendMessage() {
        var from = username;
        var to = $("#to").val();
        var content = $('#content').val();
        socket.emit('OnMSG', {
            from : from,
            to : to,
            content : content
        });
    }

    function output(message) {
        var currentTime = "<span class='time' >" + new Date() + "</span>";
        var element = $("<div>" + currentTime + " " + message + "</div>");
        $('#console').prepend(element);
    }
    function outputData(data) {
        console.log(data)
        var currentTime = "<span class='time' >" + new Date() + " from "+data.from+" </span>";
        var element = $("<span class='username-msg'><div>" + currentTime + "; message == " +data.content + "</div></span>");
        $('#console').prepend(element);
    }
</script>
</html>

新增namespace

package com.ps.uzkefu.apps.ctilink.service.impl;

import com.corundumstudio.socketio.*;
import com.corundumstudio.socketio.store.RedissonStoreFactory;
import com.corundumstudio.socketio.store.StoreFactory;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Author:ZhuShangJin
 * Date:2018/10/8
 */
@Service
public class IPPhoneSocketServer implements InitializingBean {
    @Resource
    private IPPhoneEventListenner eventListenner;
    @Value("${ipphone.socket.server.port}")
    private Integer socketPort;
    @Autowired
    RedissonClient redissonClient;
    @Override
    public void afterPropertiesSet() throws Exception {
        Configuration config = new Configuration();
        config.setPort(socketPort);
        StoreFactory storeFactory = new RedissonStoreFactory(redissonClient);
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        socketConfig.setTcpKeepAlive(true);
        config.setSocketConfig(socketConfig);
        //解決圖片過大
//        ERROR 3044 --- [ntLoopGroup-3-7] c.c.s.listener.DefaultExceptionListener  : Max frame length of 65536 has been exceeded.
        config.setMaxFramePayloadLength(65536000);
        config.setStoreFactory(storeFactory);
        config.setWorkerThreads(100);

        SocketIOServer server = new SocketIOServer(config);
//        不加namespace
//         server.addListeners(eventListenner);


        //新增 namespace
        SocketIONamespace ipPhoneSocketNameSpace = server.addNamespace("/ipPhone")  ;
        ipPhoneSocketNameSpace.addListeners(eventListenner);
        server.start();
        System.out.println("IPPhoneSocketServer啟動正常");
    }




}

demo地址