1. 程式人生 > >用.NET MVC實現長輪詢,與jQuery.AJAX即時雙向通訊

用.NET MVC實現長輪詢,與jQuery.AJAX即時雙向通訊

首先感謝300位註冊使用者,讓我有充足的資料進行重構和優化。所以這兩週都在進行大重構。

其中最大的一個問題就是資料流量過大,原先已有更新,還會有Web傳統“重新整理”的形式把資料重新拿一次,然後再替換掉本地資料。

但這一拿問題就來了,在10個Chat*300個使用者的情況下,這一拿產生了一次8M多的流量,這是十分嚴重的事情,特別是其中絕大部分資料都是浪費掉了的。

那麼解決方案就很簡單了,把“全量”改成“增量”,只傳輸修改的部分,同時大量增加往返次數,把每次往返量壓縮。

當然,這篇文章主要講長輪詢,也是之後被問得比較多的方面,所以就單獨寫篇文章出來了。

這次比單純的輪詢多了一個快取行為,以解決每次“心跳”中所產生的斷線間隔資料丟失的問題。

首先列舉一下所使用到的技術點:

  • jQuery.Ajax
  • .NET同步(lock)與非同步(async await Task)
  • MVC非同步頁面

長輪詢的簡介

長輪詢是一種類似於JSONP一樣畸形的Web通訊技術,用以實現Web與服務端之間的實時雙向通訊。

在有人實現JSONP之前,單純的JS或者說Web是無法實現原生地有效地實現跨域通訊的;而在有了JSONP之後,這項工作就變得簡單了,雖然實現方法很“畸形(或者說有創意吧)”。

同樣,在有長輪詢之前,還沒出現HTML5 Web Socket的時代,單純的Web無法與伺服器進行實時通訊,HTTP限制了通訊行為只能是有客戶端發起請求,然後服務端針對該請求進行迴應。

長輪詢所做的就是把原有的協議“漏洞”利用起來,使得客戶端和服務端之間在HTML 4.1(部分更低版本應該也可以相容)下可以實時通訊。

長輪詢的原理

HTTP協議本身有兩個“漏洞”,也是現在網路通訊中無法避免的。

一個是請求(Request)和答覆(Response)之間無法確認其連線狀況,可就無法確定其所用的時限了。

判斷客戶端與服務端是否相連的一個標準就是客戶端的請求是否能收到服務端的答覆,如果收得到,就說明連線上了,即時收到的是服務端錯誤的通知(比如404 not found)。

第二漏洞就是在獲取到答覆(Response)前,都無法知道所需要的資料內容是怎麼樣的(如果有還跟人家要啥)。

長輪詢就是利用了這兩個“漏洞”:服務端收到請求(Request)後,將該請求Hold住不馬上答覆,而是一直等,等到服務端有資訊需要傳送給客戶端的時候,通過將剛才Hold住的那條請求(Request)的答覆(Response)發回給客戶端,讓客戶端作出反應。而返回的內容,呵呵呵呵呵,那就隨便服務端了。

然後,客戶端收到答覆(Response)後,馬上再重新發送一次請求(Request)給服務端,讓服務端再Hold住這條連線。周而復始,就實現了從服務端向客戶端傳送訊息的實時通訊,客戶端向服務端傳送訊息則依舊利用傳統的Post和Get進行。

受Web通訊現實情況限制,如果服務端長時間沒有訊息需要推送到客戶端的時候,也不能一直Hold住那條連結,因為很有可能被判定為閘道器超時等超時情況。所以即使沒有訊息,每間隔一段時間,服務端也要返回一個答覆(Response),讓客戶端重新請求一個連結。

見過一些人喜歡把每次輪詢的斷開到下次輪詢開始客戶端的接收->再請求的行為稱之為一次“心跳(Beat)”,也挺貼切的。

要實現真正的實時通訊,長輪詢的實現並不那麼簡單,因為每次“心跳”時會產生一個小間隙,這個間隙的時候服務端已經將上一個答覆(Response)返回,但還沒有接收到客戶端的下一次請求(Request)。那麼這時候,服務端如果有最新訊息,就無法推送給客戶端了,所以需要將這些訊息快取起來,等到下一次機會到來的時候再XXOO。

