1. 程式人生 > >使用Redis實現分散式鎖及其優化

使用Redis實現分散式鎖及其優化

目前實現分散式鎖的方式主要有資料庫、Redis和Zookeeper三種,本文主要闡述利用Redis的相關命令來實現分散式鎖。

相關Redis命令

SETNX

如果當前中沒有值,則將其設定為並返回1,否則返回0。

EXPIRE

將設定為秒後自動過期。

GETSET

將的值設定為,並返回其原來的舊值。如果原來沒有舊值,則返回nil

EVAL與EVALSHA

Redis2.6之後支援的功能,可以將一段lua指令碼傳送到Redis伺服器執行。

起——分散式鎖初探

利用SETNX命令的原子性,我們可以簡單的實現一個初步的分散式鎖(這裡原理就不詳述了,直接上虛擬碼):

booleantryLock(String key, int lockSeconds) {
  if (SETNX key "1" == 1) {
    EXPIRE key lockSeconds
    return true
  } else {
    return false
  }
}
boolean unlock(String key) {
  DEL key
}

tryLock是一個非阻塞的分散式鎖方法,在獲得鎖失敗後會立即返回。如果需要一個阻塞式的鎖方法,可以將tryLock方法包裝為輪詢(以一定的時間間隔來輪詢,這很重要,否則Redis會吃不消!)。

此種方法看似沒有什麼問題,但其實則有一個漏洞:在加鎖的過程中,客戶端順序的向Redis伺服器傳送了SETNX和EXPIRE命令,那麼假設在SETNX命令執行完成之後,在EXPIRE命令發出去之前客戶端發生崩潰(或客戶端與Redis伺服器的網路連線突然斷掉),導致EXPIRE命令沒有得到執行,其他客戶端將會發生永久死鎖!

承——分散式鎖的改進

更新:此方法解鎖存在漏洞,具體見最文後的追加內容。

為解決上面提出的問題,可以在加鎖時在key

中儲存這個鎖過期的時間(當前客戶端時間戳+鎖時間),然後在獲取鎖失敗時,取出value與當前客戶端時間進行比較,如果確定是已經過期的鎖,則可以確認發生了上面描述的錯誤情況,此時可以使用DEL清掉這個key,然後再重新嘗試去獲得這個鎖。可以嗎?當然不可以!如果沒辦法保證DEL操作和下次SETNX操作之間的原子性,則還是會產生一個競態條件,比如這樣:

C1DEL key
C1 SETNX key <expireTime>
C2 DEL key
C2 SETNX key <expireTime>

當Redis伺服器收到這樣的指令序列時,C1和C2的SETNX都同時返回了1,此時C1和C2都認為自己拿到了鎖,這種情況明顯是不符合預期的。

為解決這個問題,Redis的GETSET命令就派上用場了。客戶端可以使用GETSET命令去設定自己的過期時間,然後得到的返回值與之前GET到的返回值進行比較,如果不同,則表示這個過期的鎖被其他客戶端搶佔了(此時GETSET命令其實已經生效,也就是說key中的過期時間已經被修改,不過此誤差很小,可以忽略不計)。

根據上面的分析思路,可以得出一個改進後的分散式鎖,這裡直接給出Java的實現程式碼:

