1. 程式人生 > >C#實現請求唯一性校驗支援高併發

C#實現請求唯一性校驗支援高併發

使用場景描述:

  網路請求中經常會遇到傳送的請求,服務端響應是成功的,但是返回的時候出現網路故障,導致客戶端無法接收到請求結果,那麼客戶端程式可能判斷為網路故障,而重複傳送同一個請求。當然如果介面中定義了請求結果查詢介面,那麼這種重複會相對少一些。特別是交易類的資料,這種操作更是需要避免重複傳送請求。另外一種情況是使用者過於快速的點選介面按鈕,產生連續的相同內容請求,那麼後端也需要進行過濾,這種一般出現在系統對接上,無法去控制第三方系統的業務邏輯,需要從自身業務邏輯裡面去限定。

其他需求描述:

  這類請求一般存在時間範圍和高併發的特點,就是短時間內會出現重複的請求,因此對模組需要支援高併發性。

技術實現:

  對請求的業務內容進行MD5摘要,並且將MD5摘要儲存到快取中,每個請求資料都通過這個一個公共的呼叫的方法進行判斷。

程式碼實現:

  公共呼叫程式碼 UniqueCheck 採用單例模式建立唯一物件,便於在多執行緒呼叫的時候,只訪問一個統一的快取庫

/*
         * volatile就像大家更熟悉的const一樣,volatile是一個型別修飾符(type specifier)。
         * 它是被設計用來修飾被不同執行緒訪問和修改的變數。
         * 如果沒有volatile,基本上會導致這樣的結果:要麼無法編寫多執行緒程式,要麼編譯器失去大量優化的機會。
         */
        private static readonly object lockHelper = new object();

        private volatile static UniqueCheck _instance;        

        /// <summary>
        /// 獲取單一例項
        /// </summary>
        /// <returns></returns>
        public static UniqueCheck GetInstance()
        {
            if (_instance == null)
            {
                lock (lockHelper)
                {
                    if (_instance == null)
                        _instance = new UniqueCheck();
                }
            }
            return _instance;
        }

  這裡需要注意volatile的修飾符,在實際測試過程中,如果沒有此修飾符,在高併發的情況下會出現報錯。

  自定義一個可以進行併發處理佇列,程式碼如下:ConcurrentLinkedQueue

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Text;
  4 using System.Threading;
  5 
  6 namespace PackgeUniqueCheck
  7 {
  8     /// <summary>
  9     /// 非加鎖併發佇列,處理100個併發數以內
 10     /// </summary>
 11     /// <typeparam name="T"></typeparam>
 12     public class ConcurrentLinkedQueue<T>
 13     {
 14         private class Node<K>
 15         {
 16             internal K Item;
 17             internal Node<K> Next;
 18 
 19             public Node(K item, Node<K> next)
 20             {
 21                 this.Item = item;
 22                 this.Next = next;
 23             }
 24         }
 25 
 26         private Node<T> _head;
 27         private Node<T> _tail;
 28 
 29         public ConcurrentLinkedQueue()
 30         {
 31             _head = new Node<T>(default(T), null);
 32             _tail = _head;
 33         }
 34 
 35         public bool IsEmpty
 36         {
 37             get { return (_head.Next == null); }
 38         }
 39         /// <summary>
 40         /// 進入佇列
 41         /// </summary>
 42         /// <param name="item"></param>
 43         public void Enqueue(T item)
 44         {
 45             Node<T> newNode = new Node<T>(item, null);
 46             while (true)
 47             {
 48                 Node<T> curTail = _tail;
 49                 Node<T> residue = curTail.Next;
 50 
 51                 //判斷_tail是否被其他process改變
 52                 if (curTail == _tail)
 53                 {
 54                     //A 有其他process執行C成功,_tail應該指向新的節點
 55                     if (residue == null)
 56                     {
 57                         //C 其他process改變了tail節點,需要重新取tail節點
 58                         if (Interlocked.CompareExchange<Node<T>>(
 59                           ref curTail.Next, newNode, residue) == residue)
 60                         {
 61                             //D 嘗試修改tail
 62                             Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
 63                             return;
 64                         }
 65                     }
 66                     else
 67                     {
 68                         //B 幫助其他執行緒完成D操作
 69                         Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
 70                     }
 71                 }
 72             }
 73         }
 74         /// <summary>
 75         /// 佇列取資料
 76         /// </summary>
 77         /// <param name="result"></param>
 78         /// <returns></returns>
 79         public bool TryDequeue(out T result)
 80         {
 81             Node<T> curHead;
 82             Node<T> curTail;
 83             Node<T> next;
 84             while (true)
 85             {
 86                 curHead = _head;
 87                 curTail = _tail;
 88                 next = curHead.Next;
 89                 if (curHead == _head)
 90                 {
 91                     if (next == null) //Queue為空
 92                     {
 93                         result = default(T);
 94                         return false;
 95                     }
 96                     if (curHead == curTail) //Queue處於Enqueue第一個node的過程中
 97                     {
 98                         //嘗試幫助其他Process完成操作
 99                         Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
100                     }
101                     else
102                     {
103                         //取next.Item必須放到CAS之前
104                         result = next.Item;
105                         //如果_head沒有發生改變,則將_head指向next並退出
106                         if (Interlocked.CompareExchange<Node<T>>(ref _head,
107                           next, curHead) == curHead)
108                             break;
109                     }
110                 }
111             }
112             return true;
113         }
114         /// <summary>
115         /// 嘗試獲取最後一個物件
116         /// </summary>
117         /// <param name="result"></param>
118         /// <returns></returns>
119         public bool TryGetTail(out T result)
120         {
121             result = default(T);
122             if (_tail == null)
123             {
124                 return false;
125             }
126             result = _tail.Item;
127             return true;
128         }
129     }
130 }