jQuery.AJAX

如果是AJAX的話,一般都是用jQuery進行實現。況且,畢竟還用了JSONP,手動寫起來在工作中實在不划算。

到了Web端的程式碼,就變得很容易了,以下內容直接從專案中節選,只是作了一些山間

 1     getJsonp: function (url, data, callback, errorCallback) {
 2         $.ajax({
 3             url: url,
 4             data: data,
 5             type: "POST",
 6             dataType: "jsonp",
 7             jsonpCallback: "callback" + Math.random().toString().replace('.', ''),
 8             success: callback,
 9             error: errorCallback
10         });
11     },
12     //輪詢的鎖,保證每個輪詢有且僅有一個
13     pollingLocks: {
14     },
15     //輪詢的重試時間
16     pollingRetries: {
17     },
18     //輪詢錯誤的callBack快取
19     pollingCallbacks: [],
20     //輪詢
21     //listeningCode: 監聽編碼,與伺服器的一個契約,單個監聽編碼在伺服器中有對應的一個緩衝池,以保留該監聽相關資訊
22     //url: 目標地址
23     //data: 請求時的引數
24     //lockName: 鎖名,同樣的鎖名在同一時間只會出現一個輪詢
25     //callbakc: 接收到服務端資料後的回撥
26     polling: function (listeningCode, url, data, lockName, callback) {
27         var comet = chatConnectionProvider.connections.comet;
28 
29         //判斷是否有鎖,排他,不允許重複監聽,保持單一連結
30         if (!comet.pollingLocks[lockName]) {
31             //鎖住監聽
32             comet.pollingLocks[lockName] = true;
33             comet.getJsonp(url, data, function (cometCallbackData) {
34                 var listeningCode = cometCallbackData.ListeningCode;
35                 //將訊息發回
36                 for (var i in cometCallbackData.Callbacks) {
37                     callback(cometCallbackData.Callbacks[i]);
38                 }
39                 //將監聽編碼新增到請求資料中,以和伺服器的監聽編碼保持一致
40                 data = data || {};
41                 data.listeningCode = cometCallbackData.ListeningCode;
42                 //解鎖後繼續監聽
43                 comet.pollingLocks[lockName] = false;
44                 comet.polling(listeningCode, url, data, lockName, callback);
45             }, function (jqXHR, textStatus, errorThrown) {
46                 //如果發生錯誤,則重試,並且逐步加大重試時間,以減低伺服器壓力,以100毫秒開始,每次加倍
47                 comet.pollingRetries[lockName] = comet.pollingRetries[lockName] * 2 || 100;
48                 //將回調函式暫存
49                 chatConnectionProvider.connections.comet.pollingCallbacks[lockName] = callback;
50                 var rePollingMethors = 'chatConnectionProvider.connections.comet.pollingLocks["' + lockName + '"] = false;'//先解鎖,在解鎖之前排他,不允許重複輪詢
51                     + 'chatConnectionProvider.connections.comet.polling("' + listeningCode + '", "' + url + '", "' + data + '", "' + lockName + '", chatConnectionProvider.connections.comet.pollingCallbacks["' + lockName + '"]);';
52                 setTimeout(rePollingMethors, comet.pollingRetries[lockName]);
53             });
54         }
55     },
JS部分程式碼

.NET MVC中的非同步

一開始我花了比較長時間尋找服務端Hold住請求的方法。

普通情況下,一個Web的請求是同步執行的,如果需要轉成非同步的話,需要對執行緒進行操作。比如一開始我最白痴的想法是用自旋鎖,或者用Thread相關的方法,然後在需要的時候採用一些Interup方法進行中斷等等,都不容易寫。

後來發現MVC中提供了比較合理的一種原生的非同步頁面方式,可以簡單地實現同步轉非同步。

首先是Controller要由預設的Controller改為繼承自AsyncController。該基類有一個私有成員AsyncManager,利用該物件可以簡單地將同步轉換成非同步。

而原本有的方法,要拆分成兩個方法來寫,分別在兩個方法用原名加上Async和Completed。