publicclass RedisLock {
    private static final Logger logger =LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final byte[] lockKey;
    public RedisLock(StringRedisTemplate stringRedisTemplate,String lockKey) {
        this.stringRedisTemplate =stringRedisTemplate;
        this.lockKey = lockKey.getBytes();
    }
    private boolean tryLock(RedisConnection conn, intlockSeconds) throws Exception {
        long nowTime =System.currentTimeMillis();
        long expireTime = nowTime + lockSeconds* 1000 + 1000; //
容忍不同伺服器時間有1秒內的誤差
        if (conn.setNX(lockKey,longToBytes(expireTime))) {
           conn.expire(lockKey, lockSeconds);
            return true;
        } else {
            byte[]oldValue = conn.get(lockKey);
            if (oldValue!= null && bytesToLong(oldValue) < nowTime) {
               // 這個鎖已經過期了,可以獲得它
               // PS: 如果setNX和expire之間客戶端發生崩潰,可能會出現這樣的情況
               byte[] oldValue2 = conn.getSet(lockKey, longToBytes(expireTime));
               if (Arrays.equals(oldValue, oldValue2)) {
                   // 獲得了鎖
                   conn.expire(lockKey, lockSeconds);
                   return true;
               } else {
                   // 被別人搶佔了鎖(此時已經修改了lockKey中的值,不過誤差很小可以忽略)
                   return false;
               }
            }
        }
        return false;
    }
    /**
     * 嘗試獲得鎖,成功返回true,如果失敗或異常立即返回false
     *
     * @param lockSeconds 加鎖的時間(秒),超過這個時間後鎖會自動釋放
     */
    public boolean tryLock(final int lockSeconds) {
        returnstringRedisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            publicBoolean doInRedis(RedisConnection conn) throws DataAccessException {
               try {
                   return tryLock(conn, lockSeconds);
               } catch (Exception e) {
                   logger.error("tryLock Error", e);
                   return false;
               }
            }
        });
    }
    /**
     * 輪詢的方式去獲得鎖,成功返回true,超過輪詢次數或異常返回false
     *
     * @param lockSeconds      加鎖的時間(秒),超過這個時間後鎖會自動釋放
     * @param tryIntervalMillis 輪詢的時間間隔(毫秒)
     * @parammaxTryCount       最大的輪詢次數
     */
    public boolean tryLock(final int lockSeconds, final longtryIntervalMillis, final int maxTryCount) {
        return stringRedisTemplate.execute(newRedisCallback<Boolean>() {
            @Override
            publicBoolean doInRedis(RedisConnection conn) throws DataAccessException {
               int tryCount = 0;
               while (true) {
                   if (++tryCount >= maxTryCount) {
                       // 獲取鎖超時
                       return false;
                   }
                   try {
                       if (tryLock(conn, lockSeconds)) {
                           return true;
                       }
                   } catch (Exception e) {
                       logger.error("tryLock Error", e);
                       return false;
                   }
                   try {
                       Thread.sleep(tryIntervalMillis);
                   } catch (InterruptedException e) {
                       logger.error("tryLock interrupted", e);
                       return false;
                   }
               }
            }
        });
    }
    /**
     * 如果加鎖後的操作比較耗時,呼叫方其實可以在unlock前根據時間判斷下鎖是否已經過期
     * 如果已經過期可以不用呼叫,減少一次請求
     */
    public void unlock() {
        stringRedisTemplate.delete(newString(lockKey));
    }
    public byte[] longToBytes(long value) {
        ByteBuffer buffer =ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
        buffer.putLong(value);
        return buffer.array();
    }
    public long bytesToLong(byte[] bytes) {
        if (bytes.length != Long.SIZE /Byte.SIZE) {
            throw newIllegalArgumentException("wrong length of bytes!");
        }
        returnByteBuffer.wrap(bytes).getLong();
    }
}

轉——分散式鎖的優化

更新:此方法解鎖存在漏洞,具體見本後最後的追加內容。

以上的分散式鎖實現邏輯已經較為複雜,涉及到了較多的Redis命令,並使得每一次嘗試加鎖的過程都會有至少2次的Redis命令執行,這也就意味著至少兩次與Redis伺服器的網路通訊。而新增後面複雜邏輯的原因只是因為SETNX與EXPIRE這兩條命令執行的原子性無法得到保證。(有些同學會提到Redis的pipeline特性,此處明顯不適用,因為第二條指令的執行以來與第一條執行的結果,pipeline無法實現)

另外,上面的分散式鎖還有一個問題,那就是伺服器之間時間同步的問題。在分散式場景中,多臺伺服器之間的時間做到同步是非常困難的,所以在程式碼中我加了1秒的時間容錯,但依賴伺服器時間的同步還是可能會不靠譜的。

從Redis 2.6開始,客戶端可以直接向Redis伺服器提交Lua指令碼,也就是說可以直接在Redis伺服器來執行一些較複雜的邏輯,而此指令碼的提交對於客戶端來說是相對原子性的。這恰好解決了我們的問題!

