1. 程式人生 > >C# 非同步併發操作,只保留最後一次操作

C# 非同步併發操作,只保留最後一次操作

在我們業務操作時,難免會有多次操作,我們期望什麼結果呢?

絕大部分情況,應該是隻需要最後一次操作的結果,其它操作應該無效。

自定義等待的任務類

1. 可等待的任務類 AwaitableTask:

  1     /// <summary>
  2     /// 可等待的任務
  3     /// </summary>
  4     public class AwaitableTask
  5     {
  6         /// <summary>
  7         /// 獲取任務是否為不可執行狀態
  8         /// </summary>
  9         public bool NotExecutable { get; private set; }
 10 
 11         /// <summary>
 12         /// 設定任務不可執行
 13         /// </summary>
 14         public void SetNotExecutable()
 15         {
 16             NotExecutable = true;
 17         }
 18 
 19         /// <summary>
 20         /// 獲取任務是否有效
 21         /// 注:對無效任務,可以不做處理。減少併發操作導致的干擾
 22         /// </summary>
 23         public bool IsInvalid { get; private set; } = true;
 24 
 25         /// <summary>
 26         /// 標記任務無效
 27         /// </summary>
 28         public void MarkTaskValid()
 29         {
 30             IsInvalid = false;
 31         }
 32 
 33         #region Task
 34 
 35         private readonly Task _task;
 36         /// <summary>
 37         /// 初始化可等待的任務。
 38         /// </summary>
 39         /// <param name="task"></param>
 40         public AwaitableTask(Task task) => _task = task;
 41 
 42         /// <summary>
 43         /// 獲取任務是否已完成
 44         /// </summary>
 45         public bool IsCompleted => _task.IsCompleted;
 46 
 47         /// <summary>
 48         /// 任務的Id
 49         /// </summary>
 50         public int TaskId => _task.Id;
 51 
 52         /// <summary>
 53         /// 開始任務
 54         /// </summary>
 55         public void Start() => _task.Start();
 56 
 57         /// <summary>
 58         /// 同步執行開始任務
 59         /// </summary>
 60         public void RunSynchronously() => _task.RunSynchronously();
 61 
 62         #endregion
 63 
 64         #region TaskAwaiter
 65 
 66         /// <summary>
 67         /// 獲取任務等待器
 68         /// </summary>
 69         /// <returns></returns>
 70         public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
 71 
 72         /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
 73         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
 74         public struct TaskAwaiter : INotifyCompletion
 75         {
 76             private readonly AwaitableTask _task;
 77 
 78             /// <summary>
 79             /// 任務等待器
 80             /// </summary>
 81             /// <param name="awaitableTask"></param>
 82             public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
 83 
 84             /// <summary>
 85             /// 任務是否完成.
 86             /// </summary>
 87             public bool IsCompleted => _task._task.IsCompleted;
 88 
 89             /// <inheritdoc />
 90             public void OnCompleted(Action continuation)
 91             {
 92                 var This = this;
 93                 _task._task.ContinueWith(t =>
 94                 {
 95                     if (!This._task.NotExecutable) continuation?.Invoke();
 96                 });
 97             }
 98             /// <summary>
 99             /// 獲取任務結果
100             /// </summary>
101             public void GetResult() => _task._task.Wait();
102         }
103 
104         #endregion
105 
106     }
View Code

無效的操作可以分為以下倆種:

  • 已經進行中的操作,後續結果應標記為無效
  • 還沒開始的操作,後續不執行

自定義任務型別 AwaitableTask中,新增倆個欄位NotExecutable、IsInvalid:

1     /// <summary>
2     /// 獲取任務是否為不可執行狀態
3     /// </summary>
4     public bool NotExecutable { get; private set; }
5     /// <summary>
6     /// 獲取任務是否有效
7     /// 注:對無效任務,可以不做處理。減少併發操作導致的干擾
8     /// </summary>
9     public bool IsInvalid { get; private set; } = true;

