redisson實現分佈鎖的原始碼分析、邏輯圖
阿新 • • 發佈:2019-01-07
此處只以RLock.java的void lock(long leaseTime, TimeUnit unit)方法為例。
本文按程式碼邏輯順序進行整理分析。
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//嘗試獲得鎖,此方法也是redisson加鎖的重點
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 獲得鎖
if (ttl == null) {
return ;
}
//訂閱鎖釋放的訊息。此處redis的機制如何實現的?我還沒搞明白
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// 等到鎖釋放
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//取消訂閱鎖釋放資訊
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
/**
*此處呼叫了非同步方法,然而用同步方式返回給上一段程式碼。我不明白這樣的意義何在?
*/
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//有傳入leaseTime
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//由於未設定leaseTime,所以用redisson預設的30秒鎖超時
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync()方法是redisson核心邏輯
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
//使用 EVAL 命令執行 Lua 指令碼獲取鎖
//KEYS[1] :需要加鎖的key,這裡需要是字串型別。
//ARGV[1] :鎖的超時時間,防止死鎖
//ARGV[2] :鎖的唯一標識 id(UUID.randomUUID()) + ":" + threadId
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +/**判斷鎖是否存在*/
"redis.call('hset', KEYS[1], ARGV[2], 1); " +/**設定鎖*/
"redis.call('pexpire', KEYS[1], ARGV[1]); " +/**設定超時時間*/
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +/**可重入鎖邏輯*/
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +/**value加1*/
"redis.call('pexpire', KEYS[1], ARGV[1]); " +/**重新設定超時時間*/
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上面的程式碼就是redisson進行加鎖的主要程式碼。
下面是根據程式碼所畫出的流程圖。
本人菜鳥,理解可能會有偏差,錯誤之處,望各位指教