比如我的ListenController,裡面有一個User方法,用以監聽使用者的資料。經過實現之後,就變成了ListenController : AsyncController,同時擁有一對User方法:UserAsync和UserCompleted。

那麼,在頁面請求Listen/User的時候,就會自動呼叫名稱匹配的UserAsync方法。

在這之後,我們就需要利用AsyncManager執行以下語句,將執行緒“掛起”(Hold住,這樣懂了吧):

asyncManager.OutstandingOperations.Increment();

直到我們有訊息需要傳送給使用者的時候,通過以下方式對UserCompleted進行傳參:

asyncManager.Parameters["listeningCode"] = Code;

然後再觸發UserCompleted:

asyncManager.OutstandingOperations.Decrement();

再整體地看一次,ListenController就是長這個樣子的:

 1     public class ListenController : AsyncController
 2     {
 3         //
 4         // GET: /Listen/
 5 
 6         ICometManager cometManager;
 7 
 8         public ListenController()
 9         {
10             cometManager = StructureMap.ObjectFactory.GetInstance<ICometManager>();
11         }
12 
13         /// <summary>
14         /// 監聽使用者的資訊
15         /// </summary>
16         /// <param name="listeningCode">監聽編碼,如果為空則視為一次全新的監聽,允許同以客戶端開啟多個網頁進行多個監聽</param>
17         public void UserAsync(int? listeningCode)
18         {
19             //開始監聽使用者
20             cometManager.ListenUser(listeningCode, AsyncManager);
21         }
22 
23         /// <summary>
24         /// 返回使用者的資訊
25         /// </summary>
26         /// <param name="listeningCode">監聽編碼</param>
27         /// <returns></returns>
28         public JsonpResult UserCompleted(int listeningCode)
29         {
30             //獲取使用者所有的訊息
31             var callbacks = cometManager.TakeAllUserCallbacks(listeningCode);
32 
33             //將該訊息返回
34             return Json(new
35             {
36                 ListeningCode = listeningCode,
37                 Callbacks = callbacks.Select(item => new CallbackModel(item))
38             })
39             .ToJsonp();
40         }
41     }
ListenController

CometManager就是我用來處理輪詢的物件。

注意到在UserCompleted是通過了一個ICometManager.TakeAllUserCallbacks來獲取使用者的所有回撥資料,而不是直接通過AsyncManager.Parameters傳送。原因是實現過程中我發現無法通過AsyncManager.Parameters將自定義物件傳參,所以採取了這種方式。或許,實現序列化後或者引用相關序列化方法,能實現如此傳參。

在CometManager : ICometManager中,相關實現如此:

 1         /// <summary>
 2         /// 監聽使用者的方法
 3         /// </summary>
 4         /// <param name="listeningCode">指定監聽編碼,如果為空則為全新的監聽</param>
 5         /// <param name="asyncManager">監聽來源頁面的AsyncManager,用以處理非同步與回撥</param>
 6         public void ListenUser(int? listeningCode, System.Web.Mvc.Async.AsyncManager asyncManager)
 7         {
 8             //監聽新訊息
 9             userListenerQuery.Add(chatUserProvider.Current.Id, listeningCode, userListenManager, asyncManager);
10         }
11 
12         /// <summary>
13         /// 取走使用者所有回撥結果
14         /// </summary>
15         /// <param name="listeningCode">監聽者的Id</param>
16         /// <returns></returns>
17         public IEnumerable<CallbackModel> TakeAllUserCallbacks(int listeningCode)
18         {
19             return userListenerQuery.TakeAllCallback(listeningCode);
20         }
CometManager節選

userListenerQuery是一個單例(Singleton)的監聽佇列;而UserListenManager是往上一層的監聽管理物件,畢竟Chat本身不單止支援輪詢,還需要支援其他通訊方式,所以往上有一個公共層管理著所有訊息。

.NET中的非同步

除了MVC本身提供的特有方法外,還需要一些傳統的行為才能實現完整的長輪詢。

