.NetCore使用Redis,StackExchange.Redis佇列,釋出與訂閱,分散式鎖的簡單使用
環境:之前一直是使用serverStack.Redis的客服端,
今天來使用一下StackExchange.Redis(個人感覺更加的人性化一些,也是免費的,效能也不會差太多),
版本為StackExchange.Redis V2.1.58 ,Core3.1
簡單的說明(專業的術語參考資料網路和官網):官網地址:https://www.redis.net.cn/
Redis是一個開源的 ,由C語言編寫、支援網路、可基於記憶體亦可持久化的日誌型、Key-Value資料庫,並提供多種語言的API。
Redis 是一個高效能的key-value資料庫。Redis的出現,很大程度補償了memcached這類key/value儲存的不足,
提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客戶端,使用很方便。
優點:
1 Redis讀寫效能優異,從記憶體當中進行IO讀寫速度快,支援超過100K+每秒的讀寫頻率。
2 Redis支援Strings,
Lists, Hashes, Sets,Ordered Sets等資料型別操作。
3 Redis支援資料持久化,支援AOF和RDB兩種持久化方式
4 Redis支援主從複製,主機會自動將資料同步到從機,可以進行讀寫分離。
5 Redis的所有操作都是原子性的,同時Redis還支援對幾個操作全並後的原子性執行。
6 Redis是單執行緒多CPU,不需要考慮加鎖釋放鎖,也就沒有死鎖的問題,效率高。
1:redis佇列值入隊出隊,截圖效果:
優化之前將近50秒了,這實在太慢了
優化後的效果:為5.55s的樣子
2:redis釋出與訂閱截圖效果:(一個釋出者,四個訂閱者)
3:redis秒殺,截圖如下:單個程序秒殺ok
開多個程序時,會有超賣的現象:
4:redis值分散式鎖,截圖:
但是這樣會比較耗資源
分散式鎖ok庫存為零就不在請求直接拋異常即可
上面通過測試的截圖,簡單的介紹了,Redis的佇列(入隊和出隊),Redis釋出與訂閱,Redis分散式鎖的使用,現在直接上程式碼 :
出隊入隊的WebApi Core3.1
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Threading.Tasks; 5 6 namespace WebApp.Controllers 7 { 8 using Microsoft.AspNetCore.Mvc; 9 using RedisPublishAndSubHelper; 10 [Route("api/[Controller]")] 11 [ApiController] 12 public class RedisTestController 13 { 14 [HttpGet("EnqueueMsg")] 15 public async Task<ApiResultObject> EnqueueMsgAsync(string rediskey, string redisValue) 16 { 17 ApiResultObject obj = new ApiResultObject(); 18 try 19 { 20 long enqueueLong = default; 21 for (int i = 0; i < 1000; i++) 22 { 23 enqueueLong = await MyRedisSubPublishHelper.EnqueueListLeftPushAsync(rediskey, redisValue + i); 24 } 25 obj.Code = ResultCode.Success; 26 obj.Data = "入隊的資料長度:" + enqueueLong; 27 obj.Msg = "入隊成功!"; 28 } 29 catch (Exception ex) 30 { 31 32 obj.Msg = $"入隊異常,原因:{ex.Message}"; 33 } 34 return obj; 35 } 36 [HttpGet("DequeueMsg")] 37 public async Task<ApiResultObject> DequeueMsgAsync(string rediskey) 38 { 39 ApiResultObject obj = new ApiResultObject(); 40 try 41 { 42 string dequeueMsg = await MyRedisSubPublishHelper.DequeueListPopRightAsync(rediskey); 43 obj.Code = ResultCode.Success; 44 obj.Data = $"出隊的資料是:{dequeueMsg}"; 45 obj.Msg = "入隊成功!"; 46 } 47 catch (Exception ex) 48 { 49 obj.Msg = $"入隊異常,原因:{ex.Message}"; 50 } 51 return obj; 52 } 53 } 54 }View Code
出隊入隊的後端code WebApi:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Threading.Tasks; 5 6 namespace WebApp.Controllers 7 { 8 using Microsoft.AspNetCore.Mvc; 9 using RedisPublishAndSubHelper; 10 [Route("api/[Controller]")] 11 [ApiController] 12 public class RedisTestController 13 { 14 [HttpGet("EnqueueMsg")] 15 public async Task<ApiResultObject> EnqueueMsgAsync(string rediskey, string redisValue) 16 { 17 ApiResultObject obj = new ApiResultObject(); 18 try 19 { 20 long enqueueLong = default; 21 for (int i = 0; i < 1000; i++) 22 { 23 enqueueLong = await MyRedisSubPublishHelper.EnqueueListLeftPushAsync(rediskey, redisValue + i); 24 } 25 obj.Code = ResultCode.Success; 26 obj.Data = "入隊的資料長度:" + enqueueLong; 27 obj.Msg = "入隊成功!"; 28 } 29 catch (Exception ex) 30 { 31 32 obj.Msg = $"入隊異常,原因:{ex.Message}"; 33 } 34 return obj; 35 } 36 [HttpGet("DequeueMsg")] 37 public async Task<ApiResultObject> DequeueMsgAsync(string rediskey) 38 { 39 ApiResultObject obj = new ApiResultObject(); 40 try 41 { 42 string dequeueMsg = await MyRedisSubPublishHelper.DequeueListPopRightAsync(rediskey); 43 obj.Code = ResultCode.Success; 44 obj.Data = $"出隊的資料是:{dequeueMsg}"; 45 obj.Msg = "入隊成功!"; 46 } 47 catch (Exception ex) 48 { 49 obj.Msg = $"入隊異常,原因:{ex.Message}"; 50 } 51 return obj; 52 } 53 } 54 }View Code
入隊以及秒殺分散式鎖的客服端的Code:
1 using System; 2 using System.Threading; 3 using System.Threading.Tasks; 4 5 namespace RedisPublishService 6 { 7 using RedisPublishAndSubHelper; 8 class Program 9 { 10 static void Main(string[] args) 11 { 12 #region 入隊的code 13 { 14 int Index = 100000; 15 while (Index > 0) 16 { 17 //string msg = Console.ReadLine(); 18 new MyRedisSubPublishHelper().PublishMessage("nihaofengge", $"你好風哥:Guid值是:{DateTime.Now}{Guid.NewGuid().ToString()}"); 19 Console.WriteLine("釋出成功!"); 20 Index -= 1; 21 } 22 Console.ReadKey(); 23 } 24 #endregion 25 26 #region 秒殺的code 27 { 28 try 29 { 30 Console.WriteLine("秒殺開始。。。。。"); 31 for (int i = 0; i < 200; i++) 32 { 33 Task.Run(() => 34 { 35 MyRedisSubPublishHelper.LockByRedis("mstest"); 36 string productCount = MyRedisHelper.StringGet("productcount"); 37 int pcount = int.Parse(productCount); 38 if (pcount > 0) 39 { 40 long dlong = MyRedisHelper.StringDec("productcount"); 41 Console.WriteLine($"秒殺成功,商品庫存:{dlong}"); 42 pcount -= 1; 43 System.Threading.Thread.Sleep(30); 44 } 45 else 46 { 47 Console.WriteLine($"秒殺失敗,商品庫存為零了!"); 48 throw new Exception("產品秒殺數量為零!");//載入這裡會比較保險 49 } 50 MyRedisSubPublishHelper.UnLockByRedis("mstest"); 51 }).Wait(); 52 } 53 } 54 catch (Exception ex) 55 { 56 Console.WriteLine($"產品已經秒殺完畢,原因:{ex.Message}"); 57 } 58 Console.ReadKey(); 59 } 60 #endregion 61 } 62 } 63 }View Code
完整的code RedisHelper幫助類(測試並使用了部分方法的封裝),
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Threading.Tasks; 5 6 namespace RedisPublishAndSubHelper 7 { 8 using StackExchange.Redis; 9 using StackExchange; 10 using System.Threading; 11 12 public class MyRedisHelper 13 { 14 private static readonly string connectionRedisStr = string.Empty;// "47.107.87.32:6379,connectTimeout=1000,connectRetry=3,syncTimeout=10000"; 15 static MyRedisHelper() 16 { 17 //在這裡來初始化一些配置資訊 18 connectionRedisStr = "12.23.45.12:6379,connectTimeout=1000,connectRetry=3,syncTimeout=10000"; 19 } 20 21 #region Redis string簡單的常見同步方法操作 22 public static bool StringSet(string key, string stringValue, double senconds = 60) 23 { 24 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 25 { 26 IDatabase db = conn.GetDatabase(); 27 return db.StringSet(key, stringValue, TimeSpan.FromSeconds(senconds)); 28 } 29 } 30 public static string StringGet(string key) 31 { 32 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 33 { 34 IDatabase db = conn.GetDatabase(); 35 return db.StringGet(key); 36 } 37 } 38 39 public static long StringInc(string key) 40 { 41 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 42 { 43 IDatabase db = conn.GetDatabase(); 44 return db.StringIncrement(key); 45 } 46 } 47 48 public static long StringDec(string key) 49 { 50 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 51 { 52 IDatabase db = conn.GetDatabase(); 53 return db.StringDecrement(key); 54 } 55 } 56 public static bool KeyExists(string key) 57 { 58 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 59 { 60 IDatabase db = conn.GetDatabase(); 61 return db.KeyExists(key); 62 } 63 } 64 #endregion 65 66 #region List Hash, Set,Zset 大同小異的使用,比較簡單,後續有時間再補上 67 68 #endregion 69 70 #region 入隊出隊 71 72 #region 入隊 73 /// <summary> 74 /// 入隊right 75 /// </summary> 76 /// <param name="queueName"></param> 77 /// <param name="redisValue"></param> 78 /// <returns></returns> 79 public static long EnqueueListRightPush(RedisKey queueName, RedisValue redisValue) 80 { 81 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 82 { 83 return conn.GetDatabase().ListRightPush(queueName, redisValue); 84 } 85 } 86 /// <summary> 87 /// 入隊left 88 /// </summary> 89 /// <param name="queueName"></param> 90 /// <param name="redisvalue"></param> 91 /// <returns></returns> 92 public static long EnqueueListLeftPush(RedisKey queueName, RedisValue redisvalue) 93 { 94 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 95 { 96 return conn.GetDatabase().ListLeftPush(queueName, redisvalue); 97 } 98 } 99 /// <summary> 100 /// 入隊left非同步 101 /// </summary> 102 /// <param name="queueName"></param> 103 /// <param name="redisvalue"></param> 104 /// <returns></returns> 105 public static async Task<long> EnqueueListLeftPushAsync(RedisKey queueName, RedisValue redisvalue) 106 { 107 using (var conn = await ConnectionMultiplexer.ConnectAsync(connectionRedisStr)) 108 { 109 return await conn.GetDatabase().ListLeftPushAsync(queueName, redisvalue); 110 } 111 } 112 /// <summary> 113 /// 獲取佇列的長度 114 /// </summary> 115 /// <param name="queueName"></param> 116 /// <returns></returns> 117 public static long EnqueueListLength(RedisKey queueName) 118 { 119 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 120 { 121 return conn.GetDatabase().ListLength(queueName); 122 } 123 } 124 125 #endregion 126 127 #region 出隊 128 public static string DequeueListPopLeft(RedisKey queueName) 129 { 130 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 131 { 132 IDatabase database = conn.GetDatabase(); 133 int count = database.ListRange(queueName).Length; 134 if (count <= 0) 135 { 136 throw new Exception($"佇列{queueName}資料為零"); 137 } 138 string redisValue = database.ListLeftPop(queueName); 139 if (!string.IsNullOrEmpty(redisValue)) 140 return redisValue; 141 else 142 return string.Empty; 143 } 144 } 145 public static string DequeueListPopRight(RedisKey queueName) 146 { 147 using (var conn = ConnectionMultiplexer.Connect(connectionRedisStr)) 148 { 149 IDatabase database = conn.GetDatabase(); 150 int count = database.ListRange(queueName).Length; 151 if (count <= 0) 152 { 153 throw new Exception($"佇列{queueName}資料為零"); 154 } 155 string redisValue = conn.GetDatabase().ListRightPop(queueName); 156 if (!string.IsNullOrEmpty(redisValue)) 157 return redisValue; 158 else 159 return string.Empty; 160 } 161 } 162 public static async Task<string> DequeueListPopRightAsync(RedisKey queueName) 163 { 164 using (var conn = await ConnectionMultiplexer.ConnectAsync(connectionRedisStr)) 165 { 166 IDatabase database = conn.GetDatabase(); 167 int count = (await database.ListRangeAsync(queueName)).Length; 168 if (count <= 0) 169 { 170 throw new Exception($"佇列{queueName}資料為零"); 171 } 172 string redisValue = await conn.GetDatabase().ListRightPopAsync(queueName); 173 if (!string.IsNullOrEmpty(redisValue)) 174 return redisValue; 175 else 176 return string.Empty; 177 } 178 } 179 #endregion 180 181 #endregion 182 183 #region 分散式鎖 184 public static void LockByRedis(string key, int expireTimeSeconds = 10) 185 { 186 try 187 { 188 IDatabase database1 = ConnectionMultiplexer.Connect(connectionRedisStr).GetDatabase(); 189 while (true) 190 { 191 expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds; 192 bool lockflag = database1.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds)); 193 if (lockflag) 194 { 195 break; 196 } 197 } 198 } 199 catch (Exception ex) 200 { 201 throw new Exception($"Redis加鎖異常:原因{ex.Message}"); 202 } 203 } 204 205 public static bool UnLockByRedis(string key) 206 { 207 ConnectionMultiplexer conn = ConnectionMultiplexer.Connect(connectionRedisStr); 208 try 209 { 210 IDatabase database1 = conn.GetDatabase(); 211 return database1.LockRelease(key, Thread.CurrentThread.ManagedThreadId); 212 } 213 catch (Exception ex) 214 { 215 throw new Exception($"Redis加鎖異常:原因{ex.Message}"); 216 } 217 finally 218 { 219 if (conn != null) 220 { 221 conn.Close(); 222 conn.Dispose(); 223 } 224 } 225 } 226 #endregion 227 228 } 229 }View Code
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4 5 namespace RedisPublishAndSubHelper 6 { 7 using StackExchange.Redis; 8 using System.Net.Http; 9 using System.Threading; 10 using System.Threading.Channels; 11 using System.Threading.Tasks; 12 13 public class MyRedisSubPublishHelper 14 { 15 private static readonly string redisConnectionStr = "12.32.12.54:6379,connectTimeout=10000,connectRetry=3,syncTimeout=10000"; 16 private static readonly ConnectionMultiplexer connectionMultiplexer = null; 17 static MyRedisSubPublishHelper() 18 { 19 connectionMultiplexer = ConnectionMultiplexer.Connect(redisConnectionStr); 20 } 21 22 23 #region 釋出訂閱 24 public void SubScriper(string topticName, Action<RedisChannel, RedisValue> handler = null) 25 { 26 ISubscriber subscriber = connectionMultiplexer.GetSubscriber(); 27 ChannelMessageQueue channelMessageQueue = subscriber.Subscribe(topticName); 28 channelMessageQueue.OnMessage(channelMessage => 29 { 30 if (handler != null) 31 { 32 string redisChannel = channelMessage.Channel; 33 string msg = channelMessage.Message; 34 handler.Invoke(redisChannel, msg); 35 } 36 else 37 { 38 string msg = channelMessage.Message; 39 Console.WriteLine($"訂閱到訊息: { msg},Channel={channelMessage.Channel}"); 40 } 41 }); 42 } 43 public void PublishMessage(string topticName, string message) 44 { 45 ISubscriber subscriber = connectionMultiplexer.GetSubscriber(); 46 long publishLong = subscriber.Publish(topticName, message); 47 Console.WriteLine($"釋出訊息成功:{publishLong}"); 48 } 49 #endregion 50 51 #region 入隊出隊 52 public static async Task<long> EnqueueListLeftPushAsync(RedisKey queueName, RedisValue redisvalue) 53 { 54 return await connectionMultiplexer.GetDatabase().ListLeftPushAsync(queueName, redisvalue); 55 } 56 57 public static async Task<string> DequeueListPopRightAsync(RedisKey queueName) 58 { 59 IDatabase database = connectionMultiplexer.GetDatabase(); 60 int count = (await database.ListRangeAsync(queueName)).Length; 61 if (count <= 0) 62 { 63 throw new Exception($"佇列{queueName}資料為零"); 64 } 65 string redisValue = await database.ListRightPopAsync(queueName); 66 if (!string.IsNullOrEmpty(redisValue)) 67 return redisValue; 68 else 69 return string.Empty; 70 } 71 #endregion 72 73 #region 分散式鎖 74 public static void LockByRedis(string key, int expireTimeSeconds = 10) 75 { 76 try 77 { 78 IDatabase database = connectionMultiplexer.GetDatabase(); 79 while (true) 80 { 81 expireTimeSeconds = expireTimeSeconds > 20 ? 10 : expireTimeSeconds; 82 bool lockflag = database.LockTake(key, Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(expireTimeSeconds)); 83 if (lockflag) 84 { 85 break; 86 } 87 } 88 } 89 catch (Exception ex) 90 { 91 throw new Exception($"Redis加鎖異常:原因{ex.Message}"); 92 } 93 } 94 95 public static bool UnLockByRedis(string key) 96 { 97 try 98 { 99 IDatabase database = connectionMultiplexer.GetDatabase(); 100 return database.LockRelease(key, Thread.CurrentThread.ManagedThreadId); 101 } 102 catch (Exception ex) 103 { 104 throw new Exception($"Redis加鎖異常:原因{ex.Message}"); 105 } 106 } 107 #endregion 108 } 109 }View Code
&n