1. 程式人生 > >JavaScript WebSocket實現長連線通訊連線重連

JavaScript WebSocket實現長連線通訊連線重連

JavaScript(JS)依靠 WebSocket 也可以像其他語言一樣實現通訊程式碼,不過使用時需要注意檢查連線斷開的情況。最基本的前提是必須保證 IP 和埠都可以訪問。

目錄

WebSocket程式碼及測試

呼叫初始化程式碼

通訊示例程式碼

測試效果

WebSocket連線關閉重連處理

連線斷開問題

程式碼實現重連

重連效果


WebSocket程式碼及測試

呼叫初始化程式碼

 // Create the socket first, publish it on CvNetVideo, then let a new loader
 // attach its open/close/message handlers to it.
 const socket = new WebSocket(`ws://${websocetHost}:${websocketPort}`);
 CvNetVideo.Websocket = socket;
 const loader = new WebsocketCallbackLoader();
 CvNetVideo.WebsocketCallbackLoader = loader;
 loader.connect(socket);

通訊示例程式碼

/*
 * WebsocketCallback Loader
*/
import JT1078 from '../JX/JTT1078';

/**
 * Wraps an already-created WebSocket: splits incoming binary data into frames,
 * reassembles messages that arrive in several packets, and passes decoded
 * messages to the callback supplied to sendMessage().
 *
 * This version does not reconnect; sendMessage() only logs when the socket is
 * not open.
 */
class WebsocketCallbackLoader {

    constructor() {
        this.websocket = undefined;
        // Callback given to the most recent sendMessage() call.
        this.onCallback = undefined;
        this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
        this.splitData = new JT1078.SplitData();
        this.checkHelper = new JT1078.CheckHelper();
        // Partially received multi-packet messages: SIM -> serial number -> entry.
        this.AllSimPackData = {};
    }

    /**
     * Attaches this loader's handlers to the given socket.
     * @param {WebSocket} websocket - socket created by the caller.
     */
    connect(websocket) {
        this.websocket = websocket;
        this.websocket.onopen = this.openSocket.bind(this);
        this.websocket.onclose = this.closeSocket.bind(this);
        this.websocket.binaryType = 'arraybuffer';
        this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    closeSocket(e) {
        console.log('Websocket808 Disconnected!');
    }

    openSocket() {
        console.log('Websocket808 Open!');
    }

    /**
     * onmessage handler: splits the received buffer into frames and forwards
     * every frame whose trailing byte matches the computed XOR check value.
     * @param {MessageEvent} event - event whose data is read as bytes.
     */
    receiveSocketMessage(event) {
        const data = new Uint8Array(event.data);
        // NOTE(review): the 18-byte offset presumably skips a transport header —
        // confirm against SplitData.SplitDataBy7E.
        this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
            const last = bGps.length - 1;
            // Checksum passed.
            if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
                this.JX808DataPD(new Uint8Array(bGps));
            }
        });
    }

    /**
     * Parses the header of one frame. Frames without packet info are decoded
     * immediately; packetised frames are cached per SIM and serial number until
     * CheckPackByPD() reports the message as handled.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
        const head = new JT1078.JTHeader();
        head.FillByBinary(bGps, 0);

        if (!head.PackInfo) {
            this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
            return;
        }

        const sid = JT1078.GetFirstSerialNumber(head);
        // The cache lives on the instance; a bare `AllSimPackData` is undeclared.
        let AllPackData = this.AllSimPackData[head.Sim];
        if (!AllPackData) {
            AllPackData = {};
            this.AllSimPackData[head.Sim] = AllPackData;
        }

        let pd = AllPackData[sid];
        if (pd) {
            // Count each packet index once, even if it is received again.
            if (!pd.PackData[head.PackInfo.Index]) {
                pd.PackData.Count += 1;
            }
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.LastRTime = new Date();
            if (this.CheckPackByPD(pd)) {
                // Remove the finished entry so the cache does not keep stale keys.
                delete AllPackData[sid];
            }
        } else {
            pd = {};
            pd.PackData = {};
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.PackData.Count = 1;
            pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
            pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
            pd.StartTime = pd.LastRTime = new Date();
            AllPackData[sid] = pd;
        }
    }

    /**
     * When all packets of a message have arrived, concatenates bodies 1..Sum
     * and decodes the result.
     * @param {object} pd - cache entry built by JX808DataPD().
     * @returns {boolean} true when the entry was processed (successfully or
     *     not) and can be discarded; false while packets are still missing.
     */
    CheckPackByPD(pd) {
        const total = pd.Head.PackInfo.Sum;
        // Compare the number of received packets with the announced total.
        if (pd.PackData.Count !== total) {
            return false;
        }
        // Return true even on failure so the same bad data is not parsed again.
        try {
            // Packet indexes are iterated from 1 to Sum.
            // Assumes each body is a typed array (byteLength + set()) — TODO confirm.
            let totalLength = 0;
            for (let i = 1; i <= total; i++) {
                totalLength += pd.PackData[i].byteLength;
            }
            // A typed array cannot grow, so size it before copying.
            const tmpbuf = new Uint8Array(totalLength);
            let offset = 0;
            for (let i = 1; i <= total; i++) {
                tmpbuf.set(pd.PackData[i], offset);
                offset += pd.PackData[i].byteLength;
            }
            this.JX808Data(pd.Head, tmpbuf);
        } catch (ex) {
            console.error(ex);
        }
        return true;
    }

    /**
     * Decodes a complete message body according to its message id.
     * @param {object} head - header providing MsgId.
     * @param {Uint8Array} bts - message body.
     */
    JX808Data(head, bts) {
        switch (head.MsgId) {
            case 0x1205: { // Terminal uploads the audio/video resource list.
                const videolist = new JT1078.JTVideoListInfo();
                videolist.FillByBinary(bts, 0);
                // A message may arrive before any sendMessage() registered a callback.
                if (typeof this.onCallback === 'function') {
                    this.onCallback(videolist);
                }
                break;
            }
            case 0x1206: // Terminal reports FTP upload finished.
                break;
            default:
        }
    }

    /**
     * Sends data if the socket is open; otherwise only logs the socket state.
     * @param {*} data - payload passed to WebSocket.send().
     * @param {Function} callback - receives decoded responses.
     */
    sendMessage(data, callback) {
        this.onCallback = callback;
        switch (this.websocket.readyState) {
            case 0:
                console.log("Websocket808 readyState=CONNECTING");
                break;
            case 1:
                this.websocket.send(data);
                break;
            case 2:
                console.log("Websocket808 readyState=CLOSING");
                break;
            case 3:
                console.log("Websocket808 readyState=CLOSED");
                break;

        }

    }

}

