C# 利用執行緒安全資料結構BlockingCollection實現多執行緒
阿新 • • 發佈:2018-12-04
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Danny.Infrastructure.Helper;

namespace Danny.Infrastructure.Collections
{
    /// <summary>
    /// A multi-threaded processing queue built on <see cref="BlockingCollection{T}"/>.
    /// Items passed to <see cref="Enqueue"/> are handed to <see cref="ProcessItemEvent"/>
    /// on dedicated worker threads; the workers are created and started by the first
    /// <see cref="Enqueue"/> call made while the queue is enabled.
    /// </summary>
    /// <typeparam name="T">Type of the items being queued.</typeparam>
    public class ProcessQueue<T>
    {
        private readonly BlockingCollection<T> _queue;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly CancellationToken _cancellToken;

        // Worker threads owned by this queue.
        private readonly List<Thread> _threadCollection;

        // Holds Processing or UnProcessing; only changed through Interlocked.
        private int _isProcessing;

        // The worker threads have been created.
        private const int Processing = 1;

        // No worker threads have been created yet.
        private const int UnProcessing = 0;

        // Cleared by StopProcess and never set again: once flushed, the queue stays disabled.
        private volatile bool _enabled = true;

        // Number of worker threads to create.
        private readonly int _internalThreadCount;

        /// <summary>
        /// Raised once for every dequeued item, on a worker thread or, during
        /// <see cref="Flush"/>, on the thread that called <see cref="Flush"/>.
        /// </summary>
        public event Action<T> ProcessItemEvent;

        /// <summary>
        /// Raised when processing an item fails. Arguments: this queue instance,
        /// the exception, and the item that was being processed (default(T) when
        /// the failure happened before an item was dequeued).
        /// </summary>
        public event Action<dynamic, Exception, T> ProcessExceptionEvent;

        /// <summary>
        /// Creates a queue that uses a single worker thread.
        /// </summary>
        public ProcessQueue()
        {
            _queue = new BlockingCollection<T>();
            _cancellationTokenSource = new CancellationTokenSource();
            _cancellToken = _cancellationTokenSource.Token;
            _threadCollection = new List<Thread>();
            _internalThreadCount = 1;
        }

        /// <summary>
        /// Creates a queue that uses <paramref name="internalThreadCount"/> worker threads.
        /// </summary>
        /// <param name="internalThreadCount">Number of worker threads; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="internalThreadCount"/> is less than 1.
        /// </exception>
        public ProcessQueue(int internalThreadCount) : this()
        {
            // With zero (or fewer) workers nothing would ever dequeue the items.
            if (internalThreadCount < 1)
            {
                throw new ArgumentOutOfRangeException(
                    "internalThreadCount",
                    internalThreadCount,
                    "At least one worker thread is required.");
            }

            _internalThreadCount = internalThreadCount;
        }

        /// <summary>
        /// Returns the number of items currently waiting in the queue.
        /// </summary>
        public int GetInternalItemCount()
        {
            return _queue.Count;
        }

        /// <summary>
        /// Adds an item to the queue and, on the first call while the queue is
        /// enabled, creates and starts the worker threads.
        /// </summary>
        /// <param name="items">The item to enqueue; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
        public void Enqueue(T items)
        {
            if (items == null)
            {
                // ArgumentNullException derives from ArgumentException, so callers
                // that catch the base type keep working.
                throw new ArgumentNullException("items");
            }

            _queue.Add(items);
            DataAdded();
        }

        /// <summary>
        /// Stops the worker threads, then processes every item still in the queue
        /// on the calling thread. The queue stays disabled afterwards: items
        /// enqueued later are stored but not processed until the next Flush.
        /// </summary>
        public void Flush()
        {
            StopProcess();

            T item;
            while (_queue.TryTake(out item))
            {
                try
                {
                    RaiseProcessItem(item);
                }
                catch (Exception ex)
                {
                    OnProcessException(ex, item);
                }
            }
        }