接著上面,參照ListenQuery的實現:

 1         Dictionary<int, CometListener> listenersDic;
 2         Dictionary<int, DateTime> lastAddTimeDic;
 3 
 4         public ListenerQuery()
 5         {
 6             listenersDic = new Dictionary<int, CometListener>();
 7             lastAddTimeDic = new Dictionary<int, DateTime>();
 8         }
 9 
10         /// <summary>
11         /// 新增一個監聽
12         /// </summary>
13         /// <param name="listenToId">監聽物件的Id</param>
14         /// <param name="listeningCode">原有監聽者的編碼</param>
15         /// <param name="listenManager">監聽的相關業務管理物件</param>
16         /// <param name="asyncManager">頁面的非同步管理物件</param>
17         /// <returns>監聽編碼</returns>
18         public int Add(int listenToId, int? listeningCode, IListenManager<int> listenManager, AsyncManager asyncManager)
19         {
20             lock (listenersDic)
21             {
22                 lock (lastAddTimeDic)
23                 {
24                     CometListener listener;
25                     //如果監聽者不存在,則生成,否則用原有的監聽者
26                     if (listeningCode == null || !listenersDic.ContainsKey(listeningCode.Value))
27                     {
28                         ////生成其隨機編碼
29                         //var seed = 10000;
30                         //var random = new Random(seed);
31                         //listeningCode = random.Next(seed);
32                         //while (listenersDic.ContainsKey(listeningCode.Value))
33                         //{
34                         //    listeningCode = random.Next(seed);
35                         //}
36                         //改為採用原有編碼
37 
38                         //生成監聽者並開始監聽
39                         Action<int> setListenerCode;
40                         listener = new CometListener(out setListenerCode);
41                         listenManager.ListenAsnyc(listenToId, listener, setListenerCode);
42 
43                         listeningCode = listener.Code;
44                         //新增入本列表字典
45                         listenersDic.Add(listeningCode.Value, listener);
46                         //新增監聽時間
47                         lastAddTimeDic.Add(listeningCode.Value, DateTime.Now);
48                     }
49                     else
50                     {
51                         listener = listenersDic[listeningCode.Value];
52                         lastAddTimeDic[listeningCode.Value] = DateTime.Now;
53                     }
54 
55                     //開始監聽
56                     listener.Begin(asyncManager);
57 
58                     //定時一次檢查,如果監聽超時,則清除監聽
59                     //設計倒計時,定期重新監聽,以免超時
60                     var timeLimitInMilliSecond = 60000;
61                     System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond);
62 
63                     //設定計時終結方法
64                     timer.Elapsed += (sender, e) =>
65                     {
66                         if (lastAddTimeDic[listeningCode.Value].AddSeconds(45) < DateTime.Now)
67                         {
68                             listenManager.StopListenAsnyc(listener);
69                         }
70                     };
71 
72                     //啟動倒計時
73                     timer.Start();
74                 }
75             }
76 
77             return listeningCode.Value;
78         }
79 
80         /// <summary>
81         /// 取走所有回撥結果
82         /// </summary>
83         /// <param name="listeningCode">監聽者的Id</param>
84         /// <returns></returns>
85         public IEnumerable<CallbackModel> TakeAllCallback(int listeningCode)
86         {
87             return listenersDic[listeningCode].ShiftAllCallbacks();
88         }
89     }
ListenerQuery

這裡用了一個字典來記錄每個ListeningCode以及相關的Listener。

注意Add方法內有一個Timer。就像註釋上所說的,定期檢查使用者是否在監聽。我在這裡設定了每30秒有一次“心跳”(Beat),而每次監聽後的第60秒會來檢查45秒內(暫時這麼設定的,有待時間考驗是不是個合適值)使用者是否再來監聽,如果沒有則停止監聽。

這麼做的原因是防止客戶端單方面離婚毀約,然後服務端的Comet傻傻地在這裡痴情地幫客戶端繼續保留快取訊息。這種情況時有出現,比如客戶端還沒等到答覆(Response)就私奔關掉了頁面,留下服務單在那邊Hold住連線傻傻地等待。

注意凡是處理佇列類的地方都有鎖,以防止併發問題。

