C# 非同步併發操作,只保留最後一次操作
阿新 • • 發佈:2019-12-05
在我們業務操作時,難免會有多次操作,我們期望什麼結果呢?
絕大部分情況,應該是隻需要最後一次操作的結果,其它操作應該無效。
自定義等待的任務類
1. 可等待的任務類 AwaitableTask:
1 /// <summary> 2 /// 可等待的任務 3 /// </summary> 4 public class AwaitableTask 5 { 6 /// <summary> 7 /// 獲取任務是否為不可執行狀態 8 /// </summary> 9 public bool NotExecutable { get; private set; } 10 11 /// <summary> 12 /// 設定任務不可執行 13 /// </summary> 14 public void SetNotExecutable() 15 { 16 NotExecutable = true; 17 } 18 19 /// <summary> 20 /// 獲取任務是否有效 21 /// 注:對無效任務,可以不做處理。減少併發操作導致的干擾 22 /// </summary> 23 public bool IsInvalid { get; private set; } = true; 24 25 /// <summary> 26 /// 標記任務無效 27 /// </summary> 28 public void MarkTaskValid() 29 { 30 IsInvalid = false; 31 } 32 33 #region Task 34 35 private readonly Task _task; 36 /// <summary> 37 /// 初始化可等待的任務。 38 /// </summary> 39 /// <param name="task"></param> 40 public AwaitableTask(Task task) => _task = task; 41 42 /// <summary> 43 /// 獲取任務是否已完成 44 /// </summary> 45 public bool IsCompleted => _task.IsCompleted; 46 47 /// <summary> 48 /// 任務的Id 49 /// </summary> 50 public int TaskId => _task.Id; 51 52 /// <summary> 53 /// 開始任務 54 /// </summary> 55 public void Start() => _task.Start(); 56 57 /// <summary> 58 /// 同步執行開始任務 59 /// </summary> 60 public void RunSynchronously() => _task.RunSynchronously(); 61 62 #endregion 63 64 #region TaskAwaiter 65 66 /// <summary> 67 /// 獲取任務等待器 68 /// </summary> 69 /// <returns></returns> 70 public TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 71 72 /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary> 73 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 74 public struct TaskAwaiter : INotifyCompletion 75 { 76 private readonly AwaitableTask _task; 77 78 /// <summary> 79 /// 任務等待器 80 /// </summary> 81 /// <param name="awaitableTask"></param> 82 public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask; 83 84 /// <summary> 85 /// 任務是否完成. 86 /// </summary> 87 public bool IsCompleted => _task._task.IsCompleted; 88 89 /// <inheritdoc /> 90 public void OnCompleted(Action continuation) 91 { 92 var This = this; 93 _task._task.ContinueWith(t => 94 { 95 if (!This._task.NotExecutable) continuation?.Invoke(); 96 }); 97 } 98 /// <summary> 99 /// 獲取任務結果 100 /// </summary> 101 public void GetResult() => _task._task.Wait(); 102 } 103 104 #endregion 105 106 }
無效的操作可以分為以下倆種:
- 已經進行中的操作,後續結果應標記為無效
- 還沒開始的操作,後續不執行
自定義任務型別 AwaitableTask中,新增倆個欄位NotExecutable、IsInvalid:
1 /// <summary> 2 /// 獲取任務是否為不可執行狀態 3 /// </summary> 4 public bool NotExecutable { get; private set; } 5 /// <summary> 6 /// 獲取任務是否有效 7 /// 注:對無效任務,可以不做處理。減少併發操作導致的干擾 8 /// </summary> 9 public bool IsInvalid { get; private set; } = true;
2. 有返回結果的可等待任務類 AwaitableTask<TResult>:
1 /// <summary> 2 /// 可等待的任務 3 /// </summary> 4 /// <typeparam name="TResult"></typeparam> 5 public class AwaitableTask<TResult> : AwaitableTask 6 { 7 private readonly Task<TResult> _task; 8 /// <summary> 9 /// 初始化可等待的任務 10 /// </summary> 11 /// <param name="task">需要執行的任務</param> 12 public AwaitableTask(Task<TResult> task) : base(task) => _task = task; 13 14 #region TaskAwaiter 15 16 /// <summary> 17 /// 獲取任務等待器 18 /// </summary> 19 /// <returns></returns> 20 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 21 22 /// <summary> 23 /// 任務等待器 24 /// </summary> 25 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 26 public new struct TaskAwaiter : INotifyCompletion 27 { 28 private readonly AwaitableTask<TResult> _task; 29 30 /// <summary> 31 /// 初始化任務等待器 32 /// </summary> 33 /// <param name="awaitableTask"></param> 34 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 35 36 /// <summary> 37 /// 任務是否已完成。 38 /// </summary> 39 public bool IsCompleted => _task._task.IsCompleted; 40 41 /// <inheritdoc /> 42 public void OnCompleted(Action continuation) 43 { 44 var This = this; 45 _task._task.ContinueWith(t => 46 { 47 if (!This._task.NotExecutable) continuation?.Invoke(); 48 }); 49 } 50 51 /// <summary> 52 /// 獲取任務結果。 53 /// </summary> 54 /// <returns></returns> 55 public TResult GetResult() => _task._task.Result; 56 } 57 58 #endregion 59 }
新增任務等待器,同步等待結果返回:
1 /// <summary> 2 /// 獲取任務等待器 3 /// </summary> 4 /// <returns></returns> 5 public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this); 6 7 /// <summary> 8 /// 任務等待器 9 /// </summary> 10 [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)] 11 public new struct TaskAwaiter : INotifyCompletion 12 { 13 private readonly AwaitableTask<TResult> _task; 14 15 /// <summary> 16 /// 初始化任務等待器 17 /// </summary> 18 /// <param name="awaitableTask"></param> 19 public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask; 20 21 /// <summary> 22 /// 任務是否已完成。 23 /// </summary> 24 public bool IsCompleted => _task._task.IsCompleted; 25 26 /// <inheritdoc /> 27 public void OnCompleted(Action continuation) 28 { 29 var This = this; 30 _task._task.ContinueWith(t => 31 { 32 if (!This._task.NotExecutable) continuation?.Invoke(); 33 }); 34 } 35 36 /// <summary> 37 /// 獲取任務結果。 38 /// </summary> 39 /// <returns></returns> 40 public TResult GetResult() => _task._task.Result; 41 }
非同步任務佇列
新增非同步任務佇列類,用於任務的管理,如新增、執行、篩選等:
1 /// <summary> 2 /// 非同步任務佇列 3 /// </summary> 4 public class AsyncTaskQueue : IDisposable 5 { 6 /// <summary> 7 /// 非同步任務佇列 8 /// </summary> 9 public AsyncTaskQueue() 10 { 11 _autoResetEvent = new AutoResetEvent(false); 12 _thread = new Thread(InternalRunning) { IsBackground = true }; 13 _thread.Start(); 14 } 15 16 #region 執行 17 18 /// <summary> 19 /// 執行非同步操作 20 /// </summary> 21 /// <typeparam name="T">返回結果型別</typeparam> 22 /// <param name="func">非同步操作</param> 23 /// <returns>isInvalid:非同步操作是否有效;result:非同步操作結果</returns> 24 public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 25 { 26 var task = GetExecutableTask(func); 27 var result = await await task; 28 if (!task.IsInvalid) 29 { 30 result = default(T); 31 } 32 return (task.IsInvalid, result); 33 } 34 35 /// <summary> 36 /// 執行非同步操作 37 /// </summary> 38 /// <typeparam name="T"></typeparam> 39 /// <param name="func"></param> 40 /// <returns></returns> 41 public async Task<bool> ExecuteAsync<T>(Func<Task> func) 42 { 43 var task = GetExecutableTask(func); 44 await await task; 45 return task.IsInvalid; 46 } 47 48 #endregion 49 50 #region 新增任務 51 52 /// <summary> 53 /// 獲取待執行任務 54 /// </summary> 55 /// <param name="action"></param> 56 /// <returns></returns> 57 private AwaitableTask GetExecutableTask(Action action) 58 { 59 var awaitableTask = new AwaitableTask(new Task(action)); 60 AddPenddingTaskToQueue(awaitableTask); 61 return awaitableTask; 62 } 63 64 /// <summary> 65 /// 獲取待執行任務 66 /// </summary> 67 /// <typeparam name="TResult"></typeparam> 68 /// <param name="function"></param> 69 /// <returns></returns> 70 private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function) 71 { 72 var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function)); 73 AddPenddingTaskToQueue(awaitableTask); 74 return awaitableTask; 75 } 76 77 /// <summary> 78 /// 新增待執行任務到佇列 79 /// </summary> 80 /// <param name="task"></param> 81 /// <returns></returns> 82 private void AddPenddingTaskToQueue(AwaitableTask task) 83 { 84 //新增佇列,加鎖。 85 lock (_queue) 86 { 87 _queue.Enqueue(task); 88 //開始執行任務 89 _autoResetEvent.Set(); 90 } 91 } 92 93 #endregion 94 95 #region 內部執行 96 97 private void InternalRunning() 98 { 99 while (!_isDisposed) 100 { 101 if (_queue.Count == 0) 102 { 103 //等待後續任務 104 _autoResetEvent.WaitOne(); 105 } 106 while (TryGetNextTask(out var task)) 107 { 108 //如已從佇列中刪除 109 if (task.NotExecutable) continue; 110 111 if (UseSingleThread) 112 { 113 task.RunSynchronously(); 114 } 115 else 116 { 117 task.Start(); 118 } 119 } 120 } 121 } 122 /// <summary> 123 /// 上一次非同步操作 124 /// </summary> 125 private AwaitableTask _lastDoingTask; 126 private bool TryGetNextTask(out AwaitableTask task) 127 { 128 task = null; 129 while (_queue.Count > 0) 130 { 131 //獲取並從佇列中移除任務 132 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 133 { 134 //設定進行中的非同步操作無效 135 _lastDoingTask?.MarkTaskValid(); 136 _lastDoingTask = task; 137 return true; 138 } 139 //併發操作,設定任務不可執行 140 task.SetNotExecutable(); 141 } 142 return false; 143 } 144 145 #endregion 146 147 #region dispose 148 149 /// <inheritdoc /> 150 public void Dispose() 151 { 152 Dispose(true); 153 GC.SuppressFinalize(this); 154 } 155 156 /// <summary> 157 /// 析構任務佇列 158 /// </summary> 159 ~AsyncTaskQueue() => Dispose(false); 160 161 private void Dispose(bool disposing) 162 { 163 if (_isDisposed) return; 164 if (disposing) 165 { 166 _autoResetEvent.Dispose(); 167 } 168 _thread = null; 169 _autoResetEvent = null; 170 _isDisposed = true; 171 } 172 173 #endregion 174 175 #region 屬性及欄位 176 177 /// <summary> 178 /// 是否使用單執行緒完成任務. 179 /// </summary> 180 public bool UseSingleThread { get; set; } = true; 181 182 /// <summary> 183 /// 自動取消以前的任務。 184 /// </summary> 185 public bool AutoCancelPreviousTask { get; set; } = false; 186 187 private bool _isDisposed; 188 private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>(); 189 private Thread _thread; 190 private AutoResetEvent _autoResetEvent; 191 192 #endregion 193 194 }View Code
1. 自動取消之前的任務 AutoCancelPreviousTask
內部使用執行緒,迴圈獲取當前任務列表,如果當前任務被標記NotExecutable不可執行,則跳過。
NotExecutable是何時標記的?
獲取任務時,標記所有獲取的任務為NotExecutable。直到任務列表中為空,那麼只執行最後獲取的一個任務。
2. 標記已經進行的任務無效 MarkTaskValid
當前進行的任務,無法中止,那麼標記為無效即可。
1 /// <summary> 2 /// 上一次非同步操作 3 /// </summary> 4 private AwaitableTask _lastDoingTask; 5 private bool TryGetNextTask(out AwaitableTask task) 6 { 7 task = null; 8 while (_queue.Count > 0) 9 { 10 //獲取並從佇列中移除任務 11 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) 12 { 13 //設定進行中的非同步操作無效 14 _lastDoingTask?.MarkTaskValid(); 15 _lastDoingTask = task; 16 return true; 17 } 18 //併發操作,設定任務不可執行 19 task.SetNotExecutable(); 20 } 21 return false; 22 }
後續執行完後,根據此標記,設定操作結果為空。
1 /// <summary> 2 /// 執行非同步操作 3 /// </summary> 4 /// <typeparam name="T">返回結果型別</typeparam> 5 /// <param name="func">非同步操作</param> 6 /// <returns>isInvalid:非同步操作是否有效;result:非同步操作結果</returns> 7 public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func) 8 { 9 var task = GetExecutableTask(func); 10 var result = await await task; 11 if (!task.IsInvalid) 12 { 13 result = default(T); 14 } 15 return (task.IsInvalid, result); 16 }
實踐測試
啟動10個併發任務,測試實際的任務佇列併發操作管理:
1 public MainWindow() 2 { 3 InitializeComponent(); 4 _asyncTaskQueue = new AsyncTaskQueue 5 { 6 AutoCancelPreviousTask = true, 7 UseSingleThread = true 8 }; 9 } 10 private AsyncTaskQueue _asyncTaskQueue; 11 private void ButtonBase_OnClick(object sender, RoutedEventArgs e) 12 { 13 // 快速啟動10個任務 14 for (var i = 1; i < 10; i++) 15 { 16 Test(_asyncTaskQueue, i); 17 } 18 } 19 public static async void Test(AsyncTaskQueue taskQueue, int num) 20 { 21 var result = await taskQueue.ExecuteAsync(async () => 22 { 23 Debug.WriteLine("輸入:" + num); 24 // 長時間耗時任務 25 await Task.Delay(TimeSpan.FromSeconds(5)); 26 return num * 100; 27 }); 28 Debug.WriteLine($"{num}輸出的:" + result); 29 }
測試結果如下:
只有最後一次操作結果,才是有效的。其它9次操作,一次是無效的,8次操作被取消不執行。
Demo,見Github