雖然是一個非常簡單的唯一性校驗邏輯,但是要做到高效率,高併發支援,高可靠性,以及低記憶體佔用,需要實現這樣的需求,需要做細緻的模擬測試。

  1 using System;
  2 using System.Collections.Generic;
  3 using System.Text;
  4 using System.Threading;
  5 using System.Collections;
  6 
  7 namespace PackgeUniqueCheck
  8 {
  9     public class UniqueCheck
 10     {
 11         /*
 12          * volatile就像大家更熟悉的const一樣,volatile是一個型別修飾符(type specifier)。
 13          * 它是被設計用來修飾被不同執行緒訪問和修改的變數。
 14          * 如果沒有volatile,基本上會導致這樣的結果:要麼無法編寫多執行緒程式,要麼編譯器失去大量優化的機會。
 15          */
 16         private static readonly object lockHelper = new object();
 17 
 18         private volatile static UniqueCheck _instance;        
 19 
 20         /// <summary>
 21         /// 獲取單一例項
 22         /// </summary>
 23         /// <returns></returns>
 24         public static UniqueCheck GetInstance()
 25         {
 26             if (_instance == null)
 27             {
 28                 lock (lockHelper)
 29                 {
 30                     if (_instance == null)
 31                         _instance = new UniqueCheck();
 32                 }
 33             }
 34             return _instance;
 35         }
 36 
 37         private UniqueCheck()
 38         {
 39             //建立一個執行緒安全的雜湊表,作為字典快取
 40             _DataKey = Hashtable.Synchronized(new Hashtable());
 41             Queue myqueue = new Queue();
 42             _DataQueue = Queue.Synchronized(myqueue);
 43             _Myqueue = new ConcurrentLinkedQueue<string>();
 44             _Timer = new Thread(DoTicket);
 45             _Timer.Start();
 46         }
 47 
 48         #region 公共屬性設定
 49         /// <summary>
 50         /// 設定定時執行緒的休眠時間長度:預設為1分鐘
 51         /// 時間範圍:1-7200000,值為1毫秒到2小時
 52         /// </summary>
 53         /// <param name="value"></param>
 54         public void SetTimeSpan(int value)
 55         {
 56             if (value > 0&& value <=7200000)
 57             {
 58                 _TimeSpan = value;
 59             }
 60         }
 61         /// <summary>
 62         /// 設定快取Cache中的最大記錄條數
 63         /// 值範圍:1-5000000,1到500萬
 64         /// </summary>
 65         /// <param name="value"></param>
 66         public void SetCacheMaxNum(int value)
 67         {
 68             if (value > 0 && value <= 5000000)
 69             {
 70                 _CacheMaxNum = value;
 71             }
 72         }
 73         /// <summary>
 74         /// 設定是否在控制檯中顯示日誌
 75         /// </summary>
 76         /// <param name="value"></param>
 77         public void SetIsShowMsg(bool value)
 78         {
 79             Helper.IsShowMsg = value;
 80         }
 81         /// <summary>
 82         /// 執行緒請求阻塞增量
 83         /// 值範圍:1-CacheMaxNum,建議設定為快取最大值的10%-20%
 84         /// </summary>
 85         /// <param name="value"></param>
 86         public void SetBlockNumExt(int value)
 87         {
 88             if (value > 0 && value <= _CacheMaxNum)
 89             {
 90                 _BlockNumExt = value;
 91             }
 92         }
 93         /// <summary>
 94         /// 請求阻塞時間
 95         /// 值範圍:1-max,根據阻塞增量設定請求阻塞時間
 96         /// 阻塞時間越長,阻塞增量可以設定越大,但是請求實時響應就越差
 97         /// </summary>
 98         /// <param name="value"></param>
 99         public void SetBlockSpanTime(int value)
100         {
101             if (value > 0)
102             {
103                 _BlockSpanTime = value;
104             }
105         }
106         #endregion
107 
108         #region 私有變數
109         /// <summary>
110         /// 內部執行執行緒
111         /// </summary>
112         private Thread _runner = null;
113         /// <summary>
114         /// 可處理高併發的佇列
115         /// </summary>
116         private ConcurrentLinkedQueue<string> _Myqueue = null;
117         /// <summary>
118         /// 唯一內容的時間健值對
119         /// </summary>
120         private Hashtable _DataKey = null;
121         /// <summary>
122         /// 內容時間佇列
123         /// </summary>
124         private Queue _DataQueue = null;
125         /// <summary>
126         /// 定時執行緒的休眠時間長度:預設為1分鐘
127         /// </summary>
128         private int _TimeSpan = 3000;
129         /// <summary>
130         /// 定時計時器執行緒
131         /// </summary>
132         private Thread _Timer = null;
133         /// <summary>
134         /// 快取Cache中的最大記錄條數
135         /// </summary>
136         private int _CacheMaxNum = 500000;
137         /// <summary>
138         /// 執行緒請求阻塞增量
139         /// </summary>
140         private int _BlockNumExt = 10000;
141         /// <summary>
142         /// 請求阻塞時間
143         /// </summary>
144         private int _BlockSpanTime = 100;
145         #endregion
146 
147         #region 私有方法
148         private void StartRun()
149         {
150             _runner = new Thread(DoAction);
151             _runner.Start();
152             Helper.ShowMsg("內部執行緒啟動成功!");
153         }
154 
155         private string GetItem()
156         {
157             string tp = string.Empty;
158             bool result = _Myqueue.TryDequeue(out tp);
159             return tp;
160         }
161         /// <summary>
162         /// 執行迴圈操作
163         /// </summary>
164         private void DoAction()
165         {
166             while (true)
167             {
168                 while (!_Myqueue.IsEmpty)
169                 {
170                     string item = GetItem();
171                     _DataQueue.Enqueue(item);
172                     if (!_DataKey.ContainsKey(item))
173                     {
174                         _DataKey.Add(item, DateTime.Now);
175                     }
176                 }
177                 //Helper.ShowMsg("當前陣列已經為空,處理執行緒進入休眠狀態...");
178                 Thread.Sleep(2);
179             }
180         }
181         /// <summary>
182         /// 執行定時器的動作
183         /// </summary>
184         private void DoTicket()
185         {
186             while (true)
187             {
188                 Helper.ShowMsg("當前資料佇列個數:" + _DataQueue.Count.ToString());
189                 if (_DataQueue.Count > _CacheMaxNum)
190                 {
191                     while (true)
192                     {
193                         Helper.ShowMsg(string.Format("當前佇列數:{0},已經超出最大長度:{1},開始進行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
194                         string item = _DataQueue.Dequeue().ToString();
195                         if (!string.IsNullOrEmpty(item))
196                         {
197                             if (_DataKey.ContainsKey(item))
198                             {
199                                 _DataKey.Remove(item);
200                             }
201                             if (_DataQueue.Count <= _CacheMaxNum)
202                             {
203                                 Helper.ShowMsg("清理完成,開始休眠清理執行緒...");
204                                 break;
205                             }
206                         }
207                     }
208                 }
209                 Thread.Sleep(_TimeSpan);
210             }
211         }
212 
213         /// <summary>
214         /// 執行緒進行睡眠等待
215         /// 如果當前負載壓力大大超出了執行緒的處理能力
216         /// 那麼需要進行延時呼叫
217         /// </summary>
218         private void BlockThread()
219         {
220             if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
221             {
222                 Thread.Sleep(_BlockSpanTime);
223             }
224         }
225         #endregion
226 
227         #region 公共方法
228         /// <summary>
229         /// 開啟服務執行緒
230         /// </summary>
231         public void Start()
232         {
233             if (_runner == null)
234             {
235                 StartRun();
236             }
237             else
238             {
239                 if (_runner.IsAlive == false)
240                 {
241                     StartRun();
242                 }
243             }
244 
245         }
246         /// <summary>
247         /// 關閉服務執行緒
248         /// </summary>
249         public void Stop()
250         {
251             if (_runner != null)
252             {
253                 _runner.Abort();
254                 _runner = null;
255             }
256         }
257 
258         /// <summary>
259         /// 新增內容資訊
260         /// </summary>
261         /// <param name="item">內容資訊</param>
262         /// <returns>true:快取中不包含此值,佇列新增成功,false:快取中包含此值,佇列新增失敗</returns>
263         public bool AddItem(string item)
264         {
265             BlockThread();
266             item = Helper.MakeMd5(item);
267             if (_DataKey.ContainsKey(item))
268             {
269                 return false;
270             }
271             else
272             {
273                 _Myqueue.Enqueue(item);
274                 return true;
275             }
276         }
277         /// <summary>
278         /// 判斷內容資訊是否已經存在
279         /// </summary>
280         /// <param name="item">內容資訊</param>
281         /// <returns>true:資訊已經存在於快取中,false:資訊不存在於快取中</returns>
282         public bool CheckItem(string item)
283         {
284             item = Helper.MakeMd5(item);
285             return _DataKey.ContainsKey(item);
286         }
287         #endregion    
288 
289     }
290 }

模擬測試程式碼:

private static string _example = Guid.NewGuid().ToString();

        private static UniqueCheck _uck = null;

        static void Main(string[] args)
        {
            _uck = UniqueCheck.GetInstance();
            _uck.Start();
            _uck.SetIsShowMsg(false);
            _uck.SetCacheMaxNum(20000000);
            _uck.SetBlockNumExt(1000000);
            _uck.SetTimeSpan(6000);

            _uck.AddItem(_example);
            Thread[] threads = new Thread[20];

            for (int i = 0; i < 20; i++)
            {
                threads[i] = new Thread(AddInfo);
                threads[i].Start();
            }

            Thread checkthread = new Thread(CheckInfo);
            checkthread.Start();

            string value = Console.ReadLine();

            checkthread.Abort();
            for (int i = 0; i < 50; i++)
            {
                threads[i].Abort();
            }
            _uck.Stop();
        }

        static void AddInfo()
        {
            while (true)
            {
                _uck.AddItem(Guid.NewGuid().ToString());
            }
        }

        static void CheckInfo()
        {
            while (true)
            {
                Console.WriteLine("開始時間:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
                Console.WriteLine("插入結果:{0}", _uck.AddItem(_example));
                Console.WriteLine("結束時間:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
          //調整程序休眠時間,可以測試高併發的情況 //Thread.Sleep(1000); } }

測試截圖:

&n