export default WebsocketCallbackLoader;  

測試效果

WebSocket連線關閉重連處理

連線斷開問題

程式碼實現重連

改造一下呼叫程式碼和傳送訊息程式碼:

   static initWebsocket(websocetHost, websocketPort) {
        CvNetVideo.WebsocketCallbackLoader = new WebsocketCallbackLoader(websocetHost, websocketPort);
        CvNetVideo.WebsocketCallbackLoader.connect();
    }
/*
 * WebsocketCallback Loader
*/
import JT1078 from '../JX/JTT1078';
import { setTimeout } from 'timers';

/**
 * Owns a WebSocket to ws://host:port: splits incoming binary data into frames,
 * reassembles messages that arrive in several packets, and passes decoded
 * messages to the callback supplied to sendMessage().
 *
 * sendMessage() reconnects when the socket is closing or closed and retries
 * the send once after a fixed delay.
 */
class WebsocketCallbackLoader {

    /**
     * @param {string} host - server host used to build the ws:// URL.
     * @param {string|number} port - server port used to build the ws:// URL.
     */
    constructor(host, port) {
        this.host = host;
        this.port = port;
        this.websocket = undefined;
        // Callback given to the most recent sendMessage() call.
        this.onCallback = undefined;
        this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
        this.splitData = new JT1078.SplitData();
        this.checkHelper = new JT1078.CheckHelper();
        // Partially received multi-packet messages: SIM -> serial number -> entry.
        this.AllSimPackData = {};
    }

