Redisson 分散式鎖實現分析(一)
設計分散式鎖要注意的問題
互斥
分散式系統中執行著多個節點,必須確保在同一時刻只能有一個節點的一個執行緒獲得鎖,這是最基本的一點。
死鎖
分散式系統中,可能產生死鎖的情況要相對複雜一些。分散式系統是處在複雜網路環境中的,當一個節點獲取到鎖,如果它在釋放鎖之前掛掉了,或者因網路故障無法執行釋放鎖的命令,都會導致其他節點無法申請到鎖。
因此分散式鎖有必要設定時效,確保在未來的一定時間內,無論獲得鎖的節點發生了什麼問題,最終鎖都能被釋放掉。
效能
對於訪問量大的共享資源,如果針對其獲取鎖時造成長時間的等待,導致大量節點阻塞,是絕對不能接受的。
所以設計分散式鎖時要能夠掌握鎖持有者的動態,若判斷鎖持有者處於不活動狀態,要能夠強制釋放其持有的鎖。
此外,排隊等待鎖的節點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖,這樣無法保證資源的高效利用,因此當鎖釋放時,要能夠通知等待佇列,使一個等待節點能夠立刻獲得鎖。
重入
考慮到一些應用場景和資源的高效利用,鎖要設計成可重入的,就像 JDK 中的 ReentrantLock 一樣,同一個執行緒可以重複拿到同一個資源的鎖。
RedissonLock 實現解讀
本文中 Redisson 的程式碼版本為 2.2.17-SNAPSHOT。
這裡以 lock()
方法為例,其他一系列方法與其核心實現基本一致。
先來看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.獲得鎖物件例項 lock.lock(); // 2.獲取分散式鎖 try { // do sth. } finally { lock.unlock(); // 3.釋放鎖 }
- 通過 RedissonClient 的
getLock()
方法取得一個 RLock 例項。 lock()
方法嘗試獲取鎖,如果成功獲得鎖,則繼續往下執行,否則等待鎖被釋放,然後再繼續嘗試獲取鎖,直到成功獲得鎖。unlock()
方法釋放獲得的鎖,並通知等待的節點鎖已釋放。
下面來看看 RedissonLock 的具體實現
org.redisson.Redisson#getLock()
@Override public RLock getLock(String name) { return new RedissonLock(commandExecutor, name, id); }
這裡的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個 interface,getLock
返回的實際上是其實現類 RedissonLock 的例項。
來看看構造 RedissonLock 的引數
- commandExecutor: 與 Redis 節點通訊併發送指令的真正實現。需要說明一下,Redisson 預設的 CommandExecutor 實現是通過
eval
命令來執行 Lua 指令碼,所以要求 Redis 的版本必須為 2.6 或以上,否則你可能要自己來實現 CommandExecutor。關於 Redisson 的 CommandExecutor 以後會專門解讀,所以本次就不多說了。 - name: 鎖的全域性名稱,例如上面程式碼中的
"foobar"
,具體業務中通常可能使用共享資源的唯一標識作為該名稱。 - id: Redisson 客戶端唯一標識,實際上就是一個 UUID.randomUUID()。
org.redisson.RedissonLock#lock()
此處略過前面幾個方法的層層呼叫,直接看最核心部分的方法 lockInterruptibly()
,該方法在 RLock 中宣告,支援對獲取鎖的執行緒進行中斷操作。在直接使用 lock()
方法獲取鎖時,最後實際執行的是 lockInterruptibly(-1, null)
。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1.嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit);
// 2.獲得鎖成功
if (ttl == null) {
return;
}
// 3.等待鎖釋放,並訂閱鎖
long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
get(future);
try {
while (true) {
// 4.重試獲取鎖
ttl = tryAcquire(leaseTime, unit);
// 5.成功獲得鎖
if (ttl == null) {
break;
}
// 6.等待鎖釋放
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 7.取消訂閱
unsubscribe(future, threadId);
}
}
- 首先嚐試獲取鎖,具體程式碼下面再看,返回結果是已存在的鎖的剩餘存活時間,為
null
則說明沒有已存在的鎖併成功獲得鎖。 - 如果獲得鎖則結束流程,回去執行業務邏輯。
- 如果沒有獲得鎖,則需等待鎖被釋放,並通過 Redis 的 channel 訂閱鎖釋放的訊息,這裡的具體實現本文也不深入,只是簡單提一下 Redisson 在執行 Redis 命令時提供了同步和非同步的兩種實現,但實際上同步的實現都是基於非同步的,具體做法是使用 Netty 中的非同步工具 Future 和 FutureListener 結合 JDK 中的 CountDownLatch 一起實現。
- 訂閱鎖的釋放訊息成功後,進入一個不斷重試獲取鎖的迴圈,迴圈中每次都先試著獲取鎖,並得到已存在的鎖的剩餘存活時間。
- 如果在重試中拿到了鎖,則結束迴圈,跳過第 6 步。
- 如果鎖當前是被佔用的,那麼等待釋放鎖的訊息,具體實現使用了 JDK 併發的訊號量工具 Semaphore 來阻塞執行緒,當鎖釋放併發布釋放鎖的訊息後,訊號量的
release()
方法會被呼叫,此時被訊號量阻塞的等待佇列中的一個執行緒就可以繼續嘗試獲取鎖了。 - 在成功獲得鎖後,就沒必要繼續訂閱鎖的釋放訊息了,因此要取消對 Redis 上相應 channel 的訂閱。
下面著重看看 tryAcquire()
方法的實現,
private Long tryAcquire(long leaseTime, TimeUnit unit) {
// 1.將非同步執行的結果以同步的形式返回
return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}
private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2.用預設的鎖超時時間去獲取鎖
Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 成功獲得鎖
if (ttlRemaining == null) {
// 3.鎖過期時間重新整理任務排程
scheduleExpirationRenewal();
}
}
});
return ttlRemainingFuture;
}
<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 3.使用 EVAL 命令執行 Lua 指令碼獲取鎖
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime,
getLockName(threadId));
}
- 上面說過 Redisson 實現的執行 Redis 命令都是非同步的,但是它在非同步的基礎上提供了以同步的方式獲得執行結果的封裝。
- 前面提到分散式鎖要確保未來的一段時間內鎖一定能夠被釋放,因此要對鎖設定超時釋放的時間,在我們沒有指定該時間的情況下,Redisson 預設指定為30秒。
- 在成功獲取到鎖的情況下,為了避免業務中對共享資源的操作還未完成,鎖就被釋放掉了,需要定期(鎖失效時間的三分之一)重新整理鎖失效的時間,這裡 Redisson 使用了 Netty 的 TimerTask、Timeout 工具來實現該任務排程。
- 獲取鎖真正執行的命令,Redisson 使用
EVAL
命令執行上面的 Lua 指令碼來完成獲取鎖的操作: - 如果通過
exists
命令發現當前 key 不存在,即鎖沒被佔用,則執行hset
寫入 Hash 型別資料 key:全域性鎖名稱(例如共享資源ID), field:鎖例項名稱(Redisson客戶端ID:執行緒ID), value:1,並執行pexpire
對該 key 設定失效時間,返回空值nil
,至此獲取鎖成功。 - 如果通過
hexists
命令發現 Redis 中已經存在當前 key 和 field 的 Hash 資料,說明當前執行緒之前已經獲取到鎖,因為這裡的鎖是可重入的,則執行hincrby
對當前 key field 的值加一,並重新設定失效時間,返回空值,至此重入獲取鎖成功。 - 最後是鎖已被佔用的情況,即當前 key 已經存在,但是 Hash 中的 Field 與當前值不同,則執行
pttl
獲取鎖的剩餘存活時間並返回,至此獲取鎖失敗。
以上就是對 lock()
的解讀,不過在實際業務中我們可能還會經常使用 tryLock()
,雖然兩者有一定差別,但核心部分的實現都是相同的,另外還有其他一些方法可以支援更多自定義引數,本文中就不一一詳述了。
org.redisson.RedissonLock#unlock()
最後來看鎖的釋放,
@Override
public void unlock() {
// 1.通過 EVAL 和 Lua 指令碼執行 Redis 命令釋放鎖
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime,
getLockName(Thread.currentThread().getId()));
// 2.非鎖的持有者釋放鎖時丟擲異常
if (opStatus == null) {
throw new IllegalMonitorStateException(
"attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 3.釋放鎖後取消重新整理鎖失效時間的排程任務
if (opStatus) {
cancelExpirationRenewal();
}
}
- 使用
EVAL
命令執行 Lua 指令碼來釋放鎖: - key 不存在,說明鎖已釋放,直接執行
publish
命令釋出釋放鎖訊息並返回1
。 - key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,返回
nil
。 - 因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行
hincrby
對鎖的值減一。 - 釋放一把鎖後,如果還有剩餘的鎖,則重新整理鎖的失效時間並返回
0
;如果剛才釋放的已經是最後一把鎖,則執行del
命令刪除鎖的 key,併發布鎖釋放訊息,返回1
。 - 上面執行結果返回
nil
的情況(即第2中情況),因為自己不是鎖的持有者,不允許釋放別人的鎖,故丟擲異常。 - 執行結果返回
1
的情況,該鎖的所有例項都已全部釋放,所以不需要再重新整理鎖的失效時間。
總結
寫了這麼多,其實最主要的就是上面的兩段 Lua 指令碼,基於 Redis 的分散式鎖的設計完全體現在其中,看完這兩段指令碼,再回顧一下前面的 設計分散式鎖要注意的問題 就豁然開朗了。
作者:Raymond_Z
連結:https://www.jianshu.com/p/de5a69622e49
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。
Why 分散式鎖
java.util.concurrent.locks
中包含了 JDK 提供的在多執行緒情況下對共享資源的訪問控制的一系列工具,它們可以幫助我們解決程序內多執行緒併發時的資料一致性問題。
但是在分散式系統中,JDK 原生的併發鎖工具在一些場景就無法滿足我們的要求了,這就是為什麼要使用分散式鎖。我總結了一句話,分散式鎖是用於解決分散式系統中操作共享資源時的資料一致性問題。
設計分散式鎖要注意的問題
互斥
分散式系統中執行著多個節點,必須確保在同一時刻只能有一個節點的一個執行緒獲得鎖,這是最基本的一點。
死鎖
分散式系統中,可能產生死鎖的情況要相對複雜一些。分散式系統是處在複雜網路環境中的,當一個節點獲取到鎖,如果它在釋放鎖之前掛掉了,或者因網路故障無法執行釋放鎖的命令,都會導致其他節點無法申請到鎖。
因此分散式鎖有必要設定時效,確保在未來的一定時間內,無論獲得鎖的節點發生了什麼問題,最終鎖都能被釋放掉。
效能
對於訪問量大的共享資源,如果針對其獲取鎖時造成長時間的等待,導致大量節點阻塞,是絕對不能接受的。
所以設計分散式鎖時要能夠掌握鎖持有者的動態,若判斷鎖持有者處於不活動狀態,要能夠強制釋放其持有的鎖。
此外,排隊等待鎖的節點如果不知道鎖何時會被釋放,則只能隔一段時間嘗試獲取一次鎖,這樣無法保證資源的高效利用,因此當鎖釋放時,要能夠通知等待佇列,使一個等待節點能夠立刻獲得鎖。
重入
考慮到一些應用場景和資源的高效利用,鎖要設計成可重入的,就像 JDK 中的 ReentrantLock 一樣,同一個執行緒可以重複拿到同一個資源的鎖。
RedissonLock 實現解讀
本文中 Redisson 的程式碼版本為 2.2.17-SNAPSHOT。
這裡以 lock()
方法為例,其他一系列方法與其核心實現基本一致。
先來看 lock() 的基本用法
RLock lock = redisson.getLock("foobar"); // 1.獲得鎖物件例項
lock.lock(); // 2.獲取分散式鎖
try {
// do sth.
} finally {
lock.unlock(); // 3.釋放鎖
}
- 通過 RedissonClient 的
getLock()
方法取得一個 RLock 例項。 lock()
方法嘗試獲取鎖,如果成功獲得鎖,則繼續往下執行,否則等待鎖被釋放,然後再繼續嘗試獲取鎖,直到成功獲得鎖。unlock()
方法釋放獲得的鎖,並通知等待的節點鎖已釋放。
下面來看看 RedissonLock 的具體實現
org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
return new RedissonLock(commandExecutor, name, id);
}
這裡的 RLock 是繼承自 java.util.concurrent.locks.Lock 的一個 interface,getLock
返回的實際上是其實現類 RedissonLock 的例項。
來看看構造 RedissonLock 的引數
- commandExecutor: 與 Redis 節點通訊併發送指令的真正實現。需要說明一下,Redisson 預設的 CommandExecutor 實現是通過
eval
命令來執行 Lua 指令碼,所以要求 Redis 的版本必須為 2.6 或以上,否則你可能要自己來實現 CommandExecutor。關於 Redisson 的 CommandExecutor 以後會專門解讀,所以本次就不多說了。 - name: 鎖的全域性名稱,例如上面程式碼中的
"foobar"
,具體業務中通常可能使用共享資源的唯一標識作為該名稱。 - id: Redisson 客戶端唯一標識,實際上就是一個 UUID.randomUUID()。
org.redisson.RedissonLock#lock()
此處略過前面幾個方法的層層呼叫,直接看最核心部分的方法 lockInterruptibly()
,該方法在 RLock 中宣告,支援對獲取鎖的執行緒進行中斷操作。在直接使用 lock()
方法獲取鎖時,最後實際執行的是 lockInterruptibly(-1, null)
。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 1.嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit);
// 2.獲得鎖成功
if (ttl == null) {
return;
}
// 3.等待鎖釋放,並訂閱鎖
long threadId = Thread.currentThread().getId();
Future<RedissonLockEntry> future = subscribe(threadId);
get(future);
try {
while (true) {
// 4.重試獲取鎖
ttl = tryAcquire(leaseTime, unit);
// 5.成功獲得鎖
if (ttl == null) {
break;
}
// 6.等待鎖釋放
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 7.取消訂閱
unsubscribe(future, threadId);
}
}
- 首先嚐試獲取鎖,具體程式碼下面再看,返回結果是已存在的鎖的剩餘存活時間,為
null
則說明沒有已存在的鎖併成功獲得鎖。 - 如果獲得鎖則結束流程,回去執行業務邏輯。
- 如果沒有獲得鎖,則需等待鎖被釋放,並通過 Redis 的 channel 訂閱鎖釋放的訊息,這裡的具體實現本文也不深入,只是簡單提一下 Redisson 在執行 Redis 命令時提供了同步和非同步的兩種實現,但實際上同步的實現都是基於非同步的,具體做法是使用 Netty 中的非同步工具 Future 和 FutureListener 結合 JDK 中的 CountDownLatch 一起實現。
- 訂閱鎖的釋放訊息成功後,進入一個不斷重試獲取鎖的迴圈,迴圈中每次都先試著獲取鎖,並得到已存在的鎖的剩餘存活時間。
- 如果在重試中拿到了鎖,則結束迴圈,跳過第 6 步。
- 如果鎖當前是被佔用的,那麼等待釋放鎖的訊息,具體實現使用了 JDK 併發的訊號量工具 Semaphore 來阻塞執行緒,當鎖釋放併發布釋放鎖的訊息後,訊號量的
release()
方法會被呼叫,此時被訊號量阻塞的等待佇列中的一個執行緒就可以繼續嘗試獲取鎖了。 - 在成功獲得鎖後,就沒必要繼續訂閱鎖的釋放訊息了,因此要取消對 Redis 上相應 channel 的訂閱。
下面著重看看 tryAcquire()
方法的實現,
private Long tryAcquire(long leaseTime, TimeUnit unit) {
// 1.將非同步執行的結果以同步的形式返回
return get(tryAcquireAsync(leaseTime, unit, Thread.currentThread().getId()));
}
private <T> Future<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 2.用預設的鎖超時時間去獲取鎖
Future<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS,
TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 成功獲得鎖
if (ttlRemaining == null) {
// 3.鎖過期時間重新整理任務排程
scheduleExpirationRenewal();
}
}
});
return ttlRemainingFuture;
}
<T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId,
RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 3.使用 EVAL 命令執行 Lua 指令碼獲取鎖
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime,
getLockName(threadId));
}
- 上面說過 Redisson 實現的執行 Redis 命令都是非同步的,但是它在非同步的基礎上提供了以同步的方式獲得執行結果的封裝。
- 前面提到分散式鎖要確保未來的一段時間內鎖一定能夠被釋放,因此要對鎖設定超時釋放的時間,在我們沒有指定該時間的情況下,Redisson 預設指定為30秒。
- 在成功獲取到鎖的情況下,為了避免業務中對共享資源的操作還未完成,鎖就被釋放掉了,需要定期(鎖失效時間的三分之一)重新整理鎖失效的時間,這裡 Redisson 使用了 Netty 的 TimerTask、Timeout 工具來實現該任務排程。
- 獲取鎖真正執行的命令,Redisson 使用
EVAL
命令執行上面的 Lua 指令碼來完成獲取鎖的操作: - 如果通過
exists
命令發現當前 key 不存在,即鎖沒被佔用,則執行hset
寫入 Hash 型別資料 key:全域性鎖名稱(例如共享資源ID), field:鎖例項名稱(Redisson客戶端ID:執行緒ID), value:1,並執行pexpire
對該 key 設定失效時間,返回空值nil
,至此獲取鎖成功。 - 如果通過
hexists
命令發現 Redis 中已經存在當前 key 和 field 的 Hash 資料,說明當前執行緒之前已經獲取到鎖,因為這裡的鎖是可重入的,則執行hincrby
對當前 key field 的值加一,並重新設定失效時間,返回空值,至此重入獲取鎖成功。 - 最後是鎖已被佔用的情況,即當前 key 已經存在,但是 Hash 中的 Field 與當前值不同,則執行
pttl
獲取鎖的剩餘存活時間並返回,至此獲取鎖失敗。
以上就是對 lock()
的解讀,不過在實際業務中我們可能還會經常使用 tryLock()
,雖然兩者有一定差別,但核心部分的實現都是相同的,另外還有其他一些方法可以支援更多自定義引數,本文中就不一一詳述了。
org.redisson.RedissonLock#unlock()
最後來看鎖的釋放,
@Override
public void unlock() {
// 1.通過 EVAL 和 Lua 指令碼執行 Redis 命令釋放鎖
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime,
getLockName(Thread.currentThread().getId()));
// 2.非鎖的持有者釋放鎖時丟擲異常
if (opStatus == null) {
throw new IllegalMonitorStateException(
"attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
// 3.釋放鎖後取消重新整理鎖失效時間的排程任務
if (opStatus) {
cancelExpirationRenewal();
}
}
- 使用
EVAL
命令執行 Lua 指令碼來釋放鎖: - key 不存在,說明鎖已釋放,直接執行
publish
命令釋出釋放鎖訊息並返回1
。 - key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,返回
nil
。 - 因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行
hincrby
對鎖的值減一。 - 釋放一把鎖後,如果還有剩餘的鎖,則重新整理鎖的失效時間並返回
0
;如果剛才釋放的已經是最後一把鎖,則執行del
命令刪除鎖的 key,併發布鎖釋放訊息,返回1
。 - 上面執行結果返回
nil
的情況(即第2中情況),因為自己不是鎖的持有者,不允許釋放別人的鎖,故丟擲異常。 - 執行結果返回
1
的情況,該鎖的所有例項都已全部釋放,所以不需要再重新整理鎖的失效時間。
總結
寫了這麼多,其實最主要的就是上面的兩段 Lua 指令碼,基於 Redis 的分散式鎖的設計完全體現在其中,看完這兩段指令碼,再回顧一下前面的 設計分散式鎖要注意的問題 就豁然開朗了。
redis中lua指令碼的操作是原子性的,所以在此使用兩段lua指令碼進行操作