.net core Redis訊息佇列中介軟體【InitQ】
阿新 • • 發佈:2021-06-28
前言
這是一篇拖更很久的部落格,不知不覺InitQ在nuget下載量已經過15K了,奈何胸無點墨也不曉得怎麼寫(懶),隨便在github上掛了個md,現在好好嘮嘮如何在redis裡使用佇列
佇列快取分散式 非同步調優堆配置 ------(來自某位不知名碼友)
誕生背景
redis在專案中使用的越來越頻繁,通常我們是用來做快取,使用較多的就是String,Hash這兩種型別,以及分散式鎖,redis的List型別,就可以用於訊息佇列,使用起來更加簡單,且速度更快,非常適合子服務內部之間的訊息流轉,創造靈感來自於楊老闆的CAP(地址:https://www.cnblogs.com/tibos/p/11858095.html
安裝環境
- .net core版本:2.1
- redis版本:3.0以上
特點
1.通過註解的方式,訂閱佇列
2.可以設定消費訊息的頻次
3.支援訊息廣播
4.支援延遲佇列
使用介紹
-
1.獲取initQ包
方案A. install-package InitQ
方案B. nuget包管理工具搜尋 InitQ -
2.新增中介軟體(該中介軟體依賴 StackExchange.Redis)
services.AddInitQ(m=> { m.SuspendTime = 1000; m.IntervalTime = 1000; m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456"; m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) }; m.ShowLog = false; });
-
3.配置說明
public class InitQOptions { /// <summary> /// redis連線字串 /// </summary> public string ConnectionString { get; set; } /// <summary> /// 沒訊息時掛起時長(毫秒) /// </summary> public int SuspendTime { get; set; } /// <summary> /// 每次消費訊息間隔時間(毫秒) /// </summary> public int IntervalTime { get; set; } /// <summary> /// 是否顯示日誌 /// </summary> public bool ShowLog { get; set; } /// <summary> /// 需要注入的型別 /// </summary> public IList<Type> ListSubscribe { get; set; } public InitQOptions() { ConnectionString = ""; IntervalTime = 0; SuspendTime = 1000; ShowLog = false; } }
訊息釋出/訂閱
訊息的釋出/訂閱是最基礎的功能,這裡做了幾個優化
- 採用的是長輪詢模式,可以控制訊息消費的頻次,以及輪詢空訊息的間隔,避免資源浪費
- 支援多個類訂閱訊息,可以很方便的根據業務進行分類,前提是這些類 必須註冊
- 支援多執行緒消費訊息(在執行耗時任務的時候,非常有用)
示例如下(Thread.Sleep):
public class RedisSubscribeA: IRedisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg}");
Thread.Sleep(3000); //使用堵塞執行緒模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg} 完成");
}
}
public class RedisSubscribeA: IRedisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg}");
Thread.Sleep(3000); //使用堵塞執行緒模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg} 完成");
}
[Subscribe("tibos_test_1")]
private async Task SubRedisTest2(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg}");
Thread.Sleep(3000); //使用堵塞執行緒模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg} 完成");
}
}
示例如下(Task.Delay):
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg}");
await Task.Delay(3000); //使用非堵塞執行緒模式,非同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費訊息:{msg} 完成");
}
根據業務情況,合理的選擇堵塞模式
- 1.訂閱釋出者
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis物件 var _redis = scope.ServiceProvider.GetService<ICacheService>(); //迴圈向 tibos_test_1 佇列傳送訊息 for (int i = 0; i < 1000; i++) { await _redis.ListRightPushAsync("tibos_test_1", $"我是訊息{i + 1}號"); } }
- 2.定義消費者類 RedisSubscribeA
public class RedisSubscribeA: IRedisSubscribe { [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine($"A類--->訂閱者A訊息訊息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest1(string msg) { Console.WriteLine($"A類--->訂閱者A1訊息訊息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest2(string msg) { Console.WriteLine($"A類--->訂閱者A2訊息訊息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest3(string msg) { Console.WriteLine($"A類--->訂閱者A3訊息訊息:{msg}"); } }
- 3.定義消費者類 RedisSubscribeB
public class RedisSubscribeB : IRedisSubscribe { /// <summary> /// 測試 /// </summary> /// <param name="msg"></param> /// <returns></returns> [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine($"B類--->訂閱者B消費訊息:{msg}"); } }
訊息廣播/訂閱
訊息廣播是StackExchange.Redis已經封裝好的,我們只用起個執行緒監聽即可,只要監聽了這個key的執行緒,都會收到訊息
- 1.訂閱訊息通道,訂閱者需要在程式初始化的時候啟動一個執行緒偵聽通道,這裡使用HostedService來實現,並註冊到容器
public class ChannelSubscribeA : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程式啟動"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis物件 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 訂閱服務A收到訊息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("結束"); return Task.CompletedTask; } }
public class ChannelSubscribeB : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程式啟動"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis物件 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 訂閱服務B收到訊息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("結束"); return Task.CompletedTask; } }
- 2.將HostedService類注入到容器
services.AddHostedService<ChannelSubscribeA>(); services.AddHostedService<ChannelSubscribeB>();
- 3.廣播訊息
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis物件 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 1000; i++) { await _redis.PublishAsync("test_channel", $"往通道傳送第{i}條訊息"); } }
延遲訊息
延遲訊息非常適用處理一些定時任務的場景,如訂單15分鐘未付款,自動取消, xxx天后,自動續費...... 這裡使用zset+redis鎖來實現,這裡的操作方式,跟釋出/定義非常類似
寫入延遲訊息:SortedSetAddAsync
註解使用:SubscribeDelay
-
1.定義釋出者
Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis物件 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 100; i++) { var dt = DateTime.Now.AddSeconds(3 * (i + 1)); //key:redis裡的key,唯一 //msg:任務 //time:延時執行的時間 await _redis.SortedSetAddAsync("test_0625", $"延遲任務,第{i + 1}個元素,執行時間:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt); } } });
-
2.定義消費者
//延遲佇列 [SubscribeDelay("test_0625")] private async Task SubRedisTest1(string msg) { Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者延遲佇列訊息開始--->{msg}"); //模擬任務執行耗時 await Task.Delay(TimeSpan.FromSeconds(3)); Console.WriteLine($"A類--->{msg} 結束<---"); }
版本
- V1.0 更新時間:2019-12-30
版本庫:
作者:提伯斯