    /** Creates a new WebSocket for host:port and attaches the handlers. */
    connect() {
        var url = "ws://" + this.host + ":" + this.port;
        console.log("Websocket808 WebSocket connect ..." + url);
        this.websocket = new WebSocket(url);
        this.websocket.onopen = this.openSocket.bind(this);
        this.websocket.onclose = this.closeSocket.bind(this);
        this.websocket.binaryType = 'arraybuffer';
        this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    closeSocket(e) {
        console.log('Websocket808 Disconnected!');
    }

    openSocket() {
        console.log('Websocket808 Open!');
    }

    /**
     * onmessage handler: splits the received buffer into frames and forwards
     * every frame whose trailing byte matches the computed XOR check value.
     * @param {MessageEvent} event - event whose data is read as bytes.
     */
    receiveSocketMessage(event) {
        const data = new Uint8Array(event.data);
        // NOTE(review): the 18-byte offset presumably skips a transport header —
        // confirm against SplitData.SplitDataBy7E.
        this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
            const last = bGps.length - 1;
            // Checksum passed.
            if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
                this.JX808DataPD(new Uint8Array(bGps));
            }
        });
    }

    /**
     * Parses the header of one frame. Frames without packet info are decoded
     * immediately; packetised frames are cached per SIM and serial number until
     * CheckPackByPD() reports the message as handled.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
        const head = new JT1078.JTHeader();
        head.FillByBinary(bGps, 0);

        if (!head.PackInfo) {
            this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
            return;
        }

        const sid = JT1078.GetFirstSerialNumber(head);
        // The cache lives on the instance; a bare `AllSimPackData` is undeclared.
        let AllPackData = this.AllSimPackData[head.Sim];
        if (!AllPackData) {
            AllPackData = {};
            this.AllSimPackData[head.Sim] = AllPackData;
        }

        let pd = AllPackData[sid];
        if (pd) {
            // Count each packet index once, even if it is received again.
            if (!pd.PackData[head.PackInfo.Index]) {
                pd.PackData.Count += 1;
            }
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.LastRTime = new Date();
            if (this.CheckPackByPD(pd)) {
                // Remove the finished entry so the cache does not keep stale keys.
                delete AllPackData[sid];
            }
        } else {
            pd = {};
            pd.PackData = {};
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.PackData.Count = 1;
            pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
            pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
            pd.StartTime = pd.LastRTime = new Date();
            AllPackData[sid] = pd;
        }
    }

    /**
     * When all packets of a message have arrived, concatenates bodies 1..Sum
     * and decodes the result.
     * @param {object} pd - cache entry built by JX808DataPD().
     * @returns {boolean} true when the entry was processed (successfully or
     *     not) and can be discarded; false while packets are still missing.
     */
    CheckPackByPD(pd) {
        const total = pd.Head.PackInfo.Sum;
        // Compare the number of received packets with the announced total.
        if (pd.PackData.Count !== total) {
            return false;
        }
        // Return true even on failure so the same bad data is not parsed again.
        try {
            // Packet indexes are iterated from 1 to Sum.
            // Assumes each body is a typed array (byteLength + set()) — TODO confirm.
            let totalLength = 0;
            for (let i = 1; i <= total; i++) {
                totalLength += pd.PackData[i].byteLength;
            }
            // A typed array cannot grow, so size it before copying.
            const tmpbuf = new Uint8Array(totalLength);
            let offset = 0;
            for (let i = 1; i <= total; i++) {
                tmpbuf.set(pd.PackData[i], offset);
                offset += pd.PackData[i].byteLength;
            }
            this.JX808Data(pd.Head, tmpbuf);
        } catch (ex) {
            console.error(ex);
        }
        return true;
    }

    /**
     * Decodes a complete message body according to its message id.
     * @param {object} head - header providing MsgId.
     * @param {Uint8Array} bts - message body.
     */
    JX808Data(head, bts) {
        switch (head.MsgId) {
            case 0x1205: { // Terminal uploads the audio/video resource list.
                const videolist = new JT1078.JTVideoListInfo();
                videolist.FillByBinary(bts, 0);
                // A message may arrive before any sendMessage() registered a callback.
                if (typeof this.onCallback === 'function') {
                    this.onCallback(videolist);
                }
                break;
            }
            case 0x1206: // Terminal reports FTP upload finished.
                break;
            default:
        }
    }

    /** Logs the current readyState of the socket. */
    printState() {
        switch (this.websocket.readyState) {
            case 0:
                console.log("Websocket808 readyState=CONNECTING");
                break;
            case 1:
                console.log("Websocket808 readyState=OPEN");
                break;
            case 2:
                console.log("Websocket808 readyState=CLOSING");
                break;
            case 3:
                console.log("Websocket808 readyState=CLOSED");
                break;
        }
    }

    /** Sends data on the current socket without checking its state. */
    sendData(data)
    {
        this.websocket.send(data);
    }

    /**
     * Sends data only when the socket is open.
     * @param {*} data - payload passed to WebSocket.send().
     * @returns {boolean} true if the data was handed to the socket.
     */
    sendIfOpen(data) {
        if (this.websocket.readyState === 1) {
            this.sendData(data);
            return true;
        }
        console.warn("Websocket808 still not open, message dropped");
        return false;
    }

    /**
     * Sends data now if the socket is open. Otherwise reconnects when the
     * socket is closing/closed and retries once after 3 seconds.
     * @param {*} data - payload passed to WebSocket.send().
     * @param {Function} callback - receives decoded responses.
     */
    sendMessage(data, callback) {
        this.onCallback = callback;
        this.printState();
        const state = this.websocket.readyState;
        if (state === 0) {
            // Still connecting: retry later, but only send if the socket opened,
            // because send() throws while the socket is CONNECTING.
            setTimeout(() => {
                this.sendIfOpen(data);
            }, 3000);
        } else if (state === 2 || state === 3) {
            this.connect();
            setTimeout(() => {
                this.printState();
                this.sendIfOpen(data);
            }, 3000);
        } else {
            this.sendData(data);
        }

    }

}

