1. 程式人生 > 實用技巧 >分散式鎖(2) —— Redisson實現分散式鎖

分散式鎖(2) —— Redisson實現分散式鎖

什麼是 Redisson

Redisson是架設在Redis基礎上的一個Java駐記憶體資料網格(In-Memory Data Grid)。充分的利用了Redis鍵值資料庫提供的一系列優勢,基於Java實用工具包中常用介面,為使用者提供了一系列具有分散式特性的常用工具類。使得原本作為協調單機多執行緒併發程式的工具包獲得了協調分散式多機多執行緒併發系統的能力,大大降低了設計和研發大規模分散式系統的難度。同時結合各富特色的分散式服務,更進一步簡化了分散式環境中程式相互之間的協作。

Redisson 分散式重入鎖用法

Redisson 支援單點模式、主從模式、哨兵模式、叢集模式,這裡以單點模式為例:

public class RedissonDemo {
    
    public static RedissonClient getRedissonClient() {
        // 1. Create config object
        Config config = new Config();
        // use "rediss://" for SSL connection
        config.useClusterServers().addNodeAddress("redis://127.0.0.1:5379");
        // 2. Create RedissonClient
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

    public static void main(String[] args) {
        RedissonClient redisson = getRedissonClient();
        // 3. 獲取鎖物件例項(無法保證按照執行緒的順序獲取到)
        RLock rLock = redisson.getLock("anyLock");
        long waitTimeout = 60;
        long leaseTime = 30;
        try {
            /**
             * 4. 嘗試獲取鎖
             * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
             * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效
             						 (值應設定為大於業務處理的時間,確保在鎖有效期內業務能處理完)
             */
            boolean res = rLock.tryLock(waitTimeout, leaseTime, TimeUnit.SECONDS);
            if (res) {
                //成功獲得鎖,在這裡處理業務
            }
        } catch (Exception e) {
            throw new RuntimeException("aquire lock fail");
        }finally{
            // 無論如何,最後都要解鎖
            rLock.unlock();
        }
    }
}

加鎖原始碼分析

1. getLock方法獲取鎖物件

// org.redisson.Redisson#getLock()
@Override
public RLock getLock(String name) {
    /**
    * 構造並返回一個 RedissonLock 物件
    * commandExecutor: 與 Redis 節點通訊併發送指令的真正實現。
    * 需要說明一下,CommandExecutor 實現是通過 eval 命令來執行 Lua 指令碼
    */
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

2. tryLock方法嘗試獲取鎖

// org.redisson.RedissonLock#tryLock()
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 最大等待時間
    long time = unit.toMillis(waitTime);
    // 當前時間
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1.嘗試申請鎖,返回還剩餘的鎖過期時間
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // 2.如果為空,表示申請鎖成功
    if (ttl == null) {
        return true;
    }
    // 3.申請鎖的耗時如果大於等於最大等待時間,則申請鎖失敗
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    /**
     * 4.訂閱鎖釋放事件,並通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
     * 基於資訊量,當鎖被其它資源佔用時,
     * 當前執行緒通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發訊息通知待等待的執行緒進行競爭
     * 當 this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗
     * 當 this.await返回true,進入迴圈嘗試獲取鎖
     */
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe非同步執行的結果(應用了Netty 的 Future)
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

    try {
        //計算獲取鎖的總耗時,如果大於等於最大等待時間,則獲取鎖失敗
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        /**
         * 5.收到鎖釋放的訊號後,在最大等待時間之內,迴圈一次接著一次的嘗試獲取鎖
         * 獲取鎖成功,則立馬返回true,
         * 若在最大等待時間之內還沒獲取到鎖,則認為獲取鎖失敗,返回false結束迴圈
         */
        while (true) {
            long currentTime = System.currentTimeMillis();
            // 再次嘗試申請鎖
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 成功獲取鎖則直接返回true結束迴圈
            if (ttl == null) {
                return true;
            }
            // 超過最大等待時間則返回false結束迴圈,獲取鎖失敗
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            /**
             * 6.阻塞等待鎖(通過訊號量(共享鎖)阻塞,等待解鎖訊息):
             */
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 7.更新剩餘的等待時間(最大等待時間-已經消耗的阻塞時間)
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
        // 8.無論是否獲得鎖,都要取消訂閱解鎖訊息
        unsubscribe(subscribeFuture, threadId);
    }
}

主體過程就是申請鎖 --> 如果獲取失敗則訂閱鎖釋放事件(通過redis釋出訂閱功能)

3.加鎖流程

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
      return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 加鎖核心為tryLockInnerAsync
  	RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().
                                   getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, 
                                                         threadId, RedisCommands.EVAL_LONG);
  	ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    		if (e != null) {
      			return;
    		}

