1. 程式人生 > >基於Netty-Socket-io的無直接呼叫式的資料傳輸

基於Netty-Socket-io的無直接呼叫式的資料傳輸

實時訊息的推送,PC端的推送技術可以使用socket建立一個長連線來實現。傳統的web服務都是客戶端發出請求,服務端給出響應。但是現在直觀的要求是允許特定時間內在沒有客戶端發起請求的情況下服務端主動推送訊息到客戶端。

有哪些可以實現web訊息推送的技術:

①不斷地輪詢(俗稱“拉”,polling)是獲取實時訊息的一個手段:Ajax 隔一段時間(通常使用 JavaScript 的 setTimeout 函式)就去伺服器查詢是否有改變,從而進行增量式的更新。但是間隔多長時間去查詢成了問題,因為效能和即時性造成了嚴重的反比關係。間隔太短,連續不斷的請求會沖垮伺服器,間隔太長,務器上的新資料就需要越多的時間才能到達客戶機。

優點:服務端邏輯簡單;

缺點:其中大多數請求可能是無效請求,在大量使用者輪詢很頻繁的情況下對伺服器的壓力很大;

應用:併發使用者量少,而且要求訊息的實時性不高,一般很少採用;

②長輪詢技術(long-polling):客戶端向伺服器傳送Ajax請求,伺服器接到請求後hold住連線,直到有新訊息或超時(設定)才返回響應資訊並關閉連線,客戶端處理完響應資訊後再向伺服器傳送新的請求。

優點:實時性高,無訊息的情況下不會進行頻繁的請求;

缺點:伺服器維持著連線期間會消耗資源;

③基於Iframe及htmlfile的流(streaming)方式:iframe流方式是在頁面中插入一個隱藏的iframe,利用其src屬性在伺服器和客戶端之間建立一條長連結,伺服器向iframe傳輸資料(通常是HTML,內有負責插入資訊的javascript),來實時更新頁面。

優點:訊息能夠實時到達;

缺點:伺服器維持著長連線期會消耗資源;

④外掛提供socket方式:比如利用Flash XMLSocket,Java Applet套介面,Activex包裝的socket。

優點:原生socket的支援,和PC端和移動端的實現方式相似;

缺點:瀏覽器端需要裝相應的外掛;

⑤WebSocket:是HTML5開始提供的一種瀏覽器與伺服器間進行全雙工通訊的網路技術。

優點:更好的節省伺服器資源和頻寬並達到實時通訊;

缺點:目前還未普及,瀏覽器支援不好;

綜上,考慮到瀏覽器相容性和效能問題,採用長輪詢(long-polling)是一種比較好的方式。
netty-socketio是一個開源的Socket.io伺服器端的一個java的實現, 它基於Netty框架。 專案地址為(https://github.com/mrniko/netty-socketio)

基於專案需要,我們系統要做一箇中轉的服務,類似:三方系統A呼叫我們的介面通過引數的形式把資料傳給我們,然後我們在接收到這個引數並做檢驗之後馬上推送給三方系統B,實現資料的高實時性。但由於三方系統B是純前端專案,類似Echarts,無任何後臺,重要的是他們無法知道什麼時間三方A系統呼叫了我們的介面傳給我們資料,所以三方B系統無法通過介面形式來獲取資料,鑑於此,參考了一下實時通訊的案例,發現
netty-socketio是一個很好的解決方案,然後自己改造了下完成了自己的需求,改造如下:

POM依賴

<!-- socket心跳包 -->
        <dependency
>
<groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.3</version> </dependency>

這個包下有很多依賴包,大概七八個。

工具類封裝
package com.spdb.hs.data.utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketIOServer;

public class WebSocketService implements ApplicationListener{

private static Logger log = LoggerFactory.getLogger(WebSocketService.class);

private  static Configuration config;
private  static SocketIOServer server;
private String eventName;
private String data;

public WebSocketService(){};

public WebSocketService(String eventName,String data){
    this.data=data;
    this.eventName=eventName;
}

 public void startLisener(){  
     try {  
            SocketEventListener(eventName,data);  
        } catch (Exception e) {  
            server.stop();  
            e.printStackTrace();  
        }  


}  


private void SocketEventListener(final String eventName, final String data) {
    try {
        //預設給所有連線的客戶端推送資料,廣播形式
        server.getBroadcastOperations().sendEvent(eventName, data);
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    }


}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    Properties p = new Properties();
    InputStream in = event.getApplicationContext().getClassLoader().getResourceAsStream("conf/socket.properties");
    try {
        p.load(in);
    } catch (IOException e) {
        log.error("Socket配置檔案讀取失敗......", e);
    }
    String Host = p.getProperty("socket.host");
    String Port = p.getProperty("socket.port");
    log.info("Host="+Host+",Port="+Port);
    if(server==null){
        config = new Configuration();  
        config.setHostname(Host);  
        config.setPort(Integer.valueOf(Port));  
        server = new SocketIOServer(config);  
        server.start();  
        log.info("Socket Service was started");
    }
}

}

