1. 程式人生 > 其它 >基於Redis的分散式鎖設計

基於Redis的分散式鎖設計

前言

基於Redis的分散式鎖實現,原理很簡單嘛:檢測一下Key是否存在,不存在則Set Key,加鎖成功,存在則加鎖失敗。對嗎?這麼簡單嗎?

如果你真這麼想,那麼你真的需要好好聽我講一下了。接下來,咱們找個例子研究一下。

在開始之前,咱們先定些規則:

  • 關於示例程式碼:
    • 需要搭配我準備的示例程式碼,該示例採用C#編寫
    • 示例中的材料Id固定為10000
    • 示例中的材料初始庫存均為100
  • 關於Redis中的Key:
    • 指示材料庫存的Key為ProductStock_10000
    • 自己實現的分散式鎖中,指示鎖的Key為DistributedLock_10000
    • RedLock.net中,指示鎖的Key為redlock:10000

假如沒有鎖

如果沒有鎖,我們可以通過Jmeter併發100個請求,看看最後庫存是不是0

/// <summary>
/// 無鎖釦減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockWithNoLock")]
public async Task<string> DecreaseProductStockWithNoLock()
{
    var stockKey = GetProductStockKey(ProductId);
    var currentQuantity = (long)(await _redisDatabase.Database.StringGetAsync(stockKey));
    if (currentQuantity < 1)
        throw new Exception("庫存不足");

    var leftQuantity = currentQuantity - 1;
    await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

    return $"剩餘庫存:{leftQuantity}";
}

完了,庫存全亂了,收拾收拾,跑路吧o(╥﹏╥)o!

單應用中的鎖

提到鎖,大多數人首先想到的應該就是Monitor的語法糖lock了,這是大多數人最先接觸到的一種鎖。在單應用中,因為lock是執行緒鎖,所以使用該鎖一般是沒有什麼問題的。

/// <summary>
/// 在單應用中扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockInSingleApp")]
public string DecreaseProductStockInSingleApp()
{
    long leftQuantity;
    lock (_lockObj)
    {
        var stockKey = GetProductStockKey(ProductId);
        var currentQuantity = (long)_redisDatabase.Database.StringGet(stockKey);
        if (currentQuantity < 1)
            throw new Exception("庫存不足");

        leftQuantity = currentQuantity - 1;
        _redisDatabase.Database.StringSet(stockKey, leftQuantity);
    }

    return $"剩餘庫存:{leftQuantity}";
}

結果和我們所期望的一樣,剩餘庫存為0

但是如果我們進行應用叢集,部署多份一模一樣的應用,那lock就無能為力了。接下來,咱們啟動兩個應用例項來看看

# 以開發環境執行,能看到更多資訊
dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5000 --environment Development

dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5010 --environment Development

可見,一共傳送了100個請求,本應該最後庫存為0的,卻還剩17個

應用叢集中的鎖

版本1

很明顯,lock已經沒用了,是時候進入咱們的主題了——基於Redis的分散式鎖設計。

初步的思路是這樣的:

  1. 將材料Id作為Redis Key
  2. 如果Redis中存在該Key,則認為鎖已經被其他執行緒佔用了
  3. 如果Redis中不存在Key,則將該Key新增到Redis中,Value則隨意賦值
  4. 當獲取到鎖的業務執行完畢後,將Key從Redis中移除

有了思路,接下來就該想一下如何實現了。很幸運,Redis的命令SETNX key value完全滿足我們的需求,實現如下:

對Redis命令不熟悉的同學,可以參考這篇Redis命令文件

