C#實現請求唯一性校驗支援高併發
阿新 • • 發佈:2019-09-30
使用場景描述:
網路請求中經常會遇到傳送的請求,服務端響應是成功的,但是返回的時候出現網路故障,導致客戶端無法接收到請求結果,那麼客戶端程式可能判斷為網路故障,而重複傳送同一個請求。當然如果介面中定義了請求結果查詢介面,那麼這種重複會相對少一些。特別是交易類的資料,這種操作更是需要避免重複傳送請求。另外一種情況是使用者過於快速的點選介面按鈕,產生連續的相同內容請求,那麼後端也需要進行過濾,這種一般出現在系統對接上,無法去控制第三方系統的業務邏輯,需要從自身業務邏輯裡面去限定。
其他需求描述:
這類請求一般存在時間範圍和高併發的特點,就是短時間內會出現重複的請求,因此對模組需要支援高併發性。
技術實現:
對請求的業務內容進行MD5摘要,並且將MD5摘要儲存到快取中,每個請求資料都通過這個一個公共的呼叫的方法進行判斷。
程式碼實現:
公共呼叫程式碼 UniqueCheck 採用單例模式建立唯一物件,便於在多執行緒呼叫的時候,只訪問一個統一的快取庫
/* * volatile就像大家更熟悉的const一樣,volatile是一個型別修飾符(type specifier)。 * 它是被設計用來修飾被不同執行緒訪問和修改的變數。 * 如果沒有volatile,基本上會導致這樣的結果:要麼無法編寫多執行緒程式,要麼編譯器失去大量優化的機會。 */ private static readonly object lockHelper = new object(); private volatile static UniqueCheck _instance; /// <summary> /// 獲取單一例項 /// </summary> /// <returns></returns> public static UniqueCheck GetInstance() { if (_instance == null) { lock (lockHelper) { if (_instance == null) _instance = new UniqueCheck(); } } return _instance; }
這裡需要注意volatile的修飾符,在實際測試過程中,如果沒有此修飾符,在高併發的情況下會出現報錯。
自定義一個可以進行併發處理佇列,程式碼如下:ConcurrentLinkedQueue
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using System.Threading; 5 6 namespace PackgeUniqueCheck 7 { 8 /// <summary> 9 /// 非加鎖併發佇列,處理100個併發數以內 10 /// </summary> 11 /// <typeparam name="T"></typeparam> 12 public class ConcurrentLinkedQueue<T> 13 { 14 private class Node<K> 15 { 16 internal K Item; 17 internal Node<K> Next; 18 19 public Node(K item, Node<K> next) 20 { 21 this.Item = item; 22 this.Next = next; 23 } 24 } 25 26 private Node<T> _head; 27 private Node<T> _tail; 28 29 public ConcurrentLinkedQueue() 30 { 31 _head = new Node<T>(default(T), null); 32 _tail = _head; 33 } 34 35 public bool IsEmpty 36 { 37 get { return (_head.Next == null); } 38 } 39 /// <summary> 40 /// 進入佇列 41 /// </summary> 42 /// <param name="item"></param> 43 public void Enqueue(T item) 44 { 45 Node<T> newNode = new Node<T>(item, null); 46 while (true) 47 { 48 Node<T> curTail = _tail; 49 Node<T> residue = curTail.Next; 50 51 //判斷_tail是否被其他process改變 52 if (curTail == _tail) 53 { 54 //A 有其他process執行C成功,_tail應該指向新的節點 55 if (residue == null) 56 { 57 //C 其他process改變了tail節點,需要重新取tail節點 58 if (Interlocked.CompareExchange<Node<T>>( 59 ref curTail.Next, newNode, residue) == residue) 60 { 61 //D 嘗試修改tail 62 Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); 63 return; 64 } 65 } 66 else 67 { 68 //B 幫助其他執行緒完成D操作 69 Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); 70 } 71 } 72 } 73 } 74 /// <summary> 75 /// 佇列取資料 76 /// </summary> 77 /// <param name="result"></param> 78 /// <returns></returns> 79 public bool TryDequeue(out T result) 80 { 81 Node<T> curHead; 82 Node<T> curTail; 83 Node<T> next; 84 while (true) 85 { 86 curHead = _head; 87 curTail = _tail; 88 next = curHead.Next; 89 if (curHead == _head) 90 { 91 if (next == null) //Queue為空 92 { 93 result = default(T); 94 return false; 95 } 96 if (curHead == curTail) //Queue處於Enqueue第一個node的過程中 97 { 98 //嘗試幫助其他Process完成操作 99 Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); 100 } 101 else 102 { 103 //取next.Item必須放到CAS之前 104 result = next.Item; 105 //如果_head沒有發生改變,則將_head指向next並退出 106 if (Interlocked.CompareExchange<Node<T>>(ref _head, 107 next, curHead) == curHead) 108 break; 109 } 110 } 111 } 112 return true; 113 } 114 /// <summary> 115 /// 嘗試獲取最後一個物件 116 /// </summary> 117 /// <param name="result"></param> 118 /// <returns></returns> 119 public bool TryGetTail(out T result) 120 { 121 result = default(T); 122 if (_tail == null) 123 { 124 return false; 125 } 126 result = _tail.Item; 127 return true; 128 } 129 } 130 }
雖然是一個非常簡單的唯一性校驗邏輯,但是要做到高效率,高併發支援,高可靠性,以及低記憶體佔用,需要實現這樣的需求,需要做細緻的模擬測試。
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 using System.Threading; 5 using System.Collections; 6 7 namespace PackgeUniqueCheck 8 { 9 public class UniqueCheck 10 { 11 /* 12 * volatile就像大家更熟悉的const一樣,volatile是一個型別修飾符(type specifier)。 13 * 它是被設計用來修飾被不同執行緒訪問和修改的變數。 14 * 如果沒有volatile,基本上會導致這樣的結果:要麼無法編寫多執行緒程式,要麼編譯器失去大量優化的機會。 15 */ 16 private static readonly object lockHelper = new object(); 17 18 private volatile static UniqueCheck _instance; 19 20 /// <summary> 21 /// 獲取單一例項 22 /// </summary> 23 /// <returns></returns> 24 public static UniqueCheck GetInstance() 25 { 26 if (_instance == null) 27 { 28 lock (lockHelper) 29 { 30 if (_instance == null) 31 _instance = new UniqueCheck(); 32 } 33 } 34 return _instance; 35 } 36 37 private UniqueCheck() 38 { 39 //建立一個執行緒安全的雜湊表,作為字典快取 40 _DataKey = Hashtable.Synchronized(new Hashtable()); 41 Queue myqueue = new Queue(); 42 _DataQueue = Queue.Synchronized(myqueue); 43 _Myqueue = new ConcurrentLinkedQueue<string>(); 44 _Timer = new Thread(DoTicket); 45 _Timer.Start(); 46 } 47 48 #region 公共屬性設定 49 /// <summary> 50 /// 設定定時執行緒的休眠時間長度:預設為1分鐘 51 /// 時間範圍:1-7200000,值為1毫秒到2小時 52 /// </summary> 53 /// <param name="value"></param> 54 public void SetTimeSpan(int value) 55 { 56 if (value > 0&& value <=7200000) 57 { 58 _TimeSpan = value; 59 } 60 } 61 /// <summary> 62 /// 設定快取Cache中的最大記錄條數 63 /// 值範圍:1-5000000,1到500萬 64 /// </summary> 65 /// <param name="value"></param> 66 public void SetCacheMaxNum(int value) 67 { 68 if (value > 0 && value <= 5000000) 69 { 70 _CacheMaxNum = value; 71 } 72 } 73 /// <summary> 74 /// 設定是否在控制檯中顯示日誌 75 /// </summary> 76 /// <param name="value"></param> 77 public void SetIsShowMsg(bool value) 78 { 79 Helper.IsShowMsg = value; 80 } 81 /// <summary> 82 /// 執行緒請求阻塞增量 83 /// 值範圍:1-CacheMaxNum,建議設定為快取最大值的10%-20% 84 /// </summary> 85 /// <param name="value"></param> 86 public void SetBlockNumExt(int value) 87 { 88 if (value > 0 && value <= _CacheMaxNum) 89 { 90 _BlockNumExt = value; 91 } 92 } 93 /// <summary> 94 /// 請求阻塞時間 95 /// 值範圍:1-max,根據阻塞增量設定請求阻塞時間 96 /// 阻塞時間越長,阻塞增量可以設定越大,但是請求實時響應就越差 97 /// </summary> 98 /// <param name="value"></param> 99 public void SetBlockSpanTime(int value) 100 { 101 if (value > 0) 102 { 103 _BlockSpanTime = value; 104 } 105 } 106 #endregion 107 108 #region 私有變數 109 /// <summary> 110 /// 內部執行執行緒 111 /// </summary> 112 private Thread _runner = null; 113 /// <summary> 114 /// 可處理高併發的佇列 115 /// </summary> 116 private ConcurrentLinkedQueue<string> _Myqueue = null; 117 /// <summary> 118 /// 唯一內容的時間健值對 119 /// </summary> 120 private Hashtable _DataKey = null; 121 /// <summary> 122 /// 內容時間佇列 123 /// </summary> 124 private Queue _DataQueue = null; 125 /// <summary> 126 /// 定時執行緒的休眠時間長度:預設為1分鐘 127 /// </summary> 128 private int _TimeSpan = 3000; 129 /// <summary> 130 /// 定時計時器執行緒 131 /// </summary> 132 private Thread _Timer = null; 133 /// <summary> 134 /// 快取Cache中的最大記錄條數 135 /// </summary> 136 private int _CacheMaxNum = 500000; 137 /// <summary> 138 /// 執行緒請求阻塞增量 139 /// </summary> 140 private int _BlockNumExt = 10000; 141 /// <summary> 142 /// 請求阻塞時間 143 /// </summary> 144 private int _BlockSpanTime = 100; 145 #endregion 146 147 #region 私有方法 148 private void StartRun() 149 { 150 _runner = new Thread(DoAction); 151 _runner.Start(); 152 Helper.ShowMsg("內部執行緒啟動成功!"); 153 } 154 155 private string GetItem() 156 { 157 string tp = string.Empty; 158 bool result = _Myqueue.TryDequeue(out tp); 159 return tp; 160 } 161 /// <summary> 162 /// 執行迴圈操作 163 /// </summary> 164 private void DoAction() 165 { 166 while (true) 167 { 168 while (!_Myqueue.IsEmpty) 169 { 170 string item = GetItem(); 171 _DataQueue.Enqueue(item); 172 if (!_DataKey.ContainsKey(item)) 173 { 174 _DataKey.Add(item, DateTime.Now); 175 } 176 } 177 //Helper.ShowMsg("當前陣列已經為空,處理執行緒進入休眠狀態..."); 178 Thread.Sleep(2); 179 } 180 } 181 /// <summary> 182 /// 執行定時器的動作 183 /// </summary> 184 private void DoTicket() 185 { 186 while (true) 187 { 188 Helper.ShowMsg("當前資料佇列個數:" + _DataQueue.Count.ToString()); 189 if (_DataQueue.Count > _CacheMaxNum) 190 { 191 while (true) 192 { 193 Helper.ShowMsg(string.Format("當前佇列數:{0},已經超出最大長度:{1},開始進行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString())); 194 string item = _DataQueue.Dequeue().ToString(); 195 if (!string.IsNullOrEmpty(item)) 196 { 197 if (_DataKey.ContainsKey(item)) 198 { 199 _DataKey.Remove(item); 200 } 201 if (_DataQueue.Count <= _CacheMaxNum) 202 { 203 Helper.ShowMsg("清理完成,開始休眠清理執行緒..."); 204 break; 205 } 206 } 207 } 208 } 209 Thread.Sleep(_TimeSpan); 210 } 211 } 212 213 /// <summary> 214 /// 執行緒進行睡眠等待 215 /// 如果當前負載壓力大大超出了執行緒的處理能力 216 /// 那麼需要進行延時呼叫 217 /// </summary> 218 private void BlockThread() 219 { 220 if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt) 221 { 222 Thread.Sleep(_BlockSpanTime); 223 } 224 } 225 #endregion 226 227 #region 公共方法 228 /// <summary> 229 /// 開啟服務執行緒 230 /// </summary> 231 public void Start() 232 { 233 if (_runner == null) 234 { 235 StartRun(); 236 } 237 else 238 { 239 if (_runner.IsAlive == false) 240 { 241 StartRun(); 242 } 243 } 244 245 } 246 /// <summary> 247 /// 關閉服務執行緒 248 /// </summary> 249 public void Stop() 250 { 251 if (_runner != null) 252 { 253 _runner.Abort(); 254 _runner = null; 255 } 256 } 257 258 /// <summary> 259 /// 新增內容資訊 260 /// </summary> 261 /// <param name="item">內容資訊</param> 262 /// <returns>true:快取中不包含此值,佇列新增成功,false:快取中包含此值,佇列新增失敗</returns> 263 public bool AddItem(string item) 264 { 265 BlockThread(); 266 item = Helper.MakeMd5(item); 267 if (_DataKey.ContainsKey(item)) 268 { 269 return false; 270 } 271 else 272 { 273 _Myqueue.Enqueue(item); 274 return true; 275 } 276 } 277 /// <summary> 278 /// 判斷內容資訊是否已經存在 279 /// </summary> 280 /// <param name="item">內容資訊</param> 281 /// <returns>true:資訊已經存在於快取中,false:資訊不存在於快取中</returns> 282 public bool CheckItem(string item) 283 { 284 item = Helper.MakeMd5(item); 285 return _DataKey.ContainsKey(item); 286 } 287 #endregion 288 289 } 290 }
模擬測試程式碼:
private static string _example = Guid.NewGuid().ToString(); private static UniqueCheck _uck = null; static void Main(string[] args) { _uck = UniqueCheck.GetInstance(); _uck.Start(); _uck.SetIsShowMsg(false); _uck.SetCacheMaxNum(20000000); _uck.SetBlockNumExt(1000000); _uck.SetTimeSpan(6000); _uck.AddItem(_example); Thread[] threads = new Thread[20]; for (int i = 0; i < 20; i++) { threads[i] = new Thread(AddInfo); threads[i].Start(); } Thread checkthread = new Thread(CheckInfo); checkthread.Start(); string value = Console.ReadLine(); checkthread.Abort(); for (int i = 0; i < 50; i++) { threads[i].Abort(); } _uck.Stop(); } static void AddInfo() { while (true) { _uck.AddItem(Guid.NewGuid().ToString()); } } static void CheckInfo() { while (true) { Console.WriteLine("開始時間:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff")); Console.WriteLine("插入結果:{0}", _uck.AddItem(_example)); Console.WriteLine("結束時間:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
//調整程序休眠時間,可以測試高併發的情況 //Thread.Sleep(1000); } }
測試截圖:
&n