        // lock acquired
        if (ttlRemaining == null) {
          scheduleExpirationRenewal(threadId);
        }
  	});
  	return ttlRemainingFuture;
}
// org.redisson.RedissonLock#tryLockInnerAsync()
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, 
												long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    /**
    * KEYS[1]就是Collections.singletonList(getName()),表示分散式鎖的key;
    * ARGV[1]就是internalLockLeaseTime,即鎖的租約時間(持有鎖的有效時間),預設30s;
    * ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值 value,即UUID+threadId。
    */
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command, 
            // 如果不存在鍵則加鎖              
            "if (redis.call('exists', KEYS[1]) == 0) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            // 如果存在鍵且是當前執行緒加鎖,則重入該鎖,並更新過期時間              
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return nil; " +
            "end; " +
            // 返回鎖剩餘時間              
            "return redis.call('pttl', KEYS[1]);",
    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

解鎖原始碼分析

// org.redisson.RedissonLock#unlock()
@Override
public void unlock() {
  	try {
    		get(unlockAsync(Thread.currentThread().getId()));
  	} catch (RedisException e) {
   			if (e.getCause() instanceof IllegalMonitorStateException) {
      	throw (IllegalMonitorStateException) e.getCause();
    } else {
      	throw e;
    }
}
// org.redisson.RedissonLock#unlockAsync()
@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // 核心方法 unlockInnerAsync
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}
// org.redisson.RedissonLock#unlockInnerAsync()
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 鎖不存在直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
            "return nil;" +
            "end; " +
            // 鎖存在則可重入鎖計數減1              
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 計數>0,更新過期時間               
            "if (counter > 0) then " +
            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
            "return 0; " +
            // 計數==0,刪除key,併發送解鎖事件              
            "else " +
            "redis.call('del', KEYS[1]); " +
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

解鎖訊息處理

// org.redisson.pubsub#onMessage()
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        // 判斷是否解鎖訊息
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            // 釋放一個訊號量,喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
}

總結

通過 Redisson 實現分散式鎖,基本原理和自己通過set key value px milliseconds nx + lua 實現一樣,但是效果更好些,主要因為:

  • RedissonLock是可重入的;

  • 考慮了失敗重試;

  • 阻塞的同時可以設定鎖的最大等待時間 ;

  • 在實現上也做了一些優化,減少了無效的鎖申請,提升了資源的利用率。

RedissonLock 同樣沒有解決 節點掛掉的時候,存在丟失鎖的風險的問題。而現實情況是有一些場景無法容忍的,所以 Redisson 提供了實現了redlock演算法的 RedissonRedLock,RedissonRedLock 真正解決了單點失敗的問題,代價是需要額外的為 RedissonRedLock 搭建Redis環境。

所以,如果業務場景可以容忍這種小概率的錯誤,則推薦使用 RedissonLock, 如果無法容忍,則推薦使用 RedissonRedLock。

幾個問答

1.為什麼RedissonLock是可重入的鎖?

儲存鎖的資料結構是hashmap而不是string,鎖名:{thread標識:計數值},增減計數值使用lua指令碼進行處理。

RedLock

由於Redis的主從是使用非同步的同步機制(slaveof,fork子程序生成RDB檔案,之後AOF同步)。所以無論使用哪種部署形式,在master down掉的情況下,鎖都是不可靠的(master未同步至slave, 主從切換後另外一個執行緒在slave獲取鎖)。所以Redis官方給出了RedLock演算法的解決方案。大體原理就是少數服從多數。