實現了ApplicationListener介面,這麼做的原因是讓專案在spring容器載入完成後預設會去做的事,目的在於專案一啟動就建立唯一的SocketService服務端。後續可以已呼叫方法的形式自定義的給客戶端發信息(具體服務端與特定客戶端通訊省略)

配置檔案

Socket.properties

socket.host=xxx.xxx.xxx.xxx
socket.port=9001

spring-bean.xml

<bean id="webSocketService" class="com.spdb.hs.data.utils.WebSocketService"/>

這個東西必須在bean中配置好,不然不起作用。

呼叫例項

package com.spdb.hs.data.controller.app;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import com.mchange.v2.lang.StringUtils;
import com.spdb.hs.data.parameter.EventType;
import com.spdb.hs.data.utils.ApiService;
import com.spdb.hs.data.utils.WebSocketService;
import com.spdb.hs.data.vo.RspVo;
import com.spdb.hs.oms.vo.PacketHead;

@Controller
@RequestMapping("/drill")
public class DrillController {

    private Logger log = LoggerFactory.getLogger(DrillController.class);

    /**   
    *    
    * 方法描述:   
    * 建立人:zhy 
    * 建立時間:2017年9月21日 下午2:27:44   
    * 修改時間:2017年9月21日 下午2:27:44   
    * Description: 啟動
    *    
    */
    @RequestMapping("quickStart")
    @ResponseBody
    public RspVo getReady(){
        RspVo vo = new RspVo();
        String drillUrl = "xxx.xxx.xxx.xxx";
        String data = null;
        try {
            data = new ApiService().doGet(drillUrl);
        } catch (Exception e) {
            log.error(e.getMessage(),e);
        }
        if(null==data){
            vo.setCode(PacketHead.JSON_ERR_FORMAT);
            vo.setMsg("請求引數錯誤,請檢查資料......");
        }else{
            vo.setData(data);
        }
        return vo;
    }

    /**   
    *    
    * 方法描述:   
    * 建立人:zhy 
    * 建立時間:2017年9月21日 下午2:27:30   
    * 修改時間:2017年9月21日 下午2:27:30   
    * Description:  定時更新介面
    *    
    */
    @RequestMapping(value="timerUpd")
    @ResponseBody
    public RspVo timingUpdate(@RequestParam(required=true)String data){
                    RspVo vo = new RspVo();
                    log.info(" 定時更新介面接收到的引數為:data="+data);
                    if(StringUtils.nonEmptyString(data)){
                        try {
                            WebSocketService socketService = new WebSocketService(EventType.EventType_Timer,data);
                            socketService.startLisener();
                        } catch (Exception e) {
                            log.error(e.getMessage(), e);
                            vo.setCode(PacketHead.JSON_ERR_SYS);
                            vo.setMsg("系統繁忙請稍候再試......");
                            return vo;
                        }
                    }
                    return vo;

    }

/*  @RequestMapping(value="timerUpd2")
    @ResponseBody
    public RspVo timingUpdate2(@RequestParam(required=true)String beginTime,@RequestParam(required=true)String endTime,
                                                     @RequestParam(required=true)String processStatus,@RequestParam(required=true)String onlineUserCount,
                                                     @RequestParam(required=true)String allUserCount,@RequestParam(required=true)String facilityCount,
                                                     @RequestParam(required=true)String allTaskCount,@RequestParam(required=true)String autoTaskCount,
                                                     @RequestParam(required=true)String doneTaskCount,@RequestParam(required=true) String customFields){

                    log.info(" 定時更新介面接收到的引數為:beginTime="+beginTime+",endTime="+endTime+",processStatus="+processStatus+
                                                           ",onlineUserCount="+onlineUserCount+",allUserCount="+allUserCount+",facilityCount="+facilityCount+
                                                           ",allTaskCount="+allTaskCount+",autoTaskCount="+autoTaskCount+",doneTaskCount="+doneTaskCount+
                                                           ",customFields="+customFields);

                    RspVo vo = new RspVo();
                    return vo;
    }*/