/// <summary>
/// 在應用叢集中扣減庫存V1
/// </summary>
/// <returns></returns>
[HttpPost("v1/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV1()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());

    // 使用 SETNX key value 命令加鎖
    if (await _redisDatabase.Database.StringSetAsync(lockKey, 1, null, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩餘庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

我沒找到Jmeter統計請求成功或失敗次數的方法,所以使用了聚合報告,通過報告裡的錯誤率手動計算。如果你知道,可以分享給我,謝謝!

通過計算,成功50次,失敗50次,而我們查到的庫存也是還剩餘50個,所以已經基本實現了我們的需求。

版本2

雖然版本1已經基本實現了我們的需求,但是試想一下:

  • 程式碼執行在try塊中時,應用崩潰了,導致鎖未被釋放
  • 釋放鎖時,由於網路問題,連線Redis失敗了,導致鎖未被釋放

如果發生了以上任何情況,都無法正確的釋放鎖,導致鎖永遠無法釋放,導致死鎖。

那我們應該怎麼辦呢?對,就是給鎖加一個過期時間!不過SETNX命令並沒有“過期時間”引數,那我們就需要在獲取到鎖後,通過EXPIRE命令設定鎖的過期時間。

這樣,可以嗎?當然不可以,我們需要將SETEXPIRE兩個操作合併為一個原子性操作,那我們應該怎麼做呢?別擔心,Redis對SET命令進行了增強,使用SET key value EX seconds NX命令即可,最後的NX則是表示與SETNX同義。

/// <summary>
/// 在應用叢集中扣減庫存V2
/// </summary>
/// <returns></returns>
[HttpPost("v2/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV2()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,並設定過期時間
    if (await _redisDatabase.AddAsync(lockKey, 1, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩餘庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

版本3

好,死鎖的問題咱們已經解決了,那咱們的分散式鎖是不是已經完美了呢?NO!NO!NO!還是有一些問題滴:

  1. 如果執行緒A獲取到了鎖,並設定了鎖的過期時間是30s,而業務的執行時長需要40s,這就出現了鎖被提前釋放的問題
  2. 如果鎖被提前釋放了,然後被另一個執行緒B獲取到了,此時執行緒A的業務執行完畢了,然後執行了finally程式碼塊中的鎖釋放程式碼,這就把不屬於執行緒A而屬於執行緒B的鎖釋放掉了,這下可全亂套了。

是不是感覺越改問題越多?別灰心,咱們一個一個來解決,先來解決第二個“錯誤釋放了不屬於自己的鎖”的問題。為了讓執行緒知道哪個是自己的鎖,我們需要給執行緒起個唯一不重複的名字,當需要釋放鎖的時候,先檢查一下是不是自己的鎖,如果是,才釋放鎖。那這個名字放在哪裡呢?咱們之前LockKey對應的Value不是沒有用嘛,那咱們就把名字存這裡面,實現如下:

/// <summary>
/// 在應用叢集中扣減庫存V3
/// </summary>
/// <returns></returns>
[HttpPost("v3/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV3()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩餘庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            if (await _redisDatabase.GetAsync<string>(lockKey) == resourceId)
            {
                _redisDatabase.Database.KeyDelete(lockKey, CommandFlags.DemandMaster);
            }
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

版本4

上面的程式碼,你應該看出問題了吧?沒錯,最後的釋放鎖程式碼是分兩步執行的,並不是原子操作,這肯定是不允許的啦!但是,Redis又沒有提供相關命令,所以我們只能使用lua指令碼了:

/// <summary>
/// 在應用叢集中扣減庫存V4
/// </summary>
/// <returns></returns>
[HttpPost("v4/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV4()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩餘庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖,使用lua指令碼實現操作的原子性
            await _redisDatabase.Database.ScriptEvaluateAsync(@"
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else
                    return 0
                end",
             keys: new RedisKey[] { lockKey },
             values: new RedisValue[] { resourceId }, 
             CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

如果你沒有使用我的示例程式碼,而是自己寫的,可能會出現鎖未被正確釋放的問題:執行完lua指令碼後,返回的是0。這可能是因為你使用了Json序列化工具來將物件序列化為字串,以將其存放到Redis中。但是由於Json序列化字串時,將引號(")也序列化為了("),這就會導致字串"123"存入到Redis中為"\"123\""。具體解決辦法可以參考我實現的RedisNewtonsoftSerializer類。

版本5

最後,我們來解決最後一個問題——業務執行時長超過了鎖的過期時長,導致鎖提前被釋放。由於我們無法準確預測業務的執行時長,鎖過期時間設定的太長也不合理,所以,若業務還未執行完,我們必須能夠在鎖快過期的時候,適當的延長鎖過期時間。可以通過定時器來解決。

/// <summary>
/// 在應用叢集中扣減庫存V5
/// </summary>
/// <returns></returns>
[HttpPost("v5/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV5()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設定過期時間,並將值設定為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            // 啟動定時器,定時延長key的過期時間
            var interval = expiresIn.TotalMilliseconds / 2;
            var timer = new System.Threading.Timer(
                callback: state => ExtendLockLifetime(lockKey, resourceId, expiresIn),
                state: null,
                dueTime: (int)interval,
                period: (int)interval);

            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            timer.Change(Timeout.Infinite, Timeout.Infinite);
            timer.Dispose();
            timer = null;

            return $"剩餘庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖,使用lua指令碼實現操作的原子性
            await _redisDatabase.Database.ScriptEvaluateAsync(@"
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else
                    return 0
                end",
             keys: new RedisKey[] { lockKey },
             values: new RedisValue[] { resourceId },
             CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

private void ExtendLockLifetime(string lockKey, string resourceId, TimeSpan expiresIn)
{
    _redisDatabase.Database.ScriptEvaluate(@"
        local currentVal = redis.call('get', KEYS[1])
        if (currentVal == false) then
            return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]) and 1 or 0
        elseif (currentVal == ARGV[1]) then
            return redis.call('pexpire', KEYS[1], ARGV[2])
        else
            return -1
        end
    ",
    keys: new RedisKey[] { lockKey },
    values: new RedisValue[] { resourceId, (long)expiresIn.TotalMilliseconds },
    CommandFlags.DemandMaster);
}

使用RedLock.net中的分散式鎖

以上的版本5,已經包含了分散式鎖的基本思想了,不過我寫的肯定比較簡陋,所以我給大家推薦一個比較不錯的開源實現——RedLock.net

Redis官方文件整理了常用語言的分散式鎖實現,也梳理了RedLock的實現原理。

/// <summary>
/// 通過使用RedLock在應用叢集中扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockInAppClusterWithRedLock")]
public async Task<string> DecreaseProductStockInAppClusterWithRedLock()
{
    // 鎖的過期時間為30s,等待獲取鎖的時間為20s,如果沒有獲取到鎖,則等待1秒鐘後再次嘗試獲取
    using var redLock = await _distributedLockFactory.CreateLockAsync(
        resource: ProductId.ToString(),
        expiryTime: TimeSpan.FromSeconds(30),
        waitTime: TimeSpan.FromSeconds(20),
        retryTime: TimeSpan.FromSeconds(1)
    );

    // 確認是否已獲取到鎖
    if (redLock.IsAcquired)
    {
        var stockKey = GetProductStockKey(ProductId);
        var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
        if (currentQuantity < 1)
            throw new Exception("庫存不足");

        var leftQuantity = currentQuantity - 1;
        await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

        return $"剩餘庫存:{leftQuantity}";
    }
    else
        throw new Exception("獲取鎖失敗");
}

站在Redis角度上

我們上面站在程式的角度上已經實現了分散式鎖,但是站在Redis角度上,還有幾個問題需要思考一下:

Redis宕機導致無法加鎖

如果Redis宕機了,就會導致Redis伺服器不可用,從而導致無法進行加鎖。

解決方法很簡單,可以通過配置主從關係,提高Redis的高可用性,但這樣又產生了下面的問題。

Redis主從切換導致鎖失效

過程是這樣的:

  1. 客戶端 A 從 Redis master 上獲取到了鎖
  2. 在代表鎖的 Key 同步到 Redis slave 之前,master 宕機了
  3. 然後 Redis 進行主從切換, Redis slave 升級為 Redis master
  4. 客戶端 B 從新 Redis master 中獲取到了上面客戶端 A 持有的鎖。

這顯然出大問題了!因此,RedLock演算法誕生了。

RedLock

我們不討論時鐘漂移,所以我們假設,多臺伺服器之間的時鐘漂移很小,以至於我們可以忽略它。

基本原理

首先,我們需要至少5臺(大於等於5的奇數個)Redis伺服器,這5臺Redis之間相互獨立,沒有任何主從、叢集關係。

接著,我們按照從左到右的順序,在Redis伺服器上獲取鎖,我們假設

  • 鎖的過期時間為10s,
  • 加鎖的開始時間是00:00:00,
  • 在第一臺伺服器上獲取到鎖的時間為00:00:01,
  • 在第二臺伺服器上獲取到鎖的時間為00:00:02,
  • 在第三臺伺服器上獲取到鎖的時間為00:00:03。

現在,已經有超過半數(3/5)的Redis伺服器獲取到了鎖

  • 獲取鎖所用的時間 = 最後一臺獲取到鎖的Redis伺服器獲取到鎖的時間 - 加鎖的開始時間
  • 鎖的有效剩餘時間(TTL) = 鎖的過期時間 - 獲取鎖所用的時間

獲取鎖所用的時間 = 00:00:03 - 00:00:00 = 3s,TTL = 10s - (00:00:03 - 00:00:00) = 7s。所以,獲取鎖的時間並沒有超過鎖的有效期,我們認為獲取鎖成功。

認為鎖獲取成功的條件有兩個:

  1. 超過半數的Redis伺服器獲取到了鎖
  2. 獲取鎖的時間沒有超過鎖的有效期

重試

以上列舉的示例是非常順利獲取到鎖的情況,然而很多時候,分散式鎖的獲取沒那麼順利,很可能出現以下情況:

  • A已經獲取到了兩臺Redis伺服器的鎖
  • B已經獲取到了兩臺Redis伺服器的鎖
  • C已經獲取到了一臺Redis伺服器的鎖

如果三臺客戶端的請求一直處於阻塞狀態(直到達到鎖的有效期),會嚴重影響鎖的獲取效率,這時就需要重試機制

重試機制:在一開始,同時向所有的(這裡是5臺)Redis伺服器,傳送SET key value EX senconds NX命令,當所有伺服器都返回結果後,判斷是否以達成“鎖獲取成功的兩個條件”,如果達成了,則鎖獲取成功。如果沒有,則立即將已獲取的鎖釋放掉,並等待一小段時間,重複以上步驟(一般會嘗試3次)。如果這期間仍未達成“鎖獲取成功的兩個條件”,則認為鎖獲取失敗。

主從切換導致鎖失效

實際上,在RedLock演算法中,如果Redis服務配置了主從關係,仍然會出現我們之前提出的問題——主從切換導致鎖失效。

為了解決這個問題,我們需要延遲Redis slave節點提升為Redis master節點的時間,延遲的時間就是鎖的有效剩餘時間(TTL),這樣,就不會出現鎖失效的問題了(這似乎只存在於理論層面,如果你知道如何延遲slave提升master的時間,請一定要分享給我)。

釋放鎖

釋放鎖就很簡單了,給每臺伺服器都發送一個刪除鎖的命令就可以了,因為咱們的指令碼已經保證了,只會刪除與當前業務有關聯的鎖。

結語

梳理了那麼多,終於來到了結尾,你也發現了,基於Redis實現一個分散式鎖,並沒有想象的那麼簡單,細節問題真的很多很多。另外,至少在我看來,RedLock演算法實在是有些重量級了,如果不是那麼在乎Redis主從切換導致的鎖不一致的問題,單Redis其實就已經足夠了。

另外,RedLock.net中使用了一個變數extendUnlockSemaphore,而不是使用的lock,具體原因可以參考:Reentrant Async Locks in C#

最後,還可以基於Zookeeper來實現分散式鎖,有興趣的可以去了解一下。