1. 程式人生 > >基於redis做分散式鎖

基於redis做分散式鎖

SETNX命令簡介

命令格式

SETNX key value

將 key 的值設為 value,當且僅當 key 不存在。 
若給定的 key 已經存在,則 SETNX 不做任何動作。 
SETNX 是SET if Not eXists的簡寫。

返回值

返回整數,具體為 
- 1,當 key 的值被設定 
- 0,當 key 的值沒被設定

例子

redis> SETNX mykey “hello” 
(integer) 1 
redis> SETNX mykey “hello” 
(integer) 0 
redis> GET mykey 
“hello” 
redis>

使用SETNX實現分散式鎖

多個程序執行以下Redis命令:

SETNX lock.foo <current Unix time + lock timeout + 1>

如果 SETNX 返回1,說明該程序獲得鎖,SETNX將鍵 lock.foo 的值設定為鎖的超時時間(當前時間 + 鎖的有效時間)。 
如果 SETNX 返回0,說明其他程序已經獲得了鎖,程序不能進入臨界區。程序可以在一個迴圈中不斷地嘗試 SETNX 操作,以獲得鎖。

解決死鎖

考慮一種情況,如果程序獲得鎖後,斷開了與 Redis 的連線(可能是程序掛掉,或者網路中斷),如果沒有有效的釋放鎖的機制,那麼其他程序都會處於一直等待的狀態,即出現“死鎖”。

上面在使用 SETNX 獲得鎖時,我們將鍵 lock.foo 的值設定為鎖的有效時間,程序獲得鎖後,其他程序還會不斷的檢測鎖是否已超時,如果超時,那麼等待的程序也將有機會獲得鎖。

然而,鎖超時時,我們不能簡單地使用 DEL 命令刪除鍵 lock.foo 以釋放鎖。考慮以下情況,程序P1已經首先獲得了鎖 lock.foo,然後程序P1掛掉了。程序P2,P3正在不斷地檢測鎖是否已釋放或者已超時,執行流程如下:

  • P2和P3程序讀取鍵 lock.foo 的值,檢測鎖是否已超時(通過比較當前時間和鍵 lock.foo 的值來判斷是否超時)
  • P2和P3程序發現鎖 lock.foo 已超時
  • P2執行 DEL lock.foo命令
  • P2執行 SETNX lock.foo命令,並返回1,即P2獲得鎖
  • P3執行 DEL lock.foo命令將P2剛剛設定的鍵 lock.foo 刪除(這步是由於P3剛才已檢測到鎖已超時)
  • P3執行 SETNX lock.foo命令,並返回1,即P3獲得鎖
  • P2和P3同時獲得了鎖

從上面的情況可以得知,在檢測到鎖超時後,程序不能直接簡單地執行 DEL 刪除鍵的操作以獲得鎖。

為了解決上述演算法可能出現的多個程序同時獲得鎖的問題,我們再來看以下的演算法。 
我們同樣假設程序P1已經首先獲得了鎖 lock.foo,然後程序P1掛掉了。接下來的情況:

  • 程序P4執行 SETNX lock.foo 以嘗試獲取鎖
  • 由於程序P1已獲得了鎖,所以P4執行 SETNX lock.foo 返回0,即獲取鎖失敗
  • P4執行 GET lock.foo 來檢測鎖是否已超時,如果沒超時,則等待一段時間,再次檢測
  • 如果P4檢測到鎖已超時,即當前的時間大於鍵 lock.foo 的值,P4會執行以下操作 
    GETSET lock.foo <current Unix timestamp + lock timeout + 1>
  • 由於 GETSET 操作在設定鍵的值的同時,還會返回鍵的舊值,通過比較鍵 lock.foo 的舊值是否小於當前時間,可以判斷程序是否已獲得鎖
  • 假如另一個程序P5也檢測到鎖已超時,並在P4之前執行了 GETSET 操作,那麼P4的 GETSET 操作返回的是一個大於當前時間的時間戳,這樣P4就不會獲得鎖而繼續等待。注意到,即使P4接下來將鍵 lock.foo 的值設定了比P5設定的更大的值也沒影響。

另外,值得注意的是,在程序釋放鎖,即執行 DEL lock.foo 操作前,需要先判斷鎖是否已超時。如果鎖已超時,那麼鎖可能已由其他程序獲得,這時直接執行 DEL lock.foo 操作會導致把其他程序已獲得的鎖釋放掉。

直接上程式碼跟大家解釋:

