ASP.NET Core 3.x 併發限制
前言
Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免執行緒池的不足.
我們日常開發中可能常做的給某web伺服器配置連線數以及,請求佇列大小,那麼今天我們看看如何在通過中介軟體形式實現一個併發量以及佇列長度限制.
Queue策略
新增Nuget
Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //最大併發請求數 options.MaxConcurrentRequests = 2; //請求佇列長度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //新增併發限制中介軟體 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }
通過上面簡單的配置,我們就可以將他引入到我們的程式碼中,從而做併發量限制,以及佇列的長度;那麼問題來了,他是怎麼實現的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, QueuePolicy>(); return services; }
QueuePolicy採用的是SemaphoreSlim訊號量設計,SemaphoreSlim、Semaphore(訊號量)支援併發多執行緒進入被保護程式碼,物件在初始化時會指定 最大任務數量,當執行緒請求訪問資源,訊號量遞減,而當他們釋放時,訊號量計數又遞增。
/// <summary> /// 構造方法(初始化Queue策略) /// </summary> /// <param name="options"></param> public QueuePolicy(IOptions<QueuePolicyOptions> options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim來限制任務最大個數 _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }
ConcurrencyLimiterMiddleware中介軟體
/// <summary>
/// Invokes the logic of the middleware.
/// </summary>
/// <param name="context">The <see cref="HttpContext"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context)
{
var waitInQueueTask = _queuePolicy.TryEnterAsync();
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
bool result;
if (waitInQueueTask.IsCompleted)
{
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else
{
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
每次當我們請求的時候首先會呼叫_queuePolicy.TryEnterAsync()
,進入該方法後先開啟一個私有lock鎖,再接著判斷總請求量是否≥(請求佇列限制的大小+最大併發請求數),如果當前數量超出了,那麼我直接丟擲,送你個503狀態;
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
問題來了,我這邊如果說還沒到你設定的大小呢,我這個請求沒有給你伺服器造不成壓力,那麼你給我處理一下吧.
await _serverSemaphore.WaitAsync();
非同步等待進入訊號量,如果沒有執行緒被授予對訊號量的訪問許可權,則進入執行保護程式碼;否則此執行緒將在此處等待,直到訊號量被釋放為止
lock (_totalRequestsLock)
{
if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
{
return false;
}
TotalRequests++;
}
//非同步等待進入訊號量,如果沒有執行緒被授予對訊號量的訪問許可權,則進入執行保護程式碼;否則此執行緒將在此處等待,直到訊號量被釋放為止
await _serverSemaphore.WaitAsync();
return true;
}
返回成功後那麼中介軟體這邊再進行處理,_queuePolicy.OnExit();
通過該呼叫進行呼叫_serverSemaphore.Release();
釋放訊號燈,再對總請求數遞減
Stack策略
再來看看另一種方法,棧策略,他是怎麼做的呢?一起來看看.再附加上如何使用的程式碼.
public void ConfigureServices(IServiceCollection services)
{
services.AddStackPolicy(options =>
{
//最大併發請求數
options.MaxConcurrentRequests = 2;
//請求佇列長度限制
options.RequestQueueLimit = 1;
});
services.AddControllers();
}
通過上面的配置,我們便可以對我們的應用程式執行出相應的策略.下面再來看看他是怎麼實現的呢
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IQueuePolicy, StackPolicy>();
return services;
}
可以看到這次是通過StackPolicy
類做的策略.來一起來看看主要的方法
/// <summary>
/// 構造方法(初始化引數)
/// </summary>
/// <param name="options"></param>
public StackPolicy(IOptions<QueuePolicyOptions> options)
{
//棧分配
_buffer = new List<ResettableBooleanCompletionSource>();
//佇列大小
_maxQueueCapacity = options.Value.RequestQueueLimit;
//最大併發請求數
_maxConcurrentRequests = options.Value.MaxConcurrentRequests;
//剩餘可用空間
_freeServerSpots = options.Value.MaxConcurrentRequests;
}
當我們通過中介軟體請求呼叫,_queuePolicy.TryEnterAsync()
時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那麼則直接給我們返回true,讓中介軟體直接去執行下一步,如果當前佇列=我們設定的佇列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留後面的請求;
public ValueTask<bool> TryEnterAsync()
{
lock (_bufferLock)
{
if (_freeServerSpots > 0)
{
_freeServerSpots--;
return _trueTask;
}
// 如果佇列滿了,取消先前的請求
if (_queueLength == _maxQueueCapacity)
{
_hasReachedCapacity = true;
_buffer[_head].Complete(false);
_queueLength--;
}
var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
_cachedResettableTCS = null;
if (_hasReachedCapacity || _queueLength < _buffer.Count)
{
_buffer[_head] = tcs;
}
else
{
_buffer.Add(tcs);
}
_queueLength++;
// increment _head for next time
_head++;
if (_head == _maxQueueCapacity)
{
_head = 0;
}
return tcs.GetValueTask();
}
}
當我們請求後呼叫_queuePolicy.OnExit();
出棧,再將請求長度遞減
public void OnExit()
{
lock (_bufferLock)
{
if (_queueLength == 0)
{
_freeServerSpots++;
if (_freeServerSpots > _maxConcurrentRequests)
{
_freeServerSpots--;
throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
}
return;
}
// step backwards and launch a new task
if (_head == 0)
{
_head = _maxQueueCapacity - 1;
}
else
{
_head--;
}
//退出,出棧
_buffer[_head].Complete(true);
_queueLength--;
}
}
總結
基於棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:
- 向棧中新增元素,此過程被稱為"進棧"(入棧或壓棧);
- 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);
佇列儲存結構的實現有以下兩種方式:
- 順序佇列:在順序表的基礎上實現的佇列結構;
- 鏈佇列:在連結串列的基礎上實現的佇列結構;