export default WebsocketCallbackLoader;  

重連效果

 下面是連線斷開後重連效果:

注意:重連後的傳送是非同步進行的,使用 setTimeout(func, ms) 延遲呼叫。因為連線建立需要時間,若重連後立即傳送,連線很可能仍處於 CONNECTING(正在連線)狀態而導致傳送失敗。

使用佇列處理連線失敗資料

/*
 * WebsocketCallback Loader
*/
import JT1078 from '../JX/JTT1078';
import Queue from '../utils/queue';

/**
 * Owns a WebSocket to ws://host:port: splits incoming binary data into frames,
 * reassembles messages that arrive in several packets, and dispatches decoded
 * messages to the callback registered for their message id.
 *
 * Data passed to sendMessage() while the socket is not open is queued and
 * flushed when the socket opens.
 */
class WebsocketCallbackLoader{

    /**
     * @param {string} host - server host used to build the ws:// URL.
     * @param {string|number} port - server port used to build the ws:// URL.
     */
    constructor(host, port) {
        this.host = host;
        this.port = port;
        this.websocket = undefined;
        this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
        this.splitData = new JT1078.SplitData();
        this.checkHelper = new JT1078.CheckHelper();
        // Partially received multi-packet messages: SIM -> serial number -> entry.
        this.AllSimPackData = {};
        // Payloads waiting for the socket to open.
        this.sendCache = new Queue();
        // Response callbacks keyed by the id passed to sendMessage().
        this.onCallback = {};
    }

    /** Creates a new WebSocket for host:port and attaches the handlers. */
    connect() {
        var url = "ws://" + this.host + ":" + this.port;
        this.websocket = new WebSocket(url);
        this.websocket.onopen = this.openSocket.bind(this);
        this.websocket.onclose = this.closeSocket.bind(this);
        this.websocket.binaryType = 'arraybuffer';
        this.websocket.onmessage = this.receiveSocketMessage.bind(this);
    }

    closeSocket(e) {
        console.log('Websocket808 Disconnected!');
    }

    /** onopen handler: sends everything queued while the socket was not open. */
    openSocket() {
        console.log('Websocket808 Open!');
        while (this.sendCache.size() > 0) {
            this.sendData(this.sendCache.dequeue());
        }
    }

