基於Redis的分散式鎖設計
前言
基於Redis的分散式鎖實現,原理很簡單嘛:檢測一下Key是否存在,不存在則Set Key,加鎖成功,存在則加鎖失敗。對嗎?這麼簡單嗎?
如果你真這麼想,那麼你真的需要好好聽我講一下了。接下來,咱們找個例子研究一下。
在開始之前,咱們先定些規則:
- 關於示例程式碼:
- 需要搭配我準備的示例程式碼,該示例採用C#編寫
- 示例中的材料Id固定為10000
- 示例中的材料初始庫存均為100
- 關於Redis中的Key:
- 指示材料庫存的Key為
ProductStock_10000
- 自己實現的分散式鎖中,指示鎖的Key為
DistributedLock_10000
- RedLock.net中,指示鎖的Key為
redlock:10000
- 指示材料庫存的Key為
假如沒有鎖
如果沒有鎖,我們可以通過Jmeter併發100個請求,看看最後庫存是不是0
/// <summary> /// 無鎖釦減庫存 /// </summary> /// <returns></returns> [HttpPost("DecreaseProductStockWithNoLock")] public async Task<string> DecreaseProductStockWithNoLock() { var stockKey = GetProductStockKey(ProductId); var currentQuantity = (long)(await _redisDatabase.Database.StringGetAsync(stockKey)); if (currentQuantity < 1) throw new Exception("庫存不足"); var leftQuantity = currentQuantity - 1; await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity); return $"剩餘庫存:{leftQuantity}"; }
完了,庫存全亂了,收拾收拾,跑路吧o(╥﹏╥)o!
單應用中的鎖
提到鎖,大多數人首先想到的應該就是Monitor
的語法糖lock
了,這是大多數人最先接觸到的一種鎖。在單應用中,因為lock是執行緒鎖,所以使用該鎖一般是沒有什麼問題的。
/// <summary> /// 在單應用中扣減庫存 /// </summary> /// <returns></returns> [HttpPost("DecreaseProductStockInSingleApp")] public string DecreaseProductStockInSingleApp() { long leftQuantity; lock (_lockObj) { var stockKey = GetProductStockKey(ProductId); var currentQuantity = (long)_redisDatabase.Database.StringGet(stockKey); if (currentQuantity < 1) throw new Exception("庫存不足"); leftQuantity = currentQuantity - 1; _redisDatabase.Database.StringSet(stockKey, leftQuantity); } return $"剩餘庫存:{leftQuantity}"; }
結果和我們所期望的一樣,剩餘庫存為0
但是如果我們進行應用叢集,部署多份一模一樣的應用,那lock
就無能為力了。接下來,咱們啟動兩個應用例項來看看
# 以開發環境執行,能看到更多資訊
dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5000 --environment Development
dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5010 --environment Development
可見,一共傳送了100個請求,本應該最後庫存為0的,卻還剩17個
應用叢集中的鎖
版本1
很明顯,lock
已經沒用了,是時候進入咱們的主題了——基於Redis的分散式鎖設計。
初步的思路是這樣的:
- 將材料Id作為Redis Key
- 如果Redis中存在該Key,則認為鎖已經被其他執行緒佔用了
- 如果Redis中不存在Key,則將該Key新增到Redis中,Value則隨意賦值
- 當獲取到鎖的業務執行完畢後,將Key從Redis中移除
有了思路,接下來就該想一下如何實現了。很幸運,Redis的命令SETNX key value
完全滿足我們的需求,實現如下:
對Redis命令不熟悉的同學,可以參考這篇Redis命令文件
/// <summary>
/// 在應用叢集中扣減庫存V1
/// </summary>
/// <returns></returns>
[HttpPost("v1/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV1()
{
var lockKey = GetDistributedLockKey(ProductId.ToString());
// 使用 SETNX key value 命令加鎖
if (await _redisDatabase.Database.StringSetAsync(lockKey, 1, null, When.NotExists, CommandFlags.DemandMaster))
{
try
{
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
return $"剩餘庫存:{leftQuantity}";
}
finally
{
// 釋放鎖
await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
}
}
else
throw new Exception("獲取鎖失敗");
}
我沒找到Jmeter統計請求成功或失敗次數的方法,所以使用了聚合報告,通過報告裡的錯誤率手動計算。如果你知道,可以分享給我,謝謝!
通過計算,成功50次,失敗50次,而我們查到的庫存也是還剩餘50個,所以已經基本實現了我們的需求。
版本2
雖然版本1已經基本實現了我們的需求,但是試想一下:
- 程式碼執行在
try
塊中時,應用崩潰了,導致鎖未被釋放 - 釋放鎖時,由於網路問題,連線Redis失敗了,導致鎖未被釋放
如果發生了以上任何情況,都無法正確的釋放鎖,導致鎖永遠無法釋放,導致死鎖。
那我們應該怎麼辦呢?對,就是給鎖加一個過期時間!不過SETNX
命令並沒有“過期時間”引數,那我們就需要在獲取到鎖後,通過EXPIRE
命令設定鎖的過期時間。
這樣,可以嗎?當然不可以,我們需要將SET
和EXPIRE
兩個操作合併為一個原子性操作,那我們應該怎麼做呢?別擔心,Redis對SET
命令進行了增強,使用SET key value EX seconds NX
命令即可,最後的NX
則是表示與SETNX
同義。
/// <summary>
/// 在應用叢集中扣減庫存V2
/// </summary>
/// <returns></returns>
[HttpPost("v2/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV2()
{
var lockKey = GetDistributedLockKey(ProductId.ToString());
var expiresIn = TimeSpan.FromSeconds(30);
// 使用 SET key value EX seconds NX 命令加鎖,並設定過期時間
if (await _redisDatabase.AddAsync(lockKey, 1, expiresIn, When.NotExists, CommandFlags.DemandMaster))
{
try
{
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
return $"剩餘庫存:{leftQuantity}";
}
finally
{
// 釋放鎖
await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
}
}
else
throw new Exception("獲取鎖失敗");
}
版本3
好,死鎖的問題咱們已經解決了,那咱們的分散式鎖是不是已經完美了呢?NO!NO!NO!還是有一些問題滴:
- 如果執行緒A獲取到了鎖,並設定了鎖的過期時間是30s,而業務的執行時長需要40s,這就出現了鎖被提前釋放的問題
- 如果鎖被提前釋放了,然後被另一個執行緒B獲取到了,此時執行緒A的業務執行完畢了,然後執行了
finally
程式碼塊中的鎖釋放程式碼,這就把不屬於執行緒A而屬於執行緒B的鎖釋放掉了,這下可全亂套了。
是不是感覺越改問題越多?別灰心,咱們一個一個來解決,先來解決第二個“錯誤釋放了不屬於自己的鎖”的問題。為了讓執行緒知道哪個是自己的鎖,我們需要給執行緒起個唯一不重複的名字,當需要釋放鎖的時候,先檢查一下是不是自己的鎖,如果是,才釋放鎖。那這個名字放在哪裡呢?咱們之前LockKey對應的Value不是沒有用嘛,那咱們就把名字存這裡面,實現如下:
/// <summary>
/// 在應用叢集中扣減庫存V3
/// </summary>
/// <returns></returns>
[HttpPost("v3/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV3()
{
var lockKey = GetDistributedLockKey(ProductId.ToString());
var resourceId = Guid.NewGuid().ToString();
var expiresIn = TimeSpan.FromSeconds(30);
// 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
{
try
{
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
return $"剩餘庫存:{leftQuantity}";
}
finally
{
// 釋放鎖
if (await _redisDatabase.GetAsync<string>(lockKey) == resourceId)
{
_redisDatabase.Database.KeyDelete(lockKey, CommandFlags.DemandMaster);
}
}
}
else
throw new Exception("獲取鎖失敗");
}
版本4
上面的程式碼,你應該看出問題了吧?沒錯,最後的釋放鎖程式碼是分兩步執行的,並不是原子操作,這肯定是不允許的啦!但是,Redis又沒有提供相關命令,所以我們只能使用lua指令碼了:
/// <summary>
/// 在應用叢集中扣減庫存V4
/// </summary>
/// <returns></returns>
[HttpPost("v4/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV4()
{
var lockKey = GetDistributedLockKey(ProductId.ToString());
var resourceId = Guid.NewGuid().ToString();
var expiresIn = TimeSpan.FromSeconds(30);
// 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
{
try
{
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
return $"剩餘庫存:{leftQuantity}";
}
finally
{
// 釋放鎖,使用lua指令碼實現操作的原子性
await _redisDatabase.Database.ScriptEvaluateAsync(@"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end",
keys: new RedisKey[] { lockKey },
values: new RedisValue[] { resourceId },
CommandFlags.DemandMaster);
}
}
else
throw new Exception("獲取鎖失敗");
}
如果你沒有使用我的示例程式碼,而是自己寫的,可能會出現鎖未被正確釋放的問題:執行完lua指令碼後,返回的是0。這可能是因為你使用了Json序列化工具來將物件序列化為字串,以將其存放到Redis中。但是由於Json序列化字串時,將引號(")也序列化為了("),這就會導致字串"123"存入到Redis中為"\"123\""。具體解決辦法可以參考我實現的
RedisNewtonsoftSerializer
類。
版本5
最後,我們來解決最後一個問題——業務執行時長超過了鎖的過期時長,導致鎖提前被釋放。由於我們無法準確預測業務的執行時長,鎖過期時間設定的太長也不合理,所以,若業務還未執行完,我們必須能夠在鎖快過期的時候,適當的延長鎖過期時間。可以通過定時器來解決。
/// <summary>
/// 在應用叢集中扣減庫存V5
/// </summary>
/// <returns></returns>
[HttpPost("v5/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV5()
{
var lockKey = GetDistributedLockKey(ProductId.ToString());
var resourceId = Guid.NewGuid().ToString();
var expiresIn = TimeSpan.FromSeconds(30);
// 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
{
try
{
// 啟動定時器,定時延長key的過期時間
var interval = expiresIn.TotalMilliseconds / 2;
var timer = new System.Threading.Timer(
callback: state => ExtendLockLifetime(lockKey, resourceId, expiresIn),
state: null,
dueTime: (int)interval,
period: (int)interval);
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
timer = null;
return $"剩餘庫存:{leftQuantity}";
}
finally
{
// 釋放鎖,使用lua指令碼實現操作的原子性
await _redisDatabase.Database.ScriptEvaluateAsync(@"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end",
keys: new RedisKey[] { lockKey },
values: new RedisValue[] { resourceId },
CommandFlags.DemandMaster);
}
}
else
throw new Exception("獲取鎖失敗");
}
private void ExtendLockLifetime(string lockKey, string resourceId, TimeSpan expiresIn)
{
_redisDatabase.Database.ScriptEvaluate(@"
local currentVal = redis.call('get', KEYS[1])
if (currentVal == false) then
return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]) and 1 or 0
elseif (currentVal == ARGV[1]) then
return redis.call('pexpire', KEYS[1], ARGV[2])
else
return -1
end
",
keys: new RedisKey[] { lockKey },
values: new RedisValue[] { resourceId, (long)expiresIn.TotalMilliseconds },
CommandFlags.DemandMaster);
}
使用RedLock.net中的分散式鎖
以上的版本5,已經包含了分散式鎖的基本思想了,不過我寫的肯定比較簡陋,所以我給大家推薦一個比較不錯的開源實現——RedLock.net
Redis官方文件整理了常用語言的分散式鎖實現,也梳理了RedLock的實現原理。
/// <summary>
/// 通過使用RedLock在應用叢集中扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockInAppClusterWithRedLock")]
public async Task<string> DecreaseProductStockInAppClusterWithRedLock()
{
// 鎖的過期時間為30s,等待獲取鎖的時間為20s,如果沒有獲取到鎖,則等待1秒鐘後再次嘗試獲取
using var redLock = await _distributedLockFactory.CreateLockAsync(
resource: ProductId.ToString(),
expiryTime: TimeSpan.FromSeconds(30),
waitTime: TimeSpan.FromSeconds(20),
retryTime: TimeSpan.FromSeconds(1)
);
// 確認是否已獲取到鎖
if (redLock.IsAcquired)
{
var stockKey = GetProductStockKey(ProductId);
var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
if (currentQuantity < 1)
throw new Exception("庫存不足");
var leftQuantity = currentQuantity - 1;
await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);
return $"剩餘庫存:{leftQuantity}";
}
else
throw new Exception("獲取鎖失敗");
}
站在Redis角度上
我們上面站在程式的角度上已經實現了分散式鎖,但是站在Redis角度上,還有幾個問題需要思考一下:
Redis宕機導致無法加鎖
如果Redis宕機了,就會導致Redis伺服器不可用,從而導致無法進行加鎖。
解決方法很簡單,可以通過配置主從關係,提高Redis的高可用性,但這樣又產生了下面的問題。
Redis主從切換導致鎖失效
過程是這樣的:
- 客戶端 A 從 Redis master 上獲取到了鎖
- 在代表鎖的 Key 同步到 Redis slave 之前,master 宕機了
- 然後 Redis 進行主從切換, Redis slave 升級為 Redis master
- 客戶端 B 從新 Redis master 中獲取到了上面客戶端 A 持有的鎖。
這顯然出大問題了!因此,RedLock演算法誕生了。
RedLock
我們不討論時鐘漂移,所以我們假設,多臺伺服器之間的時鐘漂移很小,以至於我們可以忽略它。
基本原理
首先,我們需要至少5臺(大於等於5的奇數個)Redis伺服器,這5臺Redis之間相互獨立,沒有任何主從、叢集關係。
接著,我們按照從左到右的順序,在Redis伺服器上獲取鎖,我們假設
- 鎖的過期時間為10s,
- 加鎖的開始時間是00:00:00,
- 在第一臺伺服器上獲取到鎖的時間為00:00:01,
- 在第二臺伺服器上獲取到鎖的時間為00:00:02,
- 在第三臺伺服器上獲取到鎖的時間為00:00:03。
現在,已經有超過半數(3/5)的Redis伺服器獲取到了鎖。
- 獲取鎖所用的時間 = 最後一臺獲取到鎖的Redis伺服器獲取到鎖的時間 - 加鎖的開始時間
- 鎖的有效剩餘時間(TTL) = 鎖的過期時間 - 獲取鎖所用的時間
獲取鎖所用的時間 = 00:00:03 - 00:00:00 = 3s,TTL = 10s - (00:00:03 - 00:00:00) = 7s。所以,獲取鎖的時間並沒有超過鎖的有效期,我們認為獲取鎖成功。
認為鎖獲取成功的條件有兩個:
- 超過半數的Redis伺服器獲取到了鎖
- 獲取鎖的時間沒有超過鎖的有效期
重試
以上列舉的示例是非常順利獲取到鎖的情況,然而很多時候,分散式鎖的獲取沒那麼順利,很可能出現以下情況:
- A已經獲取到了兩臺Redis伺服器的鎖
- B已經獲取到了兩臺Redis伺服器的鎖
- C已經獲取到了一臺Redis伺服器的鎖
如果三臺客戶端的請求一直處於阻塞狀態(直到達到鎖的有效期),會嚴重影響鎖的獲取效率,這時就需要重試機制。
重試機制:在一開始,同時向所有的(這裡是5臺)Redis伺服器,傳送SET key value EX senconds NX
命令,當所有伺服器都返回結果後,判斷是否以達成“鎖獲取成功的兩個條件”,如果達成了,則鎖獲取成功。如果沒有,則立即將已獲取的鎖釋放掉,並等待一小段時間,重複以上步驟(一般會嘗試3次)。如果這期間仍未達成“鎖獲取成功的兩個條件”,則認為鎖獲取失敗。
主從切換導致鎖失效
實際上,在RedLock演算法中,如果Redis服務配置了主從關係,仍然會出現我們之前提出的問題——主從切換導致鎖失效。
為了解決這個問題,我們需要延遲Redis slave節點提升為Redis master節點的時間,延遲的時間就是鎖的有效剩餘時間(TTL),這樣,就不會出現鎖失效的問題了(這似乎只存在於理論層面,如果你知道如何延遲slave提升master的時間,請一定要分享給我)。
釋放鎖
釋放鎖就很簡單了,給每臺伺服器都發送一個刪除鎖的命令就可以了,因為咱們的指令碼已經保證了,只會刪除與當前業務有關聯的鎖。
結語
梳理了那麼多,終於來到了結尾,你也發現了,基於Redis實現一個分散式鎖,並沒有想象的那麼簡單,細節問題真的很多很多。另外,至少在我看來,RedLock演算法實在是有些重量級了,如果不是那麼在乎Redis主從切換導致的鎖不一致的問題,單Redis其實就已經足夠了。
另外,RedLock.net中使用了一個變數extendUnlockSemaphore
,而不是使用的lock
,具體原因可以參考:Reentrant Async Locks in C#
最後,還可以基於Zookeeper來實現分散式鎖,有興趣的可以去了解一下。