.NET Core中使用IHostedService結合佇列執行定時任務
最近遇到了這樣的場景:每隔一段時間,需要在後臺使用佇列對一批資料進行業務處理。
Quartz.NET是一種選擇,在 .NET Core中,可以使用IHostedService
執行後臺定時任務。在本篇中,首先嚐試把佇列還原到最簡單、原始的狀態,然後給出以上場景問題的具體解決方案。
假設一個佇列有8個元素。現在abcd依次進入佇列。
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
a | b | c | d | ||||
head | tail |
ab依次出佇列。
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
c | d | ||||||
head | tail |
可以想象,隨著不斷地入列出列,head和tail的位置不斷往後,當tail在7號位的時候,雖然佇列裡還有空間,但此時資料就無法入隊列了。
如何才可以繼續入佇列呢?
首先想到的是資料搬移。當資料無法進入佇列,首先讓佇列項出列,進入到另外一個新佇列,這個新佇列就可以再次接收資料入隊列了。但是,搬移整個佇列中的資料的時間複雜度為O(n),而原先出佇列的時間複雜度是O(1),這種方式不夠理想。
還有一種思路是使用迴圈佇列。當tail指向最後一個位置,此時有新的資料進入佇列,tail就來到頭部指向0號位置,這樣這個佇列就可以迴圈使用了。
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | |||||
head | tail |
現在a入棧。
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | a | ||||
tail | head |
佇列有很多種實現。
比如在生產消費模型中可以用阻塞佇列。當生產佇列為空的時候,為了不讓消費者取資料,生產佇列的Dequeue行為會被阻塞;而當生產佇列滿的時候,為了不讓更多的資料進來,生產佇列的Enqueue行為被阻塞。
執行緒安全的佇列叫併發佇列,如C#中的ConcurrentQueue
。
執行緒池內部也使用了佇列機制。因為CPU的資源是有限的,過多的執行緒會導致CPU頻繁地線上程之間切換。執行緒池內通過維護一定數量的執行緒來減輕CPU的負擔。當執行緒池沒有多餘的執行緒可供使用,請求過來,一種方式是拒絕請求,另外一種方式是讓請求佇列阻塞,等到執行緒池內有執行緒可供使用,請求隊列出列執行。用連結串列實現的佇列是無界佇列(unbounded queue),這種做法可能會導致過多的請求在排隊,等待響應時間過長。用陣列實現的佇列是有界佇列(bounded queue),當執行緒池已滿,請求過來就會被拒絕。對有界佇列來說陣列的大小設定很講究。
來模擬一個數組佇列。
public class ArrayQueue
{
private string[] items;
private int n = 0; //陣列長度
private int head = 0;
private int tail = 0;
public ArrayQueue(int capacity)
{
n = capacity;
items = new string[capacity];
}
public bool Enqueue(string item)
{
if(tail==n){
return false;
}
items[tail] = item;
++tail;
return true;
}
public string Dequeue()
{
if(head==null){
return null;
}
string ret = items[head];
++head;
return ret;
}
}
以上就是一個最簡單的、用陣列實現的佇列。
再次回到要解決的場景問題。解決思路大致是:實現IHostedService
介面,在其中執行定時任務,每次把佇列項放到佇列中,並定義出佇列的方法,在其中執行業務邏輯。
關於佇列,通過以下的步驟使其在後臺執行。
- 佇列項(MessageQueueItem):具備唯一標識、委託、新增到佇列中的時間等屬性
- 佇列(MessageQueue):維護著
Dictionary<string, MessageQueueItem>
靜態字典集合 MessageQueueUtility
類:決定著如何執行,比如佇列執行的間隔時間、垃圾回收MessageQueueThreadUtility
類:維護佇列執行緒,提供佇列在後臺執行的方法- 在
Startup.cs
中的Configure
中呼叫MessageQueueThreadUtility
中的方法使佇列在後臺執行
佇列項(MessageQueueItem)
public class MessageQueueItem
{
public MessageQueueItem(string key, Action action, string description=null)
{
Key = key;
Action = action;
Description = description;
AddTime = DateTime.Now;
}
public string Key{get;set;}
public Actioin Action{get;set;}
public DateTime AddTime{get;set;}
public string Description{get;set;}
}
佇列(MessageQueue),維護著針對佇列項的一個靜態字典集合。
public class MessageQueue
{
public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase);
public static object MessageQueueSyncLock = new object();
public static object OperateLock = new object();
public static void OperateQueue()
{
lock(OperateLock)
{
var mq = new MessageQueue();
var key = mq.GetCurrentKey();
while(!string.IsNullOrEmpty(key))
{
var mqItem = mq.GetItem(key);
mqItem.Action();
mq.Remove(key);
key = mq.GetCurrentKey();
}
}
}
public string GetCurrentKey()
{
lock(MessageQueueSyncLock)
{
return MessageQueueDictionary.Keys.FirstOrDefault();
}
}
public MessageQueueItem GetItem(string key)
{
lock(MessageQueueSyncLock)
{
if(MessageQueueDictionary.ContainsKey(key))
{
return MessageQueueDictionary[key];
}
return null;
}
}
public void Remove(string key)
{
lock(MessageQueueSyncLock)
{
if(MessageQueueDictionary.ContainsKey(key))
{
MessageQueueDictionary.Remove(key);
}
}
}
public MessageQueueItem Add(string key, Action actioin)
{
lock(MessageQueueSyncLock)
{
var mqItem = new MessageQueueItem(key, action);
MessageQueueDictionary[key] = mqItem;
return mqItem;
}
}
public int GetCount()
{
lock(MessageQueueSyncLock)
{
return MessageQueueDictionary.Count;
}
}
}
MessageQueueUtility
類, 決定著佇列執行的節奏。
public class MessageQueueUtility
{
private readonly int _sleepMilliSeconds;
public MessageQueueUtility(int sleepMilliSeconds=1000)
{
_sleepMilliSeconds = sleepMilliSeoncds;
}
~MessageQueueUtility()
{
MessageQueue.OperateQueue();
}
public void Run
{
do
{
MessageQueue.OperateQueue();
Thread.Sleep(_sleepMilliSeconds);
} while(true)
}
}
MessageQueueThreadUtility
類,管理佇列的執行緒,並讓其在後臺執行。
public static class MessageQueueThreadUtility
{
public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>();
public static void Register(string threadUniqueName)
{
{
MessageQueueUtility messageQueueUtility = new MessageQueueUtility();
Thread messageQueueThread = new Thread(messageQueueUtility.Run){
Name = threadUniqueName
};
AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread);
}
AsyncThreadCollection.Values.ToList().ForEach(z => {
z.IsBackground = true;
z.Start();
});
}
}
Startup.cs
中註冊。
public class Startup
{
public IServiceProvider ConfigureServices(IServiceCollection services)
{
...
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env...)
{
RegisterMessageQueueThreads();
}
private void RegisterMessageQueueThreads()
{
MessageQueueThreadUtility.Register("");
}
}
最後在IHostedService
的實現類中把佇列項丟給佇列。
public class MyBackgroundSerivce : IHostedService, IDisposable
{
private Timer _timer;
public IServiceProvider Services{get;}
public MyBackgroundService(IServiceProvider services)
{
Serivces = services;
}
public void Dispose()
{
_timer?.Dispose();
}
public Task StartAsync(CancellationToken cancellationToken)
{
_timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite,0);
return Task.CompletedTask;
}
private void DoWork(object state)
{
using(var scope = Services.CreateScope())
{
using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
{
...
var mq = new MessageQueue();
mq.Add("somekey", DealQueueItem);
}
}
}
private void DealQueueItem()
{
var mq = new MessageQueue();
var key = mq.GetCurrentKey();
var item = mq.GetItem(key);
if(item!=null)
{
using(var scope = Services.CreateScope())
{
using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
{
//執行業務邏輯
}
}
}
}
}
當需要使用上下文的時候,首先通過IServiceProvider
的CreateScope
方法得到ISerivceScope
,再通過它的ServiceProvider
屬性獲取依賴倒置容器中的上下文服務。
以上,用IHostedService
結合佇列解決了開篇提到的場景問題,如果您有很好的想法,我們一起交流吧。文中的佇列部分來自"盛派網路"的Senparc.Weixin SDK原始碼。