    /**
     * onmessage handler: splits the received buffer into frames and forwards
     * every frame whose trailing byte matches the computed XOR check value.
     * @param {MessageEvent} event - event whose data is read as bytes.
     */
    receiveSocketMessage(event) {
        const data = new Uint8Array(event.data);
        // NOTE(review): the 18-byte offset presumably skips a transport header —
        // confirm against SplitData.SplitDataBy7E.
        this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
            const last = bGps.length - 1;
            // Checksum passed.
            if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, last) === bGps[last]) {
                this.JX808DataPD(new Uint8Array(bGps));
            }
        });
    }

    /**
     * Parses the header of one frame. Frames without packet info are decoded
     * immediately; packetised frames are cached per SIM and serial number until
     * CheckPackByPD() reports the message as handled.
     * @param {Uint8Array} bGps - one verified frame.
     */
    JX808DataPD(bGps) {
        const head = new JT1078.JTHeader();
        head.FillByBinary(bGps, 0);

        if (!head.PackInfo) {
            this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
            return;
        }

        const sid = JT1078.GetFirstSerialNumber(head);
        let AllPackData = this.AllSimPackData[head.Sim];
        if (!AllPackData) {
            AllPackData = {};
            this.AllSimPackData[head.Sim] = AllPackData;
        }

        let pd = AllPackData[sid];
        if (pd) {
            // Count each packet index once, even if it is received again.
            if (!pd.PackData[head.PackInfo.Index]) {
                pd.PackData.Count += 1;
            }
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.LastRTime = new Date();
            if (this.CheckPackByPD(pd)) {
                // Remove the finished entry so the cache does not keep stale keys.
                delete AllPackData[sid];
            }
        } else {
            pd = {};
            pd.PackData = {};
            pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
            pd.PackData.Count = 1;
            pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
            pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
            pd.StartTime = pd.LastRTime = new Date();
            AllPackData[sid] = pd;
        }
    }

    /**
     * When all packets of a message have arrived, concatenates bodies 1..Sum
     * and decodes the result.
     * @param {object} pd - cache entry built by JX808DataPD().
     * @returns {boolean} true when the entry was processed (successfully or
     *     not) and can be discarded; false while packets are still missing.
     */
    CheckPackByPD(pd) {
        const total = pd.Head.PackInfo.Sum;
        // Compare the number of received packets with the announced total.
        if (pd.PackData.Count !== total) {
            return false;
        }
        // Return true even on failure so the same bad data is not parsed again.
        try {
            // Size the buffer from the bodies actually stored rather than from
            // header lengths, so set() can never run past the end.
            // Assumes each body is a typed array (byteLength + set()) — TODO confirm.
            let totalLength = 0;
            for (let i = 1; i <= total; i++) {
                totalLength += pd.PackData[i].byteLength;
            }
            const tmpbuf = new Uint8Array(totalLength);
            let offset = 0;
            for (let i = 1; i <= total; i++) {
                tmpbuf.set(pd.PackData[i], offset);
                offset += pd.PackData[i].byteLength;
            }
            this.JX808Data(pd.Head, tmpbuf);
        } catch (ex) {
            console.error(ex);
        }
        return true;
    }

    /**
     * Invokes the callback registered for a message id, if there is one.
     * @param {number} id - message id used as key in onCallback.
     * @param {*} payload - decoded message passed to the callback.
     */
    dispatch(id, payload) {
        const handler = this.onCallback[id];
        // Messages can arrive for ids nobody registered a callback for.
        if (typeof handler === 'function') {
            handler(payload);
        }
    }

    /**
     * Decodes a complete message body according to its message id and
     * dispatches it to the matching callback.
     * @param {object} head - header providing MsgId.
     * @param {Uint8Array} bts - message body.
     */
    JX808Data(head, bts) {
        switch (head.MsgId) {
            case 0x1205: { // Terminal uploads the audio/video resource list.
                const videolist = new JT1078.JTVideoListInfo();
                videolist.FillByBinary(bts, 0);
                this.dispatch(0x1205, videolist);
                break;
            }
            case 0x1206: { // Terminal reports FTP upload finished.
                const fileUploadEndInform = new JT1078.JTVideoFileUploadEndInform();
                fileUploadEndInform.FillByBinary(bts, 0);
                this.dispatch(0x1206, fileUploadEndInform);
                break;
            }
            default:
        }
    }

    /** Logs the current readyState of the socket, if a socket exists. */
    printState() {
        if (!this.websocket) {
            return;
        }
        switch (this.websocket.readyState) {
            case 0:
                console.log("Websocket808 readyState=CONNECTING");
                break;
            case 1:
                console.log("Websocket808 readyState=OPEN");
                break;
            case 2:
                console.log("Websocket808 readyState=CLOSING");
                break;
            case 3:
                console.log("Websocket808 readyState=CLOSED");
                break;
        }
    }

    /** Sends data on the current socket without checking its state. */
    sendData(data)
    {
        this.websocket.send(data);
    }

    /**
     * Registers the callback for `id` and sends data. If the socket is not
     * open the data is queued; if it is closing, closed, or was never created,
     * a new connection is started and the queue is flushed when it opens.
     * @param {*} data - payload passed to WebSocket.send().
     * @param {Function} callback - receives decoded responses for `id`.
     * @param {number} id - message id the callback is registered under.
     */
    sendMessage(data, callback, id) {

        this.onCallback[id] = callback;
        // Treat "connect() was never called" like a closed socket.
        const state = this.websocket ? this.websocket.readyState : 3;
        if (state === 0) {
            this.sendCache.enqueue(data);
        } else if (state === 2 || state === 3) {
            this.printState();
            this.sendCache.enqueue(data);
            this.connect();
        } else {
            this.sendData(data);
        }

    }

}

export default WebsocketCallbackLoader;