1. 程式人生 > >C# Redis分散式鎖 - 單節點

C# Redis分散式鎖 - 單節點

為什麼要用分散式鎖?

先上一張截圖,這是在瀏覽別人的部落格時看到的.

 

在瞭解為什麼要用分散式鎖之前,我們應該知道到底什麼是分散式鎖.

鎖按照不同的維度,有多種分類.比如

1.悲觀鎖,樂觀鎖;

2.公平鎖,非公平鎖;

3.獨享鎖,共享鎖;

4.執行緒鎖,程序鎖;

等等.

我們平時用的鎖,比如 lock,它是執行緒鎖,主要用來給方法,程式碼塊加鎖.由於程序的記憶體單元是被其所有執行緒共享的,所以執行緒鎖控制的實際是多個執行緒對同一塊記憶體區域的訪問.

有執行緒鎖,就必然有程序鎖.顧名思義,程序鎖的目的是控制多個程序對共享資源的訪問.因為程序之間彼此獨立,各個程序是無法控制其他程序對資源的訪問,所以只能通過作業系統來控制.比如 Mutex.

但是程序鎖有一個前提,那就是需要多個程序在同一個系統中,如果多個程序不在同一個系統,那就只能使用分散式鎖來控制了.

分散式鎖是控制分散式系統中不同系統之間訪問共享資源的一種鎖實現.它和執行緒鎖,程序鎖的作用都是一樣,只是範圍不一樣.

所以要實現分散式鎖,就必須依靠第三方儲存介質來儲存鎖的資訊.因為各個程序之間彼此誰都不服誰,只能找一個帶頭大哥咯;

 

以下示例需引用NUGET: CSRedisCore

示例一

            CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");
            var lockKey = "lockKey";
            var stock = 5;//商品庫存
            var taskCount = 10;//執行緒數量
            redisClient.Del(lockKey);//測試前,先把鎖刪了.

            for (int i = 0; i < taskCount; i++)
            {
                Task.Run(() =>
                {
                    //獲取鎖
                    do
                    {
                        //setnx : key不存在才會成功,存在則失敗.
                        var success = redisClient.SetNx(lockKey, 1);
                        if (success == true)
                        {
                            break;
                        }
                        Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖
                    } while (true);

                    Console.WriteLine($"執行緒:{Task.CurrentId} 拿到了鎖,開始消費");

                    if (stock <= 0)
                    {
                        Console.WriteLine($"庫存不足,執行緒:{Task.CurrentId} 搶購失敗!");
                        redisClient.Del(lockKey);
                        return;
                    }

                    stock--;
                    //模擬處理業務
                    Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));

                    Console.WriteLine($"執行緒:{Task.CurrentId} 消費完畢!剩餘 {stock} 個");
                    //業務處理完後,釋放鎖.
                    redisClient.Del(lockKey);
                });
            }

執行結果:

 

看起來貌似沒毛病,實際上上述程式碼有個致命的問題:

當某個執行緒拿到鎖之後,如果系統崩潰了,那麼鎖永遠都不會被釋放.因此,我們應該給鎖加一個過期時間,當時間到了,還沒有被主動釋放,我們就讓redis釋放掉它,以保證其他消費者可以拿到鎖,進行消費.

這裡給鎖加過期時間也有講究,不能拿到鎖後再加,比如:

                        ......
              //setnx : key不存在才會成功,存在則失敗. var success = redisClient.SetNx(lockKey, 1); if (success == true) { redisClient.Set(lockKey, 1, expireSeconds: 5); break; }

這樣操作的話,獲取鎖和設定鎖的過期時間就不是原子操作,同樣會出現上面提到的問題.Redis 提供了一個合而為一的操作可以解決這個問題.

                        //set : key存在則失敗,不存在才會成功,並且過期時間5秒
                        var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);

這個問題雖然解決了,但隨之產生了一個新的問題:

假設有3個執行緒A,B,C

當執行緒A拿到鎖後執行業務的時候超時了,超過了鎖的過期時間還沒執行完,這時候鎖被Redis釋放了,

於是執行緒B拿到了鎖並開始執行業務邏輯.

當執行緒B的業務邏輯還沒執行完的時候,執行緒A的業務邏輯執行完了,於是乎就跑去釋放掉了鎖.

這時候執行緒C就可以拿到鎖開始執行它的業務邏輯.

這不就亂套了麼...

因此,執行緒在釋放鎖的時候應該判斷這個鎖還屬不屬於自己.

所以,在設定鎖的時候,redis的value值不能像上面程式碼那樣,隨便給個1,而應該給一個隨機值,代表當前執行緒.

                    var id = Guid.NewGuid().ToString("N");
                    //獲取鎖
                    do
                    {
                        //set : key存在則失敗,不存在才會成功,並且過期時間5秒
                        var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);
                        if (success == true)
                        {
                            break;
                        }
                        Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖
                    } while (true);

                    Console.WriteLine($"執行緒:{Task.CurrentId} 拿到了鎖,開始消費");
            .........
//業務處理完後,釋放鎖. var value = redisClient.Get<string>(lockKey); if (value == id) { redisClient.Del(lockKey); }

 

完美了嗎?

不完美.還是老生常談的問題,取value和刪除key 分了兩步走,不是原子操作.

並且,這裡還不能用pipe,因為需要根據取到的value來決定下一個操作.上面設定過期時間倒是可以用pipe.

所以,這裡只能用lua.

完整的程式碼如下:

            CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0");
            var lockKey = "lockKey";
            var stock = 5;//商品庫存
            var taskCount = 10;//執行緒數量
            var script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis指令碼

            redisClient.Del(lockKey);//測試前,先把鎖刪了.

            for (int i = 0; i < taskCount; i++)
            {
                Task.Run(() =>
                {
                    var id = Guid.NewGuid().ToString("N");
                    //獲取鎖
                    do
                    {
                        //set : key存在則失敗,不存在才會成功,並且過期時間5秒
                        var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx);
                        if (success == true)
                        {
                            break;
                        }
                        Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖
                    } while (true);

                    Console.WriteLine($"執行緒:{Task.CurrentId} 拿到了鎖,開始消費");

                    if (stock <= 0)
                    {
                        Console.WriteLine($"庫存不足,執行緒:{Task.CurrentId} 搶購失敗!");
                        redisClient.Eval(script,lockKey,id);
                        return;
                    }

                    stock--;
                    //模擬處理業務,這裡不考慮失敗的情況
                    Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3)));

                    Console.WriteLine($"執行緒:{Task.CurrentId} 消費完畢!剩餘 {stock} 個");

                    //業務處理完後,釋放鎖.
                    redisClient.Eval(script, lockKey, id);
                });
            }

 

這篇文章只介紹了單節點Redis的分散式鎖,因為單節點,所以不是高可用.

多節點Redis則需要用官方介紹的RedLock,這玩意有點繞,我需要捋一