JavaScript WebSocket實現長連線通訊連線重連
阿新 • • 發佈:2018-12-05
JavaScript-JS依靠WebSoket也可以像其他語言一樣能夠實現通訊程式碼,不過使用的時候需要注意檢查連線斷開的情況。最基本的必須保證IP和埠都可以訪問。
目錄
WebSocket程式碼及測試
呼叫初始化程式碼
CvNetVideo.Websocket = new WebSocket('ws://' + websocetHost + ":" + websocketPort); CvNetVideo.WebsocketCallbackLoader = new WebsocketCallbackLoader(); CvNetVideo.WebsocketCallbackLoader.connect(CvNetVideo.Websocket);
通訊示例程式碼
/* * WebsocketCallback Loader */ import JT1078 from '../JX/JTT1078'; class WebsocketCallbackLoader { constructor() { this.websocket = undefined; this.onCallback = undefined; this.sendMessage = this.sendMessage; this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis(); this.splitData = new JT1078.SplitData(); this.checkHelper = new JT1078.CheckHelper(); //分包快取資料 按SIM做key this.AllSimPackData = {}; } connect(websocket) { this.websocket = websocket; this.websocket.onopen = this.openSocket.bind(this); this.websocket.onclose = this.closeSocket.bind(this); this.websocket.binaryType = 'arraybuffer'; this.websocket.onmessage = this.receiveSocketMessage.bind(this); } closeSocket(e) { console.log('Websocket808 Disconnected!'); } openSocket() { console.log('Websocket808 Open!'); } receiveSocketMessage(event) { let data = new Uint8Array(event.data); this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => { if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, bGps.length - 1) === bGps[bGps.length - 1])//效驗通過 { this.JX808DataPD(new Uint8Array(bGps)); } }); } JX808DataPD(bGps) { let head = new JT1078.JTHeader(); head.FillByBinary(bGps, 0); if (head.PackInfo) { var sid = JT1078.GetFirstSerialNumber(head); let AllPackData = AllSimPackData[head.Sim]; if (!AllPackData) { AllPackData = {}; AllSimPackData[head.Sim] = AllPackData; } var pd = AllPackData[sid]; if (pd) { if (!pd.PackData[head.PackInfo.Index]) { pd.PackData.Count += 1; } pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps); pd.LastRTime = DateTime.Now; if (this.CheckPackByPD(pd)) AllPackData[sid] = null; } else { pd = {}; pd.PackData = {}; pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps); pd.PackData.Count = 1; pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid) pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum); pd.StartTime = pd.LastRTime = DateTime.Now; AllPackData[sid] = pd; } } else { this.JX808Data(head, JT1078.GetMsbBody(head, bGps)); } } CheckPackByPD(pd) { if (pd.PackData.Count === pd.Head.PackInfo.Sum)//判斷資料包數和總包數是否相等 { let tmpbuf = new Uint8Array(); let index = 0; for (var i = 0; i < pd.Head.PackInfo.Sum; i++) { tmpbuf.set(pd.PackData[i], index); index += pd.PackData[i].byteLength; } //解析失敗時返回true防止一直解析此錯誤資料 try { this.JX808Data(pd.Head, tmpbuf); } catch (ex) { console.error(ex); return true; } return true; } return false; } JX808Data(head, bts) { switch (head.MsgId) { case 0x1205://終端上傳音視訊資源列表資料格式 var videolist = new JT1078.JTVideoListInfo(); videolist.FillByBinary(bts, 0); this.onCallback(videolist); break; case 0x1206://終端上傳FTP完成通知 break; default: } } sendMessage(data, callback) { this.onCallback = callback; switch (this.websocket.readyState) { case 0: console.log("Websocket808 readyState=CONNECTING"); break; case 1: this.websocket.send(data); break; case 2: console.log("Websocket808 readyState=CLOSING"); break; case 3: console.log("Websocket808 readyState=CLOSED"); break; } } } export default WebsocketCallbackLoader;
測試效果
WebSocket連線關閉重連處理
連線斷開問題
程式碼實現重連
改造一下呼叫程式碼和傳送訊息程式碼:
static initWebsocket(websocetHost, websocketPort) {
CvNetVideo.WebsocketCallbackLoader = new WebsocketCallbackLoader(websocetHost, websocketPort);
CvNetVideo.WebsocketCallbackLoader.connect();
}
/* * WebsocketCallback Loader */ import JT1078 from '../JX/JTT1078'; import { setTimeout } from 'timers'; class WebsocketCallbackLoader { constructor(host, port) { this.host = host; this.port = port; this.websocket = undefined; this.onCallback = undefined; this.sendMessage = this.sendMessage; this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis(); this.splitData = new JT1078.SplitData(); this.checkHelper = new JT1078.CheckHelper(); //分包快取資料 按SIM做key this.AllSimPackData = {}; } connect() { var url = "ws://" + this.host + ":" + this.port; console.log("Websocket808 WebSocket connect ..." + url); this.websocket = new WebSocket(url); this.websocket.onopen = this.openSocket.bind(this); this.websocket.onclose = this.closeSocket.bind(this); this.websocket.binaryType = 'arraybuffer'; this.websocket.onmessage = this.receiveSocketMessage.bind(this); } closeSocket(e) { console.log('Websocket808 Disconnected!'); } openSocket() { console.log('Websocket808 Open!'); } receiveSocketMessage(event) { let data = new Uint8Array(event.data); this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => { if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, bGps.length - 1) === bGps[bGps.length - 1])//效驗通過 { this.JX808DataPD(new Uint8Array(bGps)); } }); } JX808DataPD(bGps) { let head = new JT1078.JTHeader(); head.FillByBinary(bGps, 0); if (head.PackInfo) { var sid = JT1078.GetFirstSerialNumber(head); let AllPackData = AllSimPackData[head.Sim]; if (!AllPackData) { AllPackData = {}; AllSimPackData[head.Sim] = AllPackData; } var pd = AllPackData[sid]; if (pd) { if (!pd.PackData[head.PackInfo.Index]) { pd.PackData.Count += 1; } pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps); pd.LastRTime = DateTime.Now; if (this.CheckPackByPD(pd)) AllPackData[sid] = null; } else { pd = {}; pd.PackData = {}; pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps); pd.PackData.Count = 1; pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid) pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum); pd.StartTime = pd.LastRTime = DateTime.Now; AllPackData[sid] = pd; } } else { this.JX808Data(head, JT1078.GetMsbBody(head, bGps)); } } CheckPackByPD(pd) { if (pd.PackData.Count === pd.Head.PackInfo.Sum)//判斷資料包數和總包數是否相等 { let tmpbuf = new Uint8Array(); let index = 0; for (var i = 0; i < pd.Head.PackInfo.Sum; i++) { tmpbuf.set(pd.PackData[i], index); index += pd.PackData[i].byteLength; } //解析失敗時返回true防止一直解析此錯誤資料 try { this.JX808Data(pd.Head, tmpbuf); } catch (ex) { console.error(ex); return true; } return true; } return false; } JX808Data(head, bts) { switch (head.MsgId) { case 0x1205://終端上傳音視訊資源列表資料格式 var videolist = new JT1078.JTVideoListInfo(); videolist.FillByBinary(bts, 0); this.onCallback(videolist); break; case 0x1206://終端上傳FTP完成通知 break; default: } } printState() { switch (this.websocket.readyState) { case 0: console.log("Websocket808 readyState=CONNECTING"); break; case 1: console.log("Websocket808 readyState=OPEN"); break; case 2: console.log("Websocket808 readyState=CLOSING"); break; case 3: console.log("Websocket808 readyState=CLOSED"); break; } } sendData(data) { this.websocket.send(data); } sendMessage(data, callback) { this.onCallback = callback; this.printState(); if (this.websocket.readyState === 0) { setTimeout(() => { this.sendData(data); }, 3000); } else if (this.websocket.readyState === 2 || this.websocket.readyState === 3) { this.connect(); setTimeout(() => { this.printState(); this.sendData(data); }, 3000); } else { this.sendData(data); } } } export default WebsocketCallbackLoader;
重連效果
下面是連線斷開後重連效果:
注意:重連是非同步呼叫的,使用setTimeout(func,ms) 呼叫,因為連線建立需要時間,直接連很有可能正在連線。
使用佇列處理連線失敗資料
/*
* WebsocketCallback Loader
*/
import JT1078 from '../JX/JTT1078';
import Queue from '../utils/queue';
class WebsocketCallbackLoader{
constructor(host, port) {
this.host = host;
this.port = port;
this.websocket = undefined;
this.medisPackageAnalysis = new JT1078.MedisPackageAnalysis();
this.splitData = new JT1078.SplitData();
this.checkHelper = new JT1078.CheckHelper();
//分包快取資料 按SIM做key
this.AllSimPackData = {};
this.sendCache = new Queue();
this.onCallback = {};
}
connect() {
var url = "ws://" + this.host + ":" + this.port;
this.websocket = new WebSocket(url);
this.websocket.onopen = this.openSocket.bind(this);
this.websocket.onclose = this.closeSocket.bind(this);
this.websocket.binaryType = 'arraybuffer';
this.websocket.onmessage = this.receiveSocketMessage.bind(this);
}
closeSocket(e) {
console.log('Websocket808 Disconnected!');
}
openSocket() {
console.log('Websocket808 Open!');
while (this.sendCache.size() > 0) {
this.sendData(this.sendCache.dequeue());
}
}
receiveSocketMessage(event) {
let data = new Uint8Array(event.data);
this.splitData.SplitDataBy7E(data, 18, data.byteLength - 18, (bGps) => {
if (bGps.length > 11 && this.checkHelper.CheckXOR(bGps, 0, bGps.length - 1) === bGps[bGps.length - 1])//效驗通過
{
this.JX808DataPD(new Uint8Array(bGps));
}
});
}
JX808DataPD(bGps) {
let head = new JT1078.JTHeader();
head.FillByBinary(bGps, 0);
if (head.PackInfo) {
var sid = JT1078.GetFirstSerialNumber(head);
let AllPackData = this.AllSimPackData[head.Sim];
if (!AllPackData) {
AllPackData = {};
this.AllSimPackData[head.Sim] = AllPackData;
}
var pd = AllPackData[sid];
if (pd) {
if (!pd.PackData[head.PackInfo.Index]) {
pd.PackData.Count += 1;
pd.PackData.totalLength += head.MsgLen;
}
pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
pd.LastRTime = new Date();
if (this.CheckPackByPD(pd))
AllPackData[sid] = null;
}
else {
pd = {};
pd.PackData = {};
pd.PackData[head.PackInfo.Index] = JT1078.GetMsbBody(head, bGps);
pd.PackData.Count = 1;
pd.PackData.totalLength = head.MsgLen;
pd.Head = new JT1078.JTHeader(head.MsgId, head.Sub, head.DataEncryptType, head.MsgLen, head.Sim, sid);
pd.Head.PackInfo = new JT1078.JTPackageInfo(1, head.PackInfo.Sum);
pd.StartTime = pd.LastRTime = new Date();
AllPackData[sid] = pd;
}
} else {
this.JX808Data(head, JT1078.GetMsbBody(head, bGps));
}
}
CheckPackByPD(pd) {
if (pd.PackData.Count === pd.Head.PackInfo.Sum)//判斷資料包數和總包數是否相等
{
let tmpbuf = new Uint8Array(pd.PackData.totalLength);
let index = 0;
for (var i = 1; i <= pd.Head.PackInfo.Sum; i++) {
tmpbuf.set(pd.PackData[i], index);
index += pd.PackData[i].byteLength;
}
//解析失敗時返回true防止一直解析此錯誤資料
try {
this.JX808Data(pd.Head, tmpbuf);
}
catch (ex) {
console.error(ex);
return true;
}
return true;
}
return false;
}
JX808Data(head, bts) {
switch (head.MsgId) {
case 0x1205://終端上傳音視訊資源列表資料格式
var videolist = new JT1078.JTVideoListInfo();
videolist.FillByBinary(bts, 0);
this.onCallback[0x1205](videolist);
break;
case 0x1206://終端上傳FTP完成通知
var fileUploadEndInform = new JT1078.JTVideoFileUploadEndInform();
fileUploadEndInform.FillByBinary(bts, 0);
this.onCallback[0x1206](fileUploadEndInform);
break;
default:
}
}
printState() {
switch (this.websocket.readyState) {
case 0:
console.log("Websocket808 readyState=CONNECTING");
break;
case 1:
console.log("Websocket808 readyState=OPEN");
break;
case 2:
console.log("Websocket808 readyState=CLOSING");
break;
case 3:
console.log("Websocket808 readyState=CLOSED");
break;
}
}
sendData(data)
{
this.websocket.send(data);
}
sendMessage(data, callback, id) {
this.onCallback[id] = callback;
if (this.websocket.readyState === 0) {
this.sendCache.enqueue(data);
} else if (this.websocket.readyState === 2 || this.websocket.readyState === 3) {
this.printState();
this.sendCache.enqueue(data);
this.connect();
} else {
this.sendData(data);
}
}
}
export default WebsocketCallbackLoader;