2. 有返回結果的可等待任務類 AwaitableTask<TResult>:

 1     /// <summary>
 2     /// 可等待的任務
 3     /// </summary>
 4     /// <typeparam name="TResult"></typeparam>
 5     public class AwaitableTask<TResult> : AwaitableTask
 6     {
 7         private readonly Task<TResult> _task;
 8         /// <summary>
 9         /// 初始化可等待的任務
10         /// </summary>
11         /// <param name="task">需要執行的任務</param>
12         public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
13 
14         #region TaskAwaiter
15 
16         /// <summary>
17         /// 獲取任務等待器
18         /// </summary>
19         /// <returns></returns>
20         public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
21 
22         /// <summary>
23         /// 任務等待器
24         /// </summary>
25         [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
26         public new struct TaskAwaiter : INotifyCompletion
27         {
28             private readonly AwaitableTask<TResult> _task;
29 
30             /// <summary>
31             /// 初始化任務等待器
32             /// </summary>
33             /// <param name="awaitableTask"></param>
34             public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
35 
36             /// <summary>
37             /// 任務是否已完成。
38             /// </summary>
39             public bool IsCompleted => _task._task.IsCompleted;
40 
41             /// <inheritdoc />
42             public void OnCompleted(Action continuation)
43             {
44                 var This = this;
45                 _task._task.ContinueWith(t =>
46                 {
47                     if (!This._task.NotExecutable) continuation?.Invoke();
48                 });
49             }
50 
51             /// <summary>
52             /// 獲取任務結果。
53             /// </summary>
54             /// <returns></returns>
55             public TResult GetResult() => _task._task.Result;
56         }
57 
58         #endregion
59     }
View Code

新增任務等待器,同步等待結果返回:

 1    /// <summary>
 2     /// 獲取任務等待器
 3     /// </summary>
 4     /// <returns></returns>
 5     public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
 6 
 7     /// <summary>
 8     /// 任務等待器
 9     /// </summary>
10     [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
11     public new struct TaskAwaiter : INotifyCompletion
12     {
13         private readonly AwaitableTask<TResult> _task;
14 
15         /// <summary>
16         /// 初始化任務等待器
17         /// </summary>
18         /// <param name="awaitableTask"></param>
19         public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
20 
21         /// <summary>
22         /// 任務是否已完成。
23         /// </summary>
24         public bool IsCompleted => _task._task.IsCompleted;
25 
26         /// <inheritdoc />
27         public void OnCompleted(Action continuation)
28         {
29             var This = this;
30             _task._task.ContinueWith(t =>
31             {
32                 if (!This._task.NotExecutable) continuation?.Invoke();
33             });
34         }
35 
36         /// <summary>
37         /// 獲取任務結果。
38         /// </summary>
39         /// <returns></returns>
40         public TResult GetResult() => _task._task.Result;
41     }

非同步任務佇列

新增非同步任務佇列類,用於任務的管理,如新增、執行、篩選等:

  1     /// <summary>
  2     /// 非同步任務佇列
  3     /// </summary>
  4     public class AsyncTaskQueue : IDisposable
  5     {
  6         /// <summary>
  7         /// 非同步任務佇列
  8         /// </summary>
  9         public AsyncTaskQueue()
 10         {
 11             _autoResetEvent = new AutoResetEvent(false);
 12             _thread = new Thread(InternalRunning) { IsBackground = true };
 13             _thread.Start();
 14         }
 15 
 16         #region 執行
 17 
 18         /// <summary>
 19         /// 執行非同步操作
 20         /// </summary>
 21         /// <typeparam name="T">返回結果型別</typeparam>
 22         /// <param name="func">非同步操作</param>
 23         /// <returns>isInvalid:非同步操作是否有效;result:非同步操作結果</returns>
 24         public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
 25         {
 26             var task = GetExecutableTask(func);
 27             var result = await await task;
 28             if (!task.IsInvalid)
 29             {
 30                 result = default(T);
 31             }
 32             return (task.IsInvalid, result);
 33         }
 34 
 35         /// <summary>
 36         /// 執行非同步操作
 37         /// </summary>
 38         /// <typeparam name="T"></typeparam>
 39         /// <param name="func"></param>
 40         /// <returns></returns>
 41         public async Task<bool> ExecuteAsync<T>(Func<Task> func)
 42         {
 43             var task = GetExecutableTask(func);
 44             await await task;
 45             return task.IsInvalid;
 46         }
 47 
 48         #endregion
 49 
 50         #region 新增任務
 51 
 52         /// <summary>
 53         /// 獲取待執行任務
 54         /// </summary>
 55         /// <param name="action"></param>
 56         /// <returns></returns>
 57         private AwaitableTask GetExecutableTask(Action action)
 58         {
 59             var awaitableTask = new AwaitableTask(new Task(action));
 60             AddPenddingTaskToQueue(awaitableTask);
 61             return awaitableTask;
 62         }
 63 
 64         /// <summary>
 65         /// 獲取待執行任務
 66         /// </summary>
 67         /// <typeparam name="TResult"></typeparam>
 68         /// <param name="function"></param>
 69         /// <returns></returns>
 70         private AwaitableTask<TResult> GetExecutableTask<TResult>(Func<TResult> function)
 71         {
 72             var awaitableTask = new AwaitableTask<TResult>(new Task<TResult>(function));
 73             AddPenddingTaskToQueue(awaitableTask);
 74             return awaitableTask;
 75         }
 76 
 77         /// <summary>
 78         /// 新增待執行任務到佇列
 79         /// </summary>
 80         /// <param name="task"></param>
 81         /// <returns></returns>
 82         private void AddPenddingTaskToQueue(AwaitableTask task)
 83         {
 84             //新增佇列,加鎖。
 85             lock (_queue)
 86             {
 87                 _queue.Enqueue(task);
 88                 //開始執行任務
 89                 _autoResetEvent.Set();
 90             }
 91         }
 92 
 93         #endregion
 94 
 95         #region 內部執行
 96 
 97         private void InternalRunning()
 98         {
 99             while (!_isDisposed)
100             {
101                 if (_queue.Count == 0)
102                 {
103                     //等待後續任務
104                     _autoResetEvent.WaitOne();
105                 }
106                 while (TryGetNextTask(out var task))
107                 {
108                     //如已從佇列中刪除
109                     if (task.NotExecutable) continue;
110 
111                     if (UseSingleThread)
112                     {
113                         task.RunSynchronously();
114                     }
115                     else
116                     {
117                         task.Start();
118                     }
119                 }
120             }
121         }
122         /// <summary>
123         /// 上一次非同步操作
124         /// </summary>
125         private AwaitableTask _lastDoingTask;
126         private bool TryGetNextTask(out AwaitableTask task)
127         {
128             task = null;
129             while (_queue.Count > 0)
130             {
131                 //獲取並從佇列中移除任務
132                 if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
133                 {
134                     //設定進行中的非同步操作無效
135                     _lastDoingTask?.MarkTaskValid();
136                     _lastDoingTask = task;
137                     return true;
138                 }
139                 //併發操作,設定任務不可執行
140                 task.SetNotExecutable();
141             }
142             return false;
143         }
144 
145         #endregion
146 
147         #region dispose
148 
149         /// <inheritdoc />
150         public void Dispose()
151         {
152             Dispose(true);
153             GC.SuppressFinalize(this);
154         }
155 
156         /// <summary>
157         /// 析構任務佇列
158         /// </summary>
159         ~AsyncTaskQueue() => Dispose(false);
160 
161         private void Dispose(bool disposing)
162         {
163             if (_isDisposed) return;
164             if (disposing)
165             {
166                 _autoResetEvent.Dispose();
167             }
168             _thread = null;
169             _autoResetEvent = null;
170             _isDisposed = true;
171         }
172 
173         #endregion
174 
175         #region 屬性及欄位
176 
177         /// <summary>
178         /// 是否使用單執行緒完成任務.
179         /// </summary>
180         public bool UseSingleThread { get; set; } = true;
181 
182         /// <summary>
183         /// 自動取消以前的任務。
184         /// </summary>
185         public bool AutoCancelPreviousTask { get; set; } = false;
186 
187         private bool _isDisposed;
188         private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
189         private Thread _thread;
190         private AutoResetEvent _autoResetEvent;
191 
192         #endregion
193 
194     }
View Code

1. 自動取消之前的任務 AutoCancelPreviousTask

內部使用執行緒,迴圈獲取當前任務列表,如果當前任務被標記NotExecutable不可執行,則跳過。

NotExecutable是何時標記的?

獲取任務時,標記所有獲取的任務為NotExecutable。直到任務列表中為空,那麼只執行最後獲取的一個任務。

2. 標記已經進行的任務無效 MarkTaskValid

當前進行的任務,無法中止,那麼標記為無效即可。

 1     /// <summary>
 2     /// 上一次非同步操作
 3     /// </summary>
 4     private AwaitableTask _lastDoingTask;
 5     private bool TryGetNextTask(out AwaitableTask task)
 6     {
 7         task = null;
 8         while (_queue.Count > 0)
 9         {
10             //獲取並從佇列中移除任務
11             if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0))
12             {
13                 //設定進行中的非同步操作無效
14                 _lastDoingTask?.MarkTaskValid();
15                 _lastDoingTask = task;
16                 return true;
17             }
18             //併發操作,設定任務不可執行
19             task.SetNotExecutable();
20         }
21         return false;
22     }