我們可以用一個這樣的lua指令碼來描述加鎖的邏輯(關於指令碼的提交命令和Redis的相關規則可以看https://redis.io/commands/eval):

if(redis.call('setnx', KEYS[1], ARGV[1]) == 1) then
    redis.call('expire', KEYS[1], tonumber(ARGV[2]))
    return true
else
    return false
end

注意:此指令碼中命令的執行並不是嚴格意義上的原子性,如果其中第二條指令EXPIRE執行失敗,整個指令碼執行會返回錯誤,但是第一條指令SETNX仍然是已經生效的!不過此種情況基本可以認為是Redis伺服器已經崩潰(除非是開發階段就可以排除的引數錯誤之類的問題),那麼鎖的安全性就已經不是這裡可以關注的點了。這裡認為對客戶端來說是相對原子性的就足夠了。

這個簡單的指令碼在Redis伺服器得到執行,並返回是否得到鎖。因為指令碼的提交執行只有一條Redis命令,就避免了上面所說的客戶端異常問題。

使用指令碼優化了鎖的邏輯和效能,這裡給出最終的Java實現程式碼:

publicclass RedisLock {
    private static final Logger logger =LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final String lockKey;
    private final List<String> keys;
    /**
     *
使用指令碼在redis伺服器執行這個邏輯可以在一定程度上保證此操作的原子性
     * (即不會發生客戶端在執行setNX和expire命令之間,發生崩潰或失去與伺服器的連線導致expire沒有得到執行,發生永久死鎖)
     * <p>
     * 除非指令碼在redis伺服器執行時redis伺服器發生崩潰,不過此種情況鎖也會失效
     */
    private static final RedisScript<Boolean>SETNX_AND_EXPIRE_SCRIPT;
    static {
        StringBuilder sb = newStringBuilder();
        sb.append("if(redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");
       sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
        sb.append("\treturn true\n");
        sb.append("else\n");
        sb.append("\treturnfalse\n");
        sb.append("end");
        SETNX_AND_EXPIRE_SCRIPT = newRedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    public RedisLock(StringRedisTemplate stringRedisTemplate,String lockKey) {
        this.stringRedisTemplate =stringRedisTemplate;
        this.lockKey = lockKey;
        this.keys =Collections.singletonList(lockKey);
    }
    private boolean doTryLock(int lockSeconds) throws Exception{
        return stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT,keys, "1", String.valueOf(lockSeconds));
    }
    /**
     * 嘗試獲得鎖,成功返回true,如果失敗立即返回false
     *
     * @param lockSeconds 加鎖的時間(秒),超過這個時間後鎖會自動釋放
     */
    public boolean tryLock(int lockSeconds) {
        try {
            returndoTryLock(lockSeconds);
        } catch (Exception e) {
           logger.error("tryLock Error", e);
            returnfalse;
        }
    }
    /**
     * 輪詢的方式去獲得鎖,成功返回true,超過輪詢次數或異常返回false
     *
     * @param lockSeconds      加鎖的時間(秒),超過這個時間後鎖會自動釋放
     * @param tryIntervalMillis 輪詢的時間間隔(毫秒)
     * @parammaxTryCount       最大的輪詢次數
     */
    public boolean tryLock(final int lockSeconds, final longtryIntervalMillis, final int maxTryCount) {
        int tryCount = 0;
        while (true) {
            if(++tryCount >= maxTryCount) {
               // 獲取鎖超時
               return false;
            }
            try {
               if (doTryLock(lockSeconds)) {
                   return true;
               }
            } catch(Exception e) {
               logger.error("tryLock Error", e);
               return false;
            }
            try {
               Thread.sleep(tryIntervalMillis);
            } catch(InterruptedException e) {
               logger.error("tryLock interrupted", e);
               return false;
            }
        }
    }
    /**
     * 如果加鎖後的操作比較耗時,呼叫方其實可以在unlock前根據時間判斷下鎖是否已經過期
     * 如果已經過期可以不用呼叫,減少一次請求
     */
    public void unlock() {
        stringRedisTemplate.delete(lockKey);
    }
    private static class RedisScriptImpl<T> implementsRedisScript<T> {
        private final String script;
        private final String sha1;
        private final Class<T>resultType;
        public RedisScriptImpl(Stringscript, Class<T> resultType) {
            this.script= script;
            this.sha1 =DigestUtils.sha1DigestAsHex(script);
           this.resultType = resultType;
        }
        @Override
        public String getSha1() {
            return sha1;
        }
        @Override
        public Class<T>getResultType() {
            returnresultType;
        }
        @Override
        public String getScriptAsString() {
            returnscript;
        }
    }
}

合——小節

最後,此文內容只是筆者自己學習折騰出來的結果,如果還有什麼筆者沒有考慮到的bug存在,還請不吝指出,大家一起學習進步~

追——解鎖漏洞(更新)

經過慎重考慮,發現以上實現的分散式鎖有一個較為嚴重的解鎖漏洞:因為解鎖操作只是做了簡單的DEL KEY,如果某客戶端在獲得鎖後執行業務的時間超過了鎖的過期時間,則最後的解鎖操作會誤解掉其他客戶端的操作。

為解決此問題,我們在建立RedisLock物件時用本機時間戳和UUID來建立一個絕對唯一的lockValue,然後在加鎖時存入此值,並在解鎖前用GET取出值進行比較,如果匹配才做DEL。這裡依然需要用LUA指令碼保證整個解鎖過程的原子性。

這裡給出修復此漏洞並做了一些小優化之後的程式碼:

importjava.util.Collections;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DigestUtils;
import org.springframework.data.redis.core.script.RedisScript;
/**
 * Created On 10/24 2017
 * Redis
實現的分散式鎖(不可重入)
 * 此物件非執行緒安全,使用時務必注意
 */
public class RedisLock {
    private static final Logger logger =LoggerFactory.getLogger(RedisLock.class);
    private final StringRedisTemplate stringRedisTemplate;
    private final String lockKey;
    private final String lockValue;
    private boolean locked = false;
    /**
     * 使用指令碼在redis伺服器執行這個邏輯可以在一定程度上保證此操作的原子性
     * (即不會發生客戶端在執行setNX和expire命令之間,發生崩潰或失去與伺服器的連線導致expire沒有得到執行,發生永久死鎖)
     * <p>
     * 除非指令碼在redis伺服器執行時redis伺服器發生崩潰,不過此種情況鎖也會失效
     */
    private static final RedisScript<Boolean>SETNX_AND_EXPIRE_SCRIPT;
    static {
        StringBuilder sb = newStringBuilder();
        sb.append("if (redis.call('setnx',KEYS[1], ARGV[1]) == 1) then\n");
       sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
        sb.append("\treturntrue\n");
        sb.append("else\n");
        sb.append("\treturnfalse\n");
        sb.append("end");
        SETNX_AND_EXPIRE_SCRIPT = newRedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    private static final RedisScript<Boolean>DEL_IF_GET_EQUALS;
    static {
        StringBuilder sb = newStringBuilder();
        sb.append("if (redis.call('get',KEYS[1]) == ARGV[1]) then\n");
        sb.append("\tredis.call('del',KEYS[1])\n");
        sb.append("\treturntrue\n");
        sb.append("else\n");
        sb.append("\treturnfalse\n");
        sb.append("end");
        DEL_IF_GET_EQUALS = newRedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
    }
    public RedisLock(StringRedisTemplate stringRedisTemplate,String lockKey) {
        this.stringRedisTemplate =stringRedisTemplate;
        this.lockKey = lockKey;
        this.lockValue =UUID.randomUUID().toString() + "." + System.currentTimeMillis();
    }
    private boolean doTryLock(int lockSeconds) throws Exception{
        if (locked) {
            throw newIllegalStateException("already locked!");
        }
        locked = stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT,Collections.singletonList(lockKey), lockValue,
                                            String.valueOf(lockSeconds));
        return locked;
    }
    /**
     * 嘗試獲得鎖,成功返回true,如果失敗立即返回false
     *
     * @param lockSeconds 加鎖的時間(秒),超過這個時間後鎖會自動釋放
     */
    public boolean tryLock(int lockSeconds) {
        try {
            returndoTryLock(lockSeconds);
        } catch (Exception e) {
           logger.error("tryLock Error", e);
            return false;
        }
    }
    /**
     * 輪詢的方式去獲得鎖,成功返回true,超過輪詢次數或異常返回false
     *
     * @paramlockSeconds       加鎖的時間(秒),超過這個時間後鎖會自動釋放
     * @param tryIntervalMillis 輪詢的時間間隔(毫秒)
     * @parammaxTryCount       最大的輪詢次數
     */
    public boolean tryLock(final int lockSeconds, final longtryIntervalMillis, final int maxTryCount) {
        int tryCount = 0;
        while (true) {
            if(++tryCount >= maxTryCount) {
               // 獲取鎖超時
               return false;
            }
            try {
               if (doTryLock(lockSeconds)) {
                   return true;
               }
            } catch(Exception e) {
               logger.error("tryLock Error", e);
               return false;
            }
            try {
               Thread.sleep(tryIntervalMillis);
            } catch(InterruptedException e) {
               logger.error("tryLock interrupted", e);
               return false;
            }
        }
    }
    /**
     * 解鎖操作
     */
    public void unlock() {
        if (!locked) {
            throw newIllegalStateException("not locked yet!");
        }
        locked = false;
        // 忽略結果
       stringRedisTemplate.execute(DEL_IF_GET_EQUALS,Collections.singletonList(lockKey), lockValue);
    }
    private static class RedisScriptImpl<T> implementsRedisScript<T> {
        private final String script;
        private final String sha1;
        private final Class<T>resultType;
        public RedisScriptImpl(Stringscript, Class<T> resultType) {
            this.script= script;
            this.sha1 =DigestUtils.sha1DigestAsHex(script);
           this.resultType = resultType;
        }
        @Override
        public String getSha1() {
            return sha1;
        }
        @Override
        public Class<T>getResultType() {
            returnresultType;
        }
        @Override
        public String getScriptAsString() {
            returnscript;
        }
    }
}

出處:http://mzorro.me/2017/10/25/redis-distributed-lock/