1. 程式人生 > >redisson實現分佈鎖的原始碼分析、邏輯圖

redisson實現分佈鎖的原始碼分析、邏輯圖

此處只以RLock.java的void lock(long leaseTime, TimeUnit unit)方法為例。
本文按程式碼邏輯順序進行整理分析。

@Override
 public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        //嘗試獲得鎖,此方法也是redisson加鎖的重點
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // 獲得鎖
        if (ttl == null) {
            return
; } //訂閱鎖釋放的訊息。此處redis的機制如何實現的?我還沒搞明白 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired
if (ttl == null) { break; } // 等到鎖釋放 if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { //取消訂閱鎖釋放資訊 unsubscribe(future, threadId); } }
 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
         /**
         *此處呼叫了非同步方法,然而用同步方式返回給上一段程式碼。我不明白這樣的意義何在?
         */
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        //有傳入leaseTime
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //由於未設定leaseTime,所以用redisson預設的30秒鎖超時
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

tryLockInnerAsync()方法是redisson核心邏輯

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        //使用 EVAL 命令執行 Lua 指令碼獲取鎖
        //KEYS[1] :需要加鎖的key,這裡需要是字串型別。
        //ARGV[1] :鎖的超時時間,防止死鎖
        //ARGV[2] :鎖的唯一標識 id(UUID.randomUUID()) + ":" + threadId
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +/**判斷鎖是否存在*/
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +/**設定鎖*/
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +/**設定超時時間*/
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +/**可重入鎖邏輯*/
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +/**value加1*/
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +/**重新設定超時時間*/
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

上面的程式碼就是redisson進行加鎖的主要程式碼。
下面是根據程式碼所畫出的流程圖。
這裡寫圖片描述

本人菜鳥,理解可能會有偏差,錯誤之處,望各位指教