假設我們有N個Redis master節點,這些節點都是完全獨立的,我們不用任何複製或者其他隱含的分散式協調機制。之前我們已經描述了在Redis單例項下怎麼安全地獲取和釋放鎖。我們確保將在每(N)個例項上使用此方法獲取和釋放鎖。在我們的例子裡面我們把N設成5,這是一個比較合理的設定,所以我們需要在5臺機器上面或者5臺虛擬機器上面執行這些例項,這樣保證他們不會同時都宕掉。為了取到鎖,客戶端應該執行以下操作:

  1. 獲取當前Unix時間,以毫秒為單位。
  2. 依次嘗試從5個例項,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設定一個嘗試從某個Reids例項獲取鎖的最大等待時間(超過這個時間,則立馬詢問下一個例項),這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免伺服器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果伺服器端沒有在定時間內響應,客戶端應該儘快嘗試去另外一個Redis例項請求獲取鎖。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖消耗的時間。當且僅當從大多數(N/2+1,這裡是3個節點)的Redis節點都取到鎖,並且使用的總耗時小於鎖失效時間時,鎖才算獲取成功。
  4. 如果取到了鎖,key的真正有效時間 = 有效時間(獲取鎖時設定的key的自動超時時間) – 獲取鎖的總耗時(詢問各個Redis例項的總耗時之和)(步驟3計算的結果)。
  5. 如果因為某些原因,最終獲取鎖失敗(即沒有在至少 “N/2+1 ”個Redis例項取到鎖或者“獲取鎖的總耗時”超過了“有效時間”),客戶端應該在所有的Redis例項上進行解鎖(即便某些Redis例項根本就沒有加鎖成功,這樣可以防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。

Redisson實現RedLock

這裡以三個單機模式為例子,他們是完全相互獨立的。

public static void RedLockTest() {
			 Config config1 = new Config();
        config1.useSingleServer().setAddress("redis://127.0.0.1:5379")
          .setPassword("123456").setDatabase(0);
        RedissonClient redissonClient1 = Redisson.create(config1);
        Config config2 = new Config();
        config2.useSingleServer().setAddress("redis://127.0.0.1:5380")
          .setPassword("123456").setDatabase(0);
        RedissonClient redissonClient2 = Redisson.create(config2);
        Config config3 = new Config();
        config3.useSingleServer().setAddress("redis://127.0.0.1:5381")
          .setPassword("123456").setDatabase(0);
        RedissonClient redissonClient3 = Redisson.create(config3);

        /**
         * 獲取多個 RLock 物件
         */
        RLock lock1 = redissonClient1.getLock("anyLock");
        RLock lock2 = redissonClient2.getLock("anyLock");
        RLock lock3 = redissonClient3.getLock("anyLock");

        /**
         * 根據多個 RLock 物件構建 RedissonRedLock (最核心的差別就在這裡)
         */
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3)
        try {
            /**
             * 4.嘗試獲取鎖
             * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
             * leaseTime   鎖的持有時間,超過這個時間鎖會自動失效(值應設定為大於業務處理的時間,
             *             確保在鎖有效期內業務能處理完)
             */
            boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
            if (res) {
                //成功獲得鎖,在這裡處理業務
            }
        } catch (Exception e) {
            throw new RuntimeException("aquire lock fail");
        }finally{
            //無論如何, 最後都要解鎖
            redLock.unlock();
        }
 }

由於RedLock演算法是建立在多個相互獨立的redis之上的(為了區分可以叫做Redisson node),所以在編碼之中也是獲取多個鎖的例項以組成一個RedLock。

Redisson實現RedLock原始碼分析

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

    long newLeaseTime = -1;
    if (leaseTime != -1) {
        if (waitTime == -1) {
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允許加鎖失敗節點個數限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    /**
     * 2. 遍歷所有節點通過EVAL命令執行lua加鎖
     */
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.對節點嘗試加鎖,使用的就是RedissonLock.tryLock()
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果丟擲這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 丟擲異常表示獲取鎖失敗
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            /**
             *4. 如果獲取到鎖則新增到已獲取鎖集合中
             */
            acquiredLocks.add(lock);
        } else {
             /**
             * 5. 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N-(N/2+1))
             * 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從後面的節點申請了
             * 因為 Redlock 演算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }

            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
        /**
         * 6.計算 目前從各個節點獲取鎖已經消耗的總時間,如果已經等於最大等待時間,
         *        則認定最終申請鎖失敗,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
     /**
     * 7.如果邏輯正常執行完則認為最終申請鎖成功,返回true
     */
    return true;
}