
Distributed Locks (3): Lease Renewal for Distributed Locks

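When acquiring a Redis distributed lock, we usually give the lock an expiration time (TTL) so that the lock is still released if the client crashes after acquiring it. Every lock used this way faces the same problem, however: there is no way to guarantee that the client finishes its work before the lock's TTL runs out. Most programmers optimistically assume this cannot happen, but many abnormal conditions can cause it, such as network delays or a JVM full GC.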

Martin Kleppmann has raised the same objection. His diagram shows the following sequence:

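  1. Client1 acquires the lock.
  2. Client1 starts its task and then hits a stop-the-world (STW) GC pause that lasts longer than the lock's expiration time.
  3. Client2 acquires the lock and starts its task.
  4. Client1's GC ends and it continues its task. Both Client1 and Client2 now believe they hold the lock, and both process the task, which leads to errors.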
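
The sequence above can be reproduced with a small single-process Java simulation. This is a sketch under stated assumptions. `LockStore` and `GcPauseRace` are hypothetical names, and `LockStore` is an in-memory stand-in for Redis, not a real client. Time is a logical millisecond counter, which makes the "GC pause" deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a Redis lock key with a TTL.
// Time is a logical clock in milliseconds, advanced by hand.
class LockStore {
    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();
    long now = 0;

    // Like SET key owner NX PX ttlMs: succeeds only if the key is absent or expired.
    boolean tryLock(String key, String owner, long ttlMs) {
        Long exp = expiresAt.get(key);
        if (exp != null && exp > now) {
            return false; // someone still holds an unexpired lock
        }
        owners.put(key, owner);
        expiresAt.put(key, now + ttlMs);
        return true;
    }

    // The client that really holds the lock at the current time, or null.
    String holder(String key) {
        Long exp = expiresAt.get(key);
        return (exp != null && exp > now) ? owners.get(key) : null;
    }
}

class GcPauseRace {
    public static void main(String[] args) {
        LockStore store = new LockStore();
        long ttl = 10_000;

        // 1. Client1 acquires the lock.
        boolean client1Locked = store.tryLock("order:42", "client1", ttl);
        // 2. Client1 is frozen by a 15 s STW pause, longer than the TTL.
        store.now += 15_000;
        // 3. The key has expired, so Client2 acquires the lock.
        boolean client2Locked = store.tryLock("order:42", "client2", ttl);
        // 4. Client1 resumes, still trusting its earlier result.
        System.out.println("client1 thinks it holds the lock: " + client1Locked); // true
        System.out.println("client2 thinks it holds the lock: " + client2Locked); // true
        System.out.println("actual holder: " + store.holder("order:42"));        // client2
    }
}
```

Nothing in Client1's own state tells it that the lock is gone. Only the store knows, so both clients go on to do the work.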

How can this be solved?

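One approach is to give the lock a WatchDog that renews it automatically. Once the lock has been acquired, a scheduled thread (the WatchDog) starts and periodically extends the lock's expiration time.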
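
Here is a minimal sketch of this idea that uses only the JDK. Some assumptions apply. `TtlStore`, `WatchDog` and `WatchDogDemo` are hypothetical names. `TtlStore` is an in-memory stand-in for Redis, and its `renew` plays the role of an atomic check-and-extend (in Redis this would be a Lua script). The task renews the lock every TTL / 3, the same ratio Redisson uses. It shuts itself down as soon as a renewal fails.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Redis: key -> (owner, absolute expiry in ms).
class TtlStore {
    private record Entry(String owner, long expiresAtMs) {}

    private final ConcurrentHashMap<String, Entry> map = new ConcurrentHashMap<>();

    // Like SET key owner NX PX ttlMs; compute() runs atomically per key.
    boolean tryLock(String key, String owner, long ttlMs) {
        long now = System.currentTimeMillis();
        boolean[] acquired = {false};
        map.compute(key, (k, e) -> {
            if (e == null || e.expiresAtMs() <= now) {
                acquired[0] = true;
                return new Entry(owner, now + ttlMs);
            }
            return e;
        });
        return acquired[0];
    }

    // Extends the TTL only if `owner` still holds an unexpired lock.
    boolean renew(String key, String owner, long ttlMs) {
        long now = System.currentTimeMillis();
        boolean[] renewed = {false};
        map.computeIfPresent(key, (k, e) -> {
            if (e.owner().equals(owner) && e.expiresAtMs() > now) {
                renewed[0] = true;
                return new Entry(owner, now + ttlMs);
            }
            return e;
        });
        return renewed[0];
    }

    boolean isHeldBy(String key, String owner) {
        Entry e = map.get(key);
        return e != null && e.owner().equals(owner) && e.expiresAtMs() > System.currentTimeMillis();
    }

    // Deletes the key only if `owner` set it.
    void unlock(String key, String owner) {
        map.computeIfPresent(key, (k, e) -> e.owner().equals(owner) ? null : e);
    }
}

// Hypothetical WatchDog: renews one lock every ttlMs / 3 until closed or until a renewal fails.
class WatchDog implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    WatchDog(TtlStore store, String key, String owner, long ttlMs) {
        long period = ttlMs / 3;
        scheduler.scheduleAtFixedRate(() -> {
            if (!store.renew(key, owner, ttlMs)) {
                scheduler.shutdown(); // lock lost or released: stop renewing
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}

class WatchDogDemo {
    public static void main(String[] args) throws InterruptedException {
        TtlStore store = new TtlStore();
        long ttl = 900; // short TTL so the demo finishes quickly
        store.tryLock("job", "client1", ttl);
        try (WatchDog dog = new WatchDog(store, "job", "client1", ttl)) {
            Thread.sleep(2_000); // "work" for more than twice the TTL
            System.out.println("held after 2 s: " + store.isHeldBy("job", "client1"));
        }
        store.unlock("job", "client1");
    }
}
```

Without the WatchDog, the key would have expired after 900 ms. With it, the lock survives as long as the process is alive and renewals keep succeeding. Once `close()` is called, the lock expires on its own after at most one more TTL.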

Redisson's WatchDog mechanism

// org.redisson.RedissonLock#tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
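    // An explicit leaseTime disables the WatchDog: the lock simply expires after leaseTime.
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // Otherwise lock with lockWatchdogTimeout (30 s by default) as the initial TTL.
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // Handle the result asynchronously; if the lock was acquired, start the
    // scheduled task (the WatchDog) that renews it.
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired (the lock script returns null on success,
        // otherwise the remaining TTL of the lock held by someone else)
        if (ttlRemaining == null) {
            // Start the WatchDog
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;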
}
// org.redisson.RedissonLock#scheduleExpirationRenewal()
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP =
        new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
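    /*
     * First check whether EXPIRATION_RENEWAL_MAP already has an entry for this
     * lock's entryName. The map records which lock keys are currently held by
     * clients in this service instance. If an entry already exists (RedissonLock
     * is reentrant), only the thread id is added to it and no second WatchDog
     * is started.
     */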
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // First acquisition of this lock in this service instance: start the WatchDog
        renewExpiration();
    }
}
// org.redisson.RedissonLock#renewExpiration()
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // Extend the lease with renewExpirationAsync
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
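        // One-shot timer that fires internalLockLeaseTime / 3 ms from now (10 s with
        // the default 30 s lease); after a successful renewal it reschedules itself.
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);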
    
    ee.setTimeout(task);
}
// org.redisson.RedissonLock#renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
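    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // Only if this client/thread still owns the lock (its lock name, ARGV[2],
            // is a field of the lock hash), reset the TTL to internalLockLeaseTime
            // (ARGV[1], 30 s by default) and return 1; otherwise return 0.
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));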
}
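
To see what the renewal script protects, it helps to model the Redis-side data for such a lock: one key holding a hash {lockName -> hold count}, plus a TTL. The following is a hypothetical, single-threaded Java model on a logical clock, and each method stands in for one atomic Lua script. The `renew` logic mirrors the `renewExpirationAsync` script above. The `tryLock` logic (a reentrant increment that returns null on success and the remaining TTL otherwise) follows our understanding of Redisson's lock script, which this article does not show. Lock names such as `uuid-A:1` are made-up stand-ins for `getLockName(threadId)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a Redisson-style lock key: hash {lockName -> hold count} + TTL.
// Logical clock in ms; each method models one atomic Lua script.
class HashLockModel {
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();
    long now = 0;

    private void expireIfDue(String key) {
        Long exp = expiresAt.get(key);
        if (exp != null && exp <= now) {
            hashes.remove(key);
            expiresAt.remove(key);
        }
    }

    // null = acquired (first time or reentrant); otherwise the lock's remaining TTL.
    Long tryLock(String key, String lockName, long leaseMs) {
        expireIfDue(key);
        Map<String, Integer> h = hashes.get(key);
        if (h == null || h.containsKey(lockName)) {
            hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(lockName, 1, Integer::sum);
            expiresAt.put(key, now + leaseMs);
            return null;
        }
        return expiresAt.get(key) - now;
    }

    // Like the renewal script: hexists, then pexpire only if this lockName owns the key.
    boolean renew(String key, String lockName, long leaseMs) {
        expireIfDue(key);
        Map<String, Integer> h = hashes.get(key);
        if (h != null && h.containsKey(lockName)) {
            expiresAt.put(key, now + leaseMs);
            return true;
        }
        return false;
    }

    int holdCount(String key, String lockName) {
        expireIfDue(key);
        Map<String, Integer> h = hashes.get(key);
        return h == null ? 0 : h.getOrDefault(lockName, 0);
    }
}

class HashLockDemo {
    public static void main(String[] args) {
        HashLockModel redis = new HashLockModel();
        long lease = 30_000;                                   // default lockWatchdogTimeout
        redis.tryLock("myLock", "uuid-A:1", lease);           // acquired
        redis.tryLock("myLock", "uuid-A:1", lease);           // reentrant: hold count 2
        System.out.println(redis.tryLock("myLock", "uuid-B:7", lease)); // prints 30000
        redis.now += 10_000;                                  // WatchDog fires at lease / 3
        System.out.println(redis.renew("myLock", "uuid-A:1", lease));  // prints true
        System.out.println(redis.renew("myLock", "uuid-B:7", lease));  // prints false
    }
}
```

Because the renewal checks ownership and extends the TTL in one step, a WatchDog whose thread no longer owns the lock can never extend another client's lock. It gets 0 back and stops rescheduling.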

The WatchDog is really just a periodic task. Here is a simpler example:

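// ScheduleExecutor, ScheduleCronTask, ExpirationEntry, isStillLock and jedisCluster
// are the author's own helpers and fields (not shown here).
private void startWatchdog(String lockName, String lockValue, int expiredTime) {
    // Renew every expiredTime / 3 seconds, capped at 60 s and at least 1 s
    // (for expiredTime < 3 the step would otherwise be 0, an invalid cron "0/0").
    int periodSeconds = Math.max(1, Math.min(60, expiredTime / 3));
    // Six-field cron (second minute hour day month weekday): fire at seconds 0, N, 2N, ...
    String cronExpr = String.format("0/%d * * * * *", periodSeconds);
    ExpirationEntry expirationEntry = new ExpirationEntry(lockName, lockValue);
    ScheduleExecutor.getInstance().addTask(new ScheduleLockWatchdogTask(expirationEntry,
            new ScheduleCronTask(cronExpr, () -> {
                // The ownership check and EXPIRE are two separate Redis commands, so
                // unlike Redisson's Lua script this check-then-extend is not atomic.
                if (isStillLock(lockName, lockValue)) {
                    jedisCluster.expire(lockName, expiredTime); // still ours: reset the TTL
                } else {
                    expirationEntry.setStillLock(false);        // lock lost or released
                }
            }))).start();
}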

public class ScheduleLockWatchdogTask implements ScheduleTask {
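    // Kept alongside the cron task; presumably consulted elsewhere to drop the task
    // once the entry reports the lock is no longer held (not shown in this article).
    private final ExpirationEntry expirationEntry;
    private final ScheduleTask cronTask;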

    public ScheduleLockWatchdogTask(ExpirationEntry expirationEntry, ScheduleTask cronTask) {
        this.expirationEntry = expirationEntry;
        this.cronTask = cronTask;
    }

    @Override
    public void executeTask() {
        cronTask.executeTask();
    }
}