    /**   
    *    
    * 方法描述:   
    * 建立人:zhy 
    * 建立時間:2017年9月21日 下午2:45:38   
    * 修改時間:2017年9月21日 下午2:45:38   
    * Description:  任務更新介面
    *    
    */
    @RequestMapping(value="tasksUpd")
    @ResponseBody
    public RspVo tasksUpdate(@RequestParam(required=true)String data){
         RspVo vo = new RspVo();
         log.info(" 任務更新介面接收到的引數為:data="+data);
         if(StringUtils.nonEmptyString(data)){
            try {
                    WebSocketService socketService = new WebSocketService(EventType.EventType_Tasks,data);
                    socketService.startLisener();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    vo.setCode(PacketHead.JSON_ERR_SYS);
                    vo.setMsg("系統繁忙請稍候再試......");
                    return vo;
                }
         }
         return vo;
    }

/*  @RequestMapping(value="tasksUpd2")
    @ResponseBody
    public RspVo tasksUpdate2(@RequestParam(required=true)String taskId,@RequestParam(required=true)String taskType,
                                                     @RequestParam(required=true)String taskStatus,@RequestParam(required=true)String beginTime,
                                                     @RequestParam(required=true)String acceptTime,@RequestParam(required=true)String endTime,
                                                     @RequestParam(required=true)String duration){
        log.info("任務更新介面接收到的引數為:taskId="+taskId+",taskType="+taskType+",taskStatus="+taskStatus+",beginTime="+beginTime+
                                                                  ",endTime="+endTime+",acceptTime="+acceptTime+",duration="+duration);
        RspVo vo = new RspVo();
        return vo;
    }*/

    /**   
    *    
    * 方法描述:   
    * 建立人:zhy 
    * 建立時間:2017年9月21日 下午2:46:13   
    * 修改時間:2017年9月21日 下午2:46:13   
    * Description:  日誌更新介面
    *    
    */
    @RequestMapping(value="logsUpd")
    @ResponseBody
    public RspVo logsUpdate(@RequestParam(required=true)String data){
        RspVo vo = new RspVo();
        log.info(" 日誌更新介面接收到的引數為:data="+data);
        if(StringUtils.nonEmptyString(data)){
            try {
                    WebSocketService socketService = new WebSocketService(EventType.EventType_Logs,data);
                    socketService.startLisener();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    vo.setCode(PacketHead.JSON_ERR_SYS);
                    vo.setMsg("系統繁忙請稍候再試......");
                    return vo;
                }
        }
        return vo;
    }

    /*@RequestMapping(value="logsUpd2")
    @ResponseBody
    public RspVo logsUpdate2(@RequestParam(required=true)String id,@RequestParam(required=true)String content) {
        log.info("日誌更新介面接收到的引數為:id="+id+",content="+content);
        RspVo vo = new RspVo();
        return vo;
    }*/

    /**   
    *    
    * 方法描述:   
    * 建立人:zhy 
    * 建立時間:2017年9月21日 下午2:46:13   
    * 修改時間:2017年9月21日 下午2:46:13   
    * Description:  狀態更新介面
    *    
    */
    @RequestMapping(value="statusUpd")
    @ResponseBody
    public RspVo statusUpdate(@RequestParam String data){
        RspVo vo = new RspVo();
        log.info(" 狀態更新介面接收到的引數為:data="+data);
        if(StringUtils.nonEmptyString(data)){
            try {
                    WebSocketService socketService = new WebSocketService(EventType.EventType_Status,data);
                    socketService.startLisener();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    vo.setCode(PacketHead.JSON_ERR_SYS);
                    vo.setMsg("系統繁忙請稍候再試......");
                    return vo;
                }
        }
        return vo;
    }
    /**   
     *    
     * 方法描述:   
     * 建立人:zhy 
     * 建立時間:2017年9月21日 下午2:46:13   
     * 修改時間:2017年9月21日 下午2:46:13   
     * Description: 人員更新介面
     *    
     */
    @RequestMapping(value="usersUpd")
    @ResponseBody
    public RspVo usersUpdate(@RequestParam(required=true)String data){
        RspVo vo = new RspVo();
        log.info(" 人員更新介面接收到的引數為:data="+data);
        if(StringUtils.nonEmptyString(data)){
            try {
                    WebSocketService socketService = new WebSocketService(EventType.EventType_Users,data);
                    socketService.startLisener();
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                    vo.setCode(PacketHead.JSON_ERR_SYS);
                    vo.setMsg("系統繁忙請稍候再試......");
                    return vo;
                }
        }
        return vo;
    }


}

簡單的說就是我通過HttpClient去三方A獲取一個啟動資料,然後三方A會不斷呼叫我的介面給我傳資料,我一接收到資料就穿給三方系統B

package com.spdb.hs.data.utils;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;

public class ApiService{

    /**
     * 
     * @return 響應體的內容
     * @throws IOException 
     * @throws ClientProtocolException 
     */
    public String doGet(String url) throws ClientProtocolException, IOException{

        // 建立http GET請求
        HttpGet httpGet = new HttpGet(url);
        httpGet.setConfig(this.getConfig());//設定請求引數
        CloseableHttpResponse response = null;
        try {
            // 執行請求
            response = this.getHttpClient().execute(httpGet);
            // 判斷返回狀態是否為200
            if (response.getStatusLine().getStatusCode() == 200) {
                String content = EntityUtils.toString(response.getEntity(), "UTF-8");
//                System.out.println("內容長度:"+content.length());
                return content;
            }
        } finally {
            if (response != null) {
                response.close();
            }
            //httpClient.close();
        }
        return null;
    }

