基於redis的分散式鎖 | 併發程式設計網
1 介紹
這篇博文講介紹如何一步步構建一個基於Redis的分散式鎖。會從最原始的版本開始,然後根據問題進行調整,最後完成一個較為合理的分散式鎖。
本篇文章會將分散式鎖的實現分為兩部分,一個是單機環境,另一個是叢集環境下的Redis鎖實現。在介紹分散式鎖的實現之前,先來了解下分散式鎖的一些資訊。
2 分散式鎖
2.1 什麼是分散式鎖?
分散式鎖是控制分散式系統或不同系統之間共同訪問共享資源的一種鎖實現,如果不同的系統或同一個系統的不同主機之間共享了某個資源時,往往需要互斥來防止彼此干擾來保證一致性。
2.2 分散式鎖需要具備哪些條件
- 互斥性:在任意一個時刻,只有一個客戶端持有鎖。
- 無死鎖:即便持有鎖的客戶端崩潰或者其他意外事件,鎖仍然可以被獲取。
- 容錯:只要大部分Redis節點都活著,客戶端就可以獲取和釋放鎖
2.4 分散式鎖的實現有哪些?
- 資料庫
- Memcached(add命令)
- Redis(setnx命令)
- Zookeeper(臨時節點)
- 等等
3 單機Redis的分散式鎖
3.1 準備工作
3.1.1 定義常量類
public class LockConstants {
public static final String OK = "OK";
/** NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key if it already exist. **/
public static final String NOT_EXIST = "NX";
public static final String EXIST = "XX";
/** expx EX|PX, expire time units: EX = seconds; PX = milliseconds **/
public static final String SECONDS = "EX";
public static final String MILLISECONDS = "PX";
private LockConstants () {}
}
複製程式碼
3.1.2 定義鎖的抽象類
抽象類RedisLock實現java.util.concurrent包下的Lock介面,然後對一些方法提供預設實現,子類只需實現lock方法和unlock方法即可。程式碼如下
public abstract class RedisLock implements Lock {
protected Jedis jedis;
protected String lockKey;
public RedisLock(Jedis jedis,String lockKey) {
this(jedis, lockKey);
}
public void sleepBySencond(int sencond){
try {
Thread.sleep(sencond*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly(){}
@Override
public Condition newCondition() {
return null;
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit){
return false;
}
}
複製程式碼
3.2 最基礎的版本1
先來一個最基礎的版本,程式碼如下
public class LockCase1 extends RedisLock {
public LockCase1(Jedis jedis, String name) {
super(jedis, name);
}
@Override
public void lock() {
while(true){
String result = jedis.set(lockKey, "value", NOT_EXIST);
if(OK.equals(result)){
System.out.println(Thread.currentThread().getId()+"加鎖成功!");
break;
}
}
}
@Override
public void unlock() {
jedis.del(lockKey);
}
}
複製程式碼
LockCase1類提供了lock和unlock方法。 其中lock方法也就是在reids客戶端執行如下命令
SET lockKey value NX
複製程式碼
而unlock方法就是呼叫DEL命令將鍵刪除。 好了,方法介紹完了。現在來想想這其中會有什麼問題? 假設有兩個客戶端A和B,A獲取到分散式的鎖。A執行了一會,突然A所在的伺服器斷電了(或者其他什麼的),也就是客戶端A掛了。這時出現一個問題,這個鎖一直存在,且不會被釋放,其他客戶端永遠獲取不到鎖。如下示意圖
可以通過設定過期時間來解決這個問題
3.3 版本2-設定鎖的過期時間
public void lock() {
while(true){
String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30);
if(OK.equals(result)){
System.out.println(Thread.currentThread().getId()+"加鎖成功!");
break;
}
}
}
複製程式碼
類似的Redis命令如下
SET lockKey value NX EX 30
複製程式碼
注:要保證設定過期時間和設定鎖具有原子性
這時又出現一個問題,問題出現的步驟如下
- 客戶端A獲取鎖成功,過期時間30秒。
- 客戶端A在某個操作上阻塞了50秒。
- 30秒時間到了,鎖自動釋放了。
- 客戶端B獲取到了對應同一個資源的鎖。
- 客戶端A從阻塞中恢復過來,釋放掉了客戶端B持有的鎖。
示意圖如下
這時會有兩個問題
- 過期時間如何保證大於業務執行時間?
- 如何保證鎖不會被誤刪除?
先來解決如何保證鎖不會被誤刪除這個問題。 這個問題可以通過設定value為當前客戶端生成的一個隨機字串,且保證在足夠長的一段時間內在所有客戶端的所有獲取鎖的請求中都是唯一的。
3.4 版本3-設定鎖的value
抽象類RedisLock增加lockValue欄位,lockValue欄位的預設值為UUID隨機值假設當前執行緒ID。
public abstract class RedisLock implements Lock {
//...
protected String lockValue;
public RedisLock(Jedis jedis,String lockKey) {
this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId());
}
public RedisLock(Jedis jedis, String lockKey, String lockValue) {
this.jedis = jedis;
this.lockKey = lockKey;
this.lockValue = lockValue;
}
//...
}
複製程式碼
加鎖程式碼
public void lock() {
while(true){
String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30);
if(OK.equals(result)){
System.out.println(Thread.currentThread().getId()+"加鎖成功!");
break;
}
}
}
複製程式碼
解鎖程式碼
public void unlock() {
String lockValue = jedis.get(lockKey);
if (lockValue.equals(lockValue)){
jedis.del(lockKey);
}
}
複製程式碼
這時看看加鎖程式碼,好像沒有什麼問題啊。
再來看看解鎖的程式碼,這裡的解鎖操作包含三步操作:獲取值、判斷和刪除鎖。這時你有沒有想到在多執行緒環境下的i++
操作?
3.4.1 i++問題
i++
操作也可分為三個步驟:讀i的值,進行i+1,設定i的值。
如果兩個執行緒同時對i進行i++操作,會出現如下情況
- i設定值為0
- 執行緒A讀到i的值為0
- 執行緒B也讀到i的值為0
- 執行緒A執行了+1操作,將結果值1寫入到記憶體
- 執行緒B執行了+1操作,將結果值1寫入到記憶體
- 此時i進行了兩次i++操作,但是結果卻為1
在多執行緒環境下有什麼方式可以避免這類情況發生?
解決方式有很多種,例如用AtomicInteger、CAS、synchronized等等。
這些解決方式的目的都是要確保i++
操作的原子性。那麼回過頭來看看解鎖,同理我們也是要確保解鎖的原子性。我們可以利用Redis的lua指令碼來實現解鎖操作的原子性。
3.5 版本4-具有原子性的釋放鎖
lua指令碼內容如下
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
複製程式碼
這段Lua指令碼在執行的時候要把的lockValue作為ARGV[1]的值傳進去,把lockKey作為KEYS[1]的值傳進去。現在來看看解鎖的java程式碼
public void unlock() {
// 使用lua指令碼進行原子刪除操作
String checkAndDelScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
jedis.eval(checkAndDelScript, 1, lockKey, lockValue);
}
複製程式碼
好了,解鎖操作也確保了原子性了,那麼是不是單機Redis環境的分散式鎖到此就完成了? 別忘了版本2-設定鎖的過期時間還有一個,過期時間如何保證大於業務執行時間問題沒有解決。
3.6 版本5-確保過期時間大於業務執行時間
抽象類RedisLock增加一個boolean型別的屬性isOpenExpirationRenewal,用來標識是否開啟定時重新整理過期時間。 在增加一個scheduleExpirationRenewal方法用於開啟重新整理過期時間的執行緒。
public abstract class RedisLock implements Lock {
//...
protected volatile boolean isOpenExpirationRenewal = true;
/**
* 開啟定時重新整理
*/
protected void scheduleExpirationRenewal(){
Thread renewalThread = new Thread(new ExpirationRenewal());
renewalThread.start();
}
/**
* 重新整理key的過期時間
*/
private class ExpirationRenewal implements Runnable{
@Override
public void run() {
while (isOpenExpirationRenewal){
System.out.println("執行延遲失效時間中...");
String checkAndExpireScript = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('expire',KEYS[1],ARGV[2]) " +
"else " +
"return 0 end";
jedis.eval(checkAndExpireScript, 1, lockKey, lockValue, "30");
//休眠10秒
sleepBySencond(10);
}
}
}
}
複製程式碼
加鎖程式碼在獲取鎖成功後將isOpenExpirationRenewal置為true,並且呼叫scheduleExpirationRenewal方法,開啟重新整理過期時間的執行緒。
public void lock() {
while (true) {
String result = jedis.set(lockKey, lockValue, NOT_EXIST, SECONDS, 30);
if (OK.equals(result)) {
System.out.println("執行緒id:"+Thread.currentThread().getId() + "加鎖成功!時間:"+LocalTime.now());
//開啟定時重新整理過期時間
isOpenExpirationRenewal = true;
scheduleExpirationRenewal();
break;
}
System.out.println("執行緒id:"+Thread.currentThread().getId() + "獲取鎖失敗,休眠10秒!時間:"+LocalTime.now());
//休眠10秒
sleepBySencond(10);
}
}
複製程式碼
解鎖程式碼增加一行程式碼,將isOpenExpirationRenewal屬性置為false,停止重新整理過期時間的執行緒輪詢。
public void unlock() {
//...
isOpenExpirationRenewal = false;
}
複製程式碼
3.7 測試
測試程式碼如下
public void testLockCase5() {
//定義執行緒池
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10,
1, TimeUnit.SECONDS,
new SynchronousQueue<>());
//新增10個執行緒獲取鎖
for (int i = 0; i < 10; i++) {
pool.submit(() -> {
try {
Jedis jedis = new Jedis("localhost");
LockCase5 lock = new LockCase5(jedis, lockName);
lock.lock();
//模擬業務執行15秒
lock.sleepBySencond(15);
lock.unlock();
} catch (Exception e){
e.printStackTrace();
}
});
}
//當執行緒池中的執行緒數為0時,退出
while (pool.getPoolSize() != 0) {}
}
複製程式碼
測試結果
或許到這裡基於單機Redis環境的分散式就介紹完了。但是使用java的同學有沒有發現一個鎖的重要特性
那就是鎖的重入,那麼分散式鎖的重入該如何實現呢?這裡就留一個坑了
4 叢集Redis的分散式鎖
在Redis的分散式環境中,Redis 的作者提供了RedLock 的演算法來實現一個分散式鎖。
4.1 加鎖
RedLock演算法加鎖步驟如下
- 獲取當前Unix時間,以毫秒為單位。
- 依次嘗試從N個例項,使用相同的key和隨機值獲取鎖。在步驟2,當向Redis設定鎖時,客戶端應該設定一個網路連線和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免伺服器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果伺服器端沒有在規定時間內響應,客戶端應該儘快嘗試另外一個Redis例項。
- 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(這裡是3個節點)的Redis節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功。
- 如果取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
- 如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis例項取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis例項上進行解鎖(即便某些Redis例項根本就沒有加鎖成功)。
4.2 解鎖
向所有的Redis例項傳送釋放鎖命令即可,不用關心之前有沒有從Redis例項成功獲取到鎖.
關於RedLock演算法,還有一個小插曲,就是Martin Kleppmann 和 RedLock 作者 antirez的對RedLock演算法的互懟。 官網原話如下
更多關於RedLock演算法這裡就不在說明,有興趣的可以到官網閱讀相關文章。
5 總結
這篇文章講述了一個基於Redis的分散式鎖的編寫過程及解決問題的思路,但是本篇文章實現的分散式鎖並不適合用於生產環境。java環境有 Redisson 可用於生產環境,但是分散式鎖還是Zookeeper會比較好一些(可以看Martin Kleppmann 和 RedLock的分析)。
整個專案的地址存放在Github上,有需要的可以看看:Github地址