1. 程式人生 > 實用技巧 >分散式鎖實現

分散式鎖實現

date: 2020-11-25 17:10:00
updated: 2020-11-25 17:40:00

分散式鎖實現

電商場景,當用戶下單的時候,redis 裡庫存只有一件,併發執行的時候可能會造成庫存超賣問題

通過在執行第二步加鎖,可以保證併發請求在下單的時候操作是序列化的,但是併發增多,增加一臺機器

此時還是會造成庫存超賣問題。原因是:兩個系統執行在兩個不同的JVM裡面,他們加的鎖只對屬於自己JVM裡面的執行緒有效,對於其他JVM的執行緒是無效的。即 Java提供的原生鎖機制在多機部署場景下失效了

分散式鎖:redis 或 zookeeper

1. Redis 實現方式

思路:在redis中設定一個值表示加了鎖,然後釋放鎖的時候就把這個key刪除。

// 獲取鎖
// NX是指如果key不存在就成功,key存在返回false,PX可以指定過期時間
SET anyLock unique_value NX PX 30000


// 釋放鎖:通過執行一段lua指令碼
// 釋放鎖涉及到兩條指令,這兩條指令不是原子性的
// 需要用到redis的lua指令碼支援特性,redis執行lua指令碼是原子性的
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

需要注意的地方:

  1. 一定要用SET key value NX PX milliseconds 命令
    如果不用,先設定了值,再設定過期時間,這個不是原子性操作,有可能在設定過期時間之前宕機,會造成死鎖(key永久存在)
  2. value 要具有唯一性
    這個是為了在解鎖的時候,需要驗證value是和加鎖的一致才刪除key。
    這是避免了一種情況:假設A獲取了鎖,過期時間30s,此時35s之後,鎖已經自動釋放了,A去釋放鎖,但是此時可能B獲取了鎖。A客戶端就不能刪除B的鎖了。

這樣有可能會有一個問題是:設定了key的過期時間,但是業務處理邏輯的時間可能大於過期時間,這樣A獲取了鎖,但是處理超時了,key被過期,B獲取了鎖,也有可能會惡性迴圈

元件 Redission 實現

  1. redisson所有指令都通過lua指令碼執行,redis支援lua指令碼原子性執行
  2. redisson中有一個watchdog的概念,翻譯過來就是看門狗,它會在你獲取鎖之後,每隔10秒幫你把key的超時時間設為30s。這樣的話,就算一直持有鎖也不會出現key過期了,其他執行緒獲取到鎖的問題了。同時也保證了沒有死鎖的產生,哪怕機器宕機,key也會在時間到了之後自己過期
// 加鎖邏輯
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 呼叫一段lua指令碼,設定一些key、過期時間
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }

            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 看門狗邏輯
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}


<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}



// 看門狗最終會呼叫了這裡
private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }

    // 這個任務會延遲10s執行
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {

            // 這個操作會將key的過期時間重新設定為30s
            RFuture<Boolean> future = renewExpirationAsync(threadId);

            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }

                    if (future.getNow()) {
                        // reschedule itself
                        // 通過遞迴呼叫本方法,無限迴圈延長過期時間
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }

    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
        task.cancel();
    }
}

2. ZooKeeper 實現方式

Zookeeper是一種提供配置管理、分散式協同以及命名的中心化服務。

zk的模型是這樣的:zk包含一系列的節點,叫做znode,就好像檔案系統一樣每個znode表示一個目錄,然後znode有一些特性:

  • 有序節點
    • 假如當前有一個父節點為/lock,我們可以在這個父節點下面建立子節點
      zookeeper提供了一個可選的有序特性,例如我們可以建立子節點“/lock/node-”並且指明有序,那麼zookeeper在生成子節點時會根據當前的子節點數量自動新增整數序號
      也就是說,如果是第一個建立的子節點,那麼生成的子節點為/lock/node-0000000000,下一個節點則為/lock/node-0000000001,依次類推。
  • 臨時節點
    • 客戶端可以建立一個臨時節點,在會話結束或者會話超時後,zookeeper會自動刪除該節點。
  • 事件監聽
    • 在讀取資料時,我們可以同時對節點設定事件監聽,當節點資料或結構變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:
      • 節點建立
      • 節點刪除
      • 節點資料修改
      • 子節點變更

實現分散式鎖的思路

  1. 使用zk的臨時節點和有序節點,每個執行緒獲取鎖就是在zk建立一個臨時有序的節點,比如在/lock/目錄下。
  2. 建立節點成功後,獲取/lock目錄下的所有臨時節點,再判斷當前執行緒建立的節點是否是所有的節點的序號最小的節點
  3. 如果當前執行緒建立的節點是所有節點序號最小的節點,則認為獲取鎖成功。
  4. 如果當前執行緒建立的節點不是所有節點序號最小的節點,則對節點序號的前一個節點新增一個事件監聽。

比如當前執行緒獲取到的節點序號為 /lock/003,然後所有的節點列表為[/lock/001,/lock/002,/lock/003],則對/lock/002這個節點新增一個事件監聽器。
如果鎖釋放了,會喚醒下一個序號的節點,然後重新執行第3步,判斷是否自己的節點序號是最小。比如/lock/001釋放了,/lock/002監聽到事件,此時節點集合為[/lock/002,/lock/003],則/lock/002為最小序號節點,獲取到鎖。