        // Creates and starts the worker threads the first time data arrives.
        private void DataAdded()
        {
            if (!_enabled)
            {
                return;
            }

            if (IsProcessingItem())
            {
                return;
            }

            ProcessRangeItem();
            StartProcess();
        }

        // Atomically flips _isProcessing from UnProcessing to Processing.
        // Returns true when it was already Processing, i.e. another caller has
        // already taken responsibility for creating the workers.
        private bool IsProcessingItem()
        {
            return Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) != UnProcessing;
        }

        // Creates (but does not start) _internalThreadCount worker threads.
        private void ProcessRangeItem()
        {
            for (int i = 0; i < _internalThreadCount; i++)
            {
                ProcessItem();
            }
        }

        // Creates one worker thread and registers it in _threadCollection.
        private void ProcessItem()
        {
            Thread currentThread = new Thread((state) =>
            {
                // Also watch the token so a cancelled worker leaves the loop
                // instead of calling Take again.
                while (_enabled && !_cancellToken.IsCancellationRequested)
                {
                    // Declared per iteration so that a failing Take does not report
                    // the item of the previous iteration.
                    T item = default(T);
                    try
                    {
                        try
                        {
                            item = _queue.Take(_cancellToken);
                            RaiseProcessItem(item);
                        }
                        catch (OperationCanceledException ex)
                        {
                            DebugHelper.DebugView(ex.ToString());
                        }
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex, item);
                    }
                }
            });

            _threadCollection.Add(currentThread);
        }

        // Starts every registered worker thread.
        private void StartProcess()
        {
            foreach (var thread in _threadCollection)
            {
                thread.Start();
            }
        }

        // Disables the queue, wakes and joins the worker threads, then forgets them.
        private void StopProcess()
        {
            _enabled = false;

            // Workers wait inside Take(_cancellToken); without cancelling the token
            // they never return and the Join below would wait forever.
            _cancellationTokenSource.Cancel();

            foreach (var thread in _threadCollection)
            {
                if (thread.IsAlive)
                {
                    thread.Join();
                }
            }

            _threadCollection.Clear();
        }

        // Invokes ProcessItemEvent through a local snapshot so that a concurrent
        // unsubscribe cannot null the delegate between the check and the call.
        private void RaiseProcessItem(T item)
        {
            Action<T> handler = ProcessItemEvent;
            if (handler == null)
            {
                // Callers route this to ProcessExceptionEvent together with the item.
                throw new InvalidOperationException("No ProcessItemEvent handler is subscribed.");
            }

            handler(item);
        }

        // Reports a processing failure to ProcessExceptionEvent subscribers, if any.
        private void OnProcessException(Exception ex, T item)
        {
            // Invoke the snapshot, not the field: the field may become null after the check.
            var handler = ProcessExceptionEvent;
            if (handler != null)
            {
                handler(this, ex, item);
            }
        }
    }
}
/// <summary>
/// Console demo: subscribes to a <c>ProcessQueue&lt;int&gt;</c> and enqueues the values 1, 2 and 3.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        var queue = new ProcessQueue<int>();

        // Subscribe before enqueuing so the handlers are in place when processing starts.
        queue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
        queue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;

        foreach (int number in new[] { 1, 2, 3 })
        {
            queue.Enqueue(number);
        }
    }

    /// <summary>
    /// Handles one dequeued element by writing it to the console.
    /// </summary>
    /// <param name="value">The dequeued element.</param>
    private static void ProcessQueue_ProcessItemEvent(int value)
    {
        Console.WriteLine(value);
    }

    /// <summary>
    /// Handles a processing failure by writing the exception to the console.
    /// </summary>
    /// <param name="obj">The queue instance that reported the failure.</param>
    /// <param name="ex">The exception that was raised.</param>
    /// <param name="value">The element that was being processed.</param>
    private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
    {
        string details = ex.ToString();
        Console.WriteLine(details);
    }
}