/**
 * Redis分散式鎖(這種方式伺服器時間一定要同步,否則會出問題)
 * <p>
 * 執行步驟
 * 1. setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖,轉向2。
 * <p>
 * 2. get(lockkey)獲取值oldExpireTime ,並將這個value值與當前的系統時間進行比較,如果小於當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向3。
 * <p>
 * 3. 計算newExpireTime=當前時間+過期超時時間,然後getset(lockkey, newExpireTime) 會返回當前lockkey的值currentExpireTime。
 * <p>
 * 4. 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設定成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那麼當前請求可以直接返回失敗,或者繼續重試。
 * <p>
 * 5. 在獲取到鎖之後,當前執行緒可以開始自己的業務處理,當處理完畢後,比較自己的處理時間和對於鎖設定的超時時間,如果小於鎖設定的超時時間,則直接執行delete釋放鎖;如果大於鎖設定的超時時間,則不需要再鎖進行處理。
 *
 * @author fanglingxiao
 */
public class RedisLock {

    /**
     * 預設請求鎖的超時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 預設鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;

    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 鎖標誌對應的key
     */
    private String lockKey;
    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的超時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖的有效時間
     */
    private long expires = 0;

    /**
     * 鎖標記
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用預設的鎖過期時間和請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     */
    public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {

        if (Strings.isNullOrEmpty(lockKey)) {
            throw new IllegalArgumentException("lockKey invalid.");
        }
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用預設的請求鎖的超時時間,指定鎖的過期時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(單位:秒)
     */
    public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用預設的鎖的過期時間,指定請求鎖的超時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 鎖的過期時間和請求鎖的超時時間都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(單位:秒)
     * @param timeOut       請求鎖的超時時間(單位:毫秒)
     */
    public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * @return 獲取鎖的key
     */
    public String getLockKey() {
        return lockKey;
    }

    /**
     * 獲得 tryLock.
     * 實現思路: 主要是使用了redis 的setnx命令,快取了鎖.
     * reids快取的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其超時時間)
     * 執行過程:
     * 1.通過setnx嘗試設定某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖
     * 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設定新的值
     *
     * @return true if tryLock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public boolean tryLock() {


        if (Strings.isNullOrEmpty(lockKey)) {
            throw new IllegalArgumentException("lockKey invalid.");
        }

        // 請求鎖超時時間,納秒
        long timeout = timeOut * 1000000;
        // 系統當前時間,納秒
        long nowTime = System.nanoTime();

        while ((System.nanoTime() - nowTime) < timeout) {
            // 分散式伺服器有時差,這裡給1秒的誤差值
            expires = System.currentTimeMillis() + expireTime * 1000 + 1000;
            //鎖到期時間
            String expiresStr = String.valueOf(expires);

            Boolean r = redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr);
            if (r != null && r) {
                locked = true;
                // 設定鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶佔鎖之前可以執行任務的時間
                // 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);

                // 上鎖成功結束請求
                return true;
            }
            //redis裡的時間
            String currentValueStr = (String) redisTemplate.opsForValue().get(lockKey);
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否為空,不為空的情況下,如果被其他執行緒設定了值,則第二個條件判斷是過不去的
                // tryLock is expired

                String oldValueStr = (String) redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                //獲取上一個鎖到期時間,並設定現在的鎖到期時間,
                //只有一個執行緒才能獲取上一個線上的設定時間,因為jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆蓋,但是因為什麼相差了很少的時間,所以可以接受

                    //[分散式的情況下]:如過這個時候,多個執行緒恰好都到了這裡,但是隻有一個執行緒的設定值和當前值相同,他才有權利獲取鎖
                    // tryLock acquired
                    locked = true;
                    return true;
                }
            }

            /*
                延遲10 毫秒,  這裡使用隨機時間可能會好一點,可以防止飢餓程序的出現,即,當同時到達多個程序,
                只會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足.
                使用隨機的等待時間可以一定程度上保證公平性
             */
            try {
                Thread.sleep(50, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("獲取分散式鎖休眠被中斷:", e);
            }

        }
        return locked;
    }


    /**
     * 解鎖
     */
    public void unlock() {
        // 只有加鎖成功並且鎖還有效才去釋放鎖
        if (locked && expires > System.currentTimeMillis()) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

    public static <T> T lockDistributed(RedisLock locker, Callable<T> callable) {
        checkNotNull(locker);
        checkNotNull(callable);

        try {
            if (locker.tryLock()) {
                return callable.call();
            }
            return null;
        } catch (Exception e) {
            logger.error("execute callable error", e);
            return null;
        } finally {
            locker.unlock();
        }
    }

}

主方法:

通過建立RedisLock的有參構造設定值,通過呼叫RedisLock的lockDistributed方法去執行tryLock(),當獲取鎖成功則呼叫callable的call()方法執行此次操作,最終釋放鎖。

public static void main(String[] args) {
    RedisLock redisLock = new RedisLock(redisTemplate, "key", 5000L);
        Integer r = RedisLock.lockDistributed(redisLock, () -> {
            //...
            return 0;
        });
}