那麼最後,CometListener的實現就如下:

  1     public class CometListener : Listen.IListener
  2     {
  3         AsyncManager asyncManager;
  4 
  5         List<CallbackModel> callbacks;
  6 
  7         /// <summary>
  8         /// 建構函式
  9         /// </summary>
 10         public CometListener(out Action<int> setListenerCode)
 11         {
 12             setListenerCode = setCode;
 13 
 14             callbacks = new List<CallbackModel>();
 15         }
 16 
 17         internal void setCode(int code)
 18         {
 19             this.Code = code;
 20         }
 21 
 22         /// <summary>
 23         /// 開始監聽的方法
 24         /// </summary>
 25         /// <param name="asyncManager">頁面的非同步處理物件</param>
 26         public void Begin(AsyncManager asyncManager)
 27         {
 28             //先把原有資料返回
 29             Return();
 30 
 31             lock (asyncManager)
 32             {
 33                 this.asyncManager = asyncManager;
 34                 lock (this.asyncManager)
 35                 {
 36                     //啟動非同步
 37                     asyncManager.OutstandingOperations.Increment();
 38 
 39                     //設計倒計時,定期斷開監聽,以免閘道器超時
 40                     var timeLimitInMilliSecond = 30000;
 41                     System.Timers.Timer timer = new System.Timers.Timer(timeLimitInMilliSecond);
 42 
 43                     //設定計時終結方法
 44                     timer.Elapsed += (sender, e) =>
 45                     {
 46                         if (this.asyncManager == asyncManager)
 47                         {
 48                             Return();
 49                         }
 50                     };
 51 
 52                     //啟動倒計時
 53                     timer.Start();
 54                 }
 55             }
 56         }
 57 
 58         /// <summary>
 59         /// 將現有的值返回給客戶端
 60         /// </summary>
 61         public void Return()
 62         {
 63             if (asyncManager != null)
 64             {
 65                 lock (asyncManager)
 66                 {
 67                     //返回最新值
 68                     asyncManager.Parameters["listeningCode"] = Code;
 69 
 70                     //返回最新值
 71                     asyncManager.OutstandingOperations.Decrement();
 72 
 73                     //清空當前頁面非同步物件,以等待下一個輪詢請求
 74                     asyncManager = null;
 75                 }
 76             }
 77         }
 78 
 79         /// <summary>
 80         /// 拿走並清除callbacks
 81         /// </summary>
 82         public IEnumerable<CallbackModel> ShiftAllCallbacks()
 83         {
 84             lock (callbacks)
 85             {
 86                 var result = callbacks.ToList();
 87                 callbacks.Clear();
 88                 return result;
 89             }
 90         }
 91 
 92 
 93         #region IListener members
 94 
 95         /// <summary>
 96         /// 唯一的監聽編碼,用以隔開並區分監聽
 97         /// </summary>
 98         public int Code
 99         {
100             get;
101             private set;
102         }
103 
104         /// <summary>
105         /// 回撥方法,通過該方法將新的資料傳送回給監聽者
106         /// </summary>
107         /// <param name="typeCode">資料的型別</param>
108         /// <param name="data">資料內容</param>
109         /// <returns></returns>
110         public async Task CallAsync(int typeCode, object args)
111         {
112             lock (callbacks)
113             {
114                 callbacks.Add(new CallbackModel(typeCode, args));
115             }
116             Return();
117         }
118 
119         #endregion
120     }
CometListener

總結

兩週前單次通訊的往返大約在200ms~300ms之間,這次重構後,將Chat核心中大量同步行為改成了非同步併發,已經將單次通訊往返壓縮在了30ms~50ms之間。當然最希望是能壓縮在10ms~20ms,那樣就可以用長輪詢進行高同步性的遊戲應用了,比如射擊、即時戰略。但是,到時候就沒那麼簡單了吧,畢竟心跳(Beat)的時候是會有兩次往返,也就是必須將單次往返壓縮在10ms以內才有可能實現,頁面的資料支撐也是個問題,需要大量套用字頁面來存放資料,Balabalabalabala.......

和JSONP一樣,長輪詢是一個畸形的技術,也更加是開發人員在備受顯示情況限制下智慧的結晶。當然,從通訊上來講,它不是一項“優秀”的技術或者協議,它浪費了太多“