    /**
     * 帶有引數的get請求
     * @param url
     * @return
     * @throws URISyntaxException 
     * @throws IOException 
     * @throws ClientProtocolException 
     */
    public String doGet(String url , Map<String, String> params) throws URISyntaxException, ClientProtocolException, IOException{
        URIBuilder uriBuilder = new URIBuilder(url);
        if(params != null){
            for(String key : params.keySet()){
                uriBuilder.setParameter(key, params.get(key));
            }
        }//http://xxx?ss=ss
        return this.doGet(uriBuilder.build().toString());
    }
    /**
     * 帶有引數的post請求
     * @param url
     * @param params
     * @return
     * @throws IOException 
     * @throws ClientProtocolException 
     */
    public HttpResult doPost(String url , Map<String, String> params) throws ClientProtocolException, IOException{


        // 建立http POST請求
        HttpPost httpPost = new HttpPost(url);
        httpPost.setConfig(this.getConfig());
        if(params != null){

            // 設定2個post引數,一個是scope、一個是q
            List<NameValuePair> parameters = new ArrayList<NameValuePair>(0);

            for(String key : params.keySet()){
                parameters.add(new BasicNameValuePair(key, params.get(key)));
            }
            // 構造一個form表單式的實體
            UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(parameters);
            // 將請求實體設定到httpPost物件中
            httpPost.setEntity(formEntity);
        }

        CloseableHttpResponse response = null;
        try {
            // 執行請求
            response = this.getHttpClient().execute(httpPost);
            // 判斷返回狀態是否為200
            /*if (response.getStatusLine().getStatusCode() == 200) {
                String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                System.out.println(content);
            }*/
            return new HttpResult(response.getStatusLine().getStatusCode(),EntityUtils.toString(response.getEntity(), "UTF-8"));
        } finally {
            if (response != null) {
                response.close();
            }
            //httpClient.close();
        }
    }

    public HttpResult doPostJson(String url , String json) throws ClientProtocolException, IOException{
         // 建立http POST請求
        HttpPost httpPost = new HttpPost(url);
        httpPost.setConfig(this.getConfig());
        if(StringUtils.isNotBlank(json)){
            //標識出傳遞的引數是 application/json
            StringEntity stringEntity = new StringEntity(json, ContentType.APPLICATION_JSON);
            httpPost.setEntity(stringEntity);
        }

        CloseableHttpResponse response = null;
        try {
            // 執行請求
            response =this.getHttpClient().execute(httpPost);
            // 判斷返回狀態是否為200
            /*if (response.getStatusLine().getStatusCode() == 200) {
                String content = EntityUtils.toString(response.getEntity(), "UTF-8");
                System.out.println(content);
            }*/
            return new HttpResult(response.getStatusLine().getStatusCode(),EntityUtils.toString(response.getEntity(), "UTF-8"));
        } finally {
            if (response != null) {
                response.close();
            }
            //httpClient.close();
        }
    }

    /**
     * 沒有引數的post請求
     * @throws IOException 
     * @throws ClientProtocolException 
     */
    public HttpResult doPost(String url) throws ClientProtocolException, IOException{
        return this.doPost(url, null);
    }

    private CloseableHttpClient getHttpClient(){
        return HttpClientBuilder.create().setMaxConnTotal(200)
                                                                    .setMaxConnPerRoute(100)
                                                                    .build();
    }

    private RequestConfig getConfig(){
        return  RequestConfig.custom().setConnectionRequestTimeout(500)
                                                                 .setSocketTimeout(30000)
                                                                 .setConnectTimeout(5000)
                                                                 .build();

    }

}
package com.spdb.hs.data.utils;

public class HttpResult {

    private int statusCode;
    private String result;

    public HttpResult(int statusCode, String result) {
        super();
        this.statusCode = statusCode;
        this.result = result;
    }

    public int getStatusCode() {
        return statusCode;
    }

    public void setStatusCode(int statusCode) {
        this.statusCode = statusCode;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }


}

除錯例項

Index.html

<html>

</html>
<script src="./socket.io.js"></script>
<script>
    var socket = io.connect('http://10.112.5.60:9001');
</script>

socket.io.js這個需要自己去網上下載

在10.112.5.60這個IP的伺服器上我們一起吧專案啟動起來了,9001埠也開通了,那我我們現在要做的就是測試客戶端連線上服務端後會有什麼資料。效果如下:

這裡寫圖片描述

圖片表明測試成功。