後續執行完後,根據此標記,設定操作結果為空。

 1     /// <summary>
 2     /// 執行非同步操作
 3     /// </summary>
 4     /// <typeparam name="T">返回結果型別</typeparam>
 5     /// <param name="func">非同步操作</param>
 6     /// <returns>isInvalid:非同步操作是否有效;result:非同步操作結果</returns>
 7     public async Task<(bool isInvalid, T reslut)> ExecuteAsync<T>(Func<Task<T>> func)
 8     {
 9         var task = GetExecutableTask(func);
10         var result = await await task;
11         if (!task.IsInvalid)
12         {
13             result = default(T);
14         }
15         return (task.IsInvalid, result);
16     }

實踐測試

啟動10個併發任務,測試實際的任務佇列併發操作管理:

 1     public MainWindow()
 2     {
 3         InitializeComponent();
 4         _asyncTaskQueue = new AsyncTaskQueue
 5         {
 6             AutoCancelPreviousTask = true,
 7             UseSingleThread = true
 8         };
 9     }
10     private AsyncTaskQueue _asyncTaskQueue;
11     private void ButtonBase_OnClick(object sender, RoutedEventArgs e)
12     {
13         // 快速啟動10個任務
14         for (var i = 1; i < 10; i++)
15         {
16             Test(_asyncTaskQueue, i);
17         }
18     }
19     public static async void Test(AsyncTaskQueue taskQueue, int num)
20     {
21         var result = await taskQueue.ExecuteAsync(async () =>
22         {
23             Debug.WriteLine("輸入:" + num);
24             // 長時間耗時任務
25             await Task.Delay(TimeSpan.FromSeconds(5));
26             return num * 100;
27         });
28         Debug.WriteLine($"{num}輸出的:" + result);
29     }

測試結果如下:

只有最後一次操作結果,才是有效的。其它9次操作,一次是無效的,8次操作被取消不執行。

 Demo,見Github