基於redis做分散式鎖
SETNX命令簡介
命令格式
SETNX key value
將 key 的值設為 value,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是SET if Not eXists的簡寫。
返回值
返回整數,具體為
- 1,當 key 的值被設定
- 0,當 key 的值沒被設定
例子
redis> SETNX mykey “hello”
(integer) 1
redis> SETNX mykey “hello”
(integer) 0
redis> GET mykey
“hello”
redis>
使用SETNX實現分散式鎖
多個程序執行以下Redis命令:
SETNX lock.foo <current Unix time + lock timeout + 1>
如果 SETNX 返回1,說明該程序獲得鎖,SETNX將鍵 lock.foo 的值設定為鎖的超時時間(當前時間 + 鎖的有效時間)。
如果 SETNX 返回0,說明其他程序已經獲得了鎖,程序不能進入臨界區。程序可以在一個迴圈中不斷地嘗試 SETNX 操作,以獲得鎖。
解決死鎖
考慮一種情況,如果程序獲得鎖後,斷開了與 Redis 的連線(可能是程序掛掉,或者網路中斷),如果沒有有效的釋放鎖的機制,那麼其他程序都會處於一直等待的狀態,即出現“死鎖”。
上面在使用 SETNX 獲得鎖時,我們將鍵 lock.foo 的值設定為鎖的有效時間,程序獲得鎖後,其他程序還會不斷的檢測鎖是否已超時,如果超時,那麼等待的程序也將有機會獲得鎖。
然而,鎖超時時,我們不能簡單地使用 DEL 命令刪除鍵 lock.foo 以釋放鎖。考慮以下情況,程序P1已經首先獲得了鎖 lock.foo,然後程序P1掛掉了。程序P2,P3正在不斷地檢測鎖是否已釋放或者已超時,執行流程如下:
- P2和P3程序讀取鍵 lock.foo 的值,檢測鎖是否已超時(通過比較當前時間和鍵 lock.foo 的值來判斷是否超時)
- P2和P3程序發現鎖 lock.foo 已超時
- P2執行 DEL lock.foo命令
- P2執行 SETNX lock.foo命令,並返回1,即P2獲得鎖
- P3執行 DEL lock.foo命令將P2剛剛設定的鍵 lock.foo 刪除(這步是由於P3剛才已檢測到鎖已超時)
- P3執行 SETNX lock.foo命令,並返回1,即P3獲得鎖
- P2和P3同時獲得了鎖
從上面的情況可以得知,在檢測到鎖超時後,程序不能直接簡單地執行 DEL 刪除鍵的操作以獲得鎖。
為了解決上述演算法可能出現的多個程序同時獲得鎖的問題,我們再來看以下的演算法。
我們同樣假設程序P1已經首先獲得了鎖 lock.foo,然後程序P1掛掉了。接下來的情況:
- 程序P4執行 SETNX lock.foo 以嘗試獲取鎖
- 由於程序P1已獲得了鎖,所以P4執行 SETNX lock.foo 返回0,即獲取鎖失敗
- P4執行 GET lock.foo 來檢測鎖是否已超時,如果沒超時,則等待一段時間,再次檢測
- 如果P4檢測到鎖已超時,即當前的時間大於鍵 lock.foo 的值,P4會執行以下操作
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
- 由於 GETSET 操作在設定鍵的值的同時,還會返回鍵的舊值,通過比較鍵 lock.foo 的舊值是否小於當前時間,可以判斷程序是否已獲得鎖
- 假如另一個程序P5也檢測到鎖已超時,並在P4之前執行了 GETSET 操作,那麼P4的 GETSET 操作返回的是一個大於當前時間的時間戳,這樣P4就不會獲得鎖而繼續等待。注意到,即使P4接下來將鍵 lock.foo 的值設定了比P5設定的更大的值也沒影響。
另外,值得注意的是,在程序釋放鎖,即執行 DEL lock.foo 操作前,需要先判斷鎖是否已超時。如果鎖已超時,那麼鎖可能已由其他程序獲得,這時直接執行 DEL lock.foo 操作會導致把其他程序已獲得的鎖釋放掉。
直接上程式碼跟大家解釋:
/**
* Redis分散式鎖(這種方式伺服器時間一定要同步,否則會出問題)
* <p>
* 執行步驟
* 1. setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖,轉向2。
* <p>
* 2. get(lockkey)獲取值oldExpireTime ,並將這個value值與當前的系統時間進行比較,如果小於當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向3。
* <p>
* 3. 計算newExpireTime=當前時間+過期超時時間,然後getset(lockkey, newExpireTime) 會返回當前lockkey的值currentExpireTime。
* <p>
* 4. 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設定成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那麼當前請求可以直接返回失敗,或者繼續重試。
* <p>
* 5. 在獲取到鎖之後,當前執行緒可以開始自己的業務處理,當處理完畢後,比較自己的處理時間和對於鎖設定的超時時間,如果小於鎖設定的超時時間,則直接執行delete釋放鎖;如果大於鎖設定的超時時間,則不需要再鎖進行處理。
*
* @author fanglingxiao
*/
public class RedisLock {
/**
* 預設請求鎖的超時時間(ms 毫秒)
*/
private static final long TIME_OUT = 100;
/**
* 預設鎖的有效時間(s)
*/
public static final int EXPIRE = 60;
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate<String, Object> redisTemplate;
/**
* 鎖標誌對應的key
*/
private String lockKey;
/**
* 鎖的有效時間(s)
*/
private int expireTime = EXPIRE;
/**
* 請求鎖的超時時間(ms)
*/
private long timeOut = TIME_OUT;
/**
* 鎖的有效時間
*/
private long expires = 0;
/**
* 鎖標記
*/
private volatile boolean locked = false;
final Random random = new Random();
/**
* 使用預設的鎖過期時間和請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
*/
public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
if (Strings.isNullOrEmpty(lockKey)) {
throw new IllegalArgumentException("lockKey invalid.");
}
this.redisTemplate = redisTemplate;
this.lockKey = lockKey + "_lock";
}
/**
* 使用預設的請求鎖的超時時間,指定鎖的過期時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
*/
public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) {
this(redisTemplate, lockKey);
this.expireTime = expireTime;
}
/**
* 使用預設的鎖的過期時間,指定請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) {
this(redisTemplate, lockKey);
this.timeOut = timeOut;
}
/**
* 鎖的過期時間和請求鎖的超時時間都是用指定的值
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) {
this(redisTemplate, lockKey, expireTime);
this.timeOut = timeOut;
}
/**
* @return 獲取鎖的key
*/
public String getLockKey() {
return lockKey;
}
/**
* 獲得 tryLock.
* 實現思路: 主要是使用了redis 的setnx命令,快取了鎖.
* reids快取的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其超時時間)
* 執行過程:
* 1.通過setnx嘗試設定某個key的值,成功(當前沒有這個鎖)則返回,成功獲得鎖
* 2.鎖已經存在則獲取鎖的到期時間,和當前時間比較,超時的話,則設定新的值
*
* @return true if tryLock is acquired, false acquire timeouted
* @throws InterruptedException in case of thread interruption
*/
public boolean tryLock() {
if (Strings.isNullOrEmpty(lockKey)) {
throw new IllegalArgumentException("lockKey invalid.");
}
// 請求鎖超時時間,納秒
long timeout = timeOut * 1000000;
// 系統當前時間,納秒
long nowTime = System.nanoTime();
while ((System.nanoTime() - nowTime) < timeout) {
// 分散式伺服器有時差,這裡給1秒的誤差值
expires = System.currentTimeMillis() + expireTime * 1000 + 1000;
//鎖到期時間
String expiresStr = String.valueOf(expires);
Boolean r = redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr);
if (r != null && r) {
locked = true;
// 設定鎖的有效期,也是鎖的自動釋放時間,也是一個客戶端在其他客戶端能搶佔鎖之前可以執行任務的時間
// 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);
// 上鎖成功結束請求
return true;
}
//redis裡的時間
String currentValueStr = (String) redisTemplate.opsForValue().get(lockKey);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
//判斷是否為空,不為空的情況下,如果被其他執行緒設定了值,則第二個條件判斷是過不去的
// tryLock is expired
String oldValueStr = (String) redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
//獲取上一個鎖到期時間,並設定現在的鎖到期時間,
//只有一個執行緒才能獲取上一個線上的設定時間,因為jedis.getSet是同步的
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
//防止誤刪(覆蓋,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆蓋,但是因為什麼相差了很少的時間,所以可以接受
//[分散式的情況下]:如過這個時候,多個執行緒恰好都到了這裡,但是隻有一個執行緒的設定值和當前值相同,他才有權利獲取鎖
// tryLock acquired
locked = true;
return true;
}
}
/*
延遲10 毫秒, 這裡使用隨機時間可能會好一點,可以防止飢餓程序的出現,即,當同時到達多個程序,
只會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這將可能導致前面來的鎖得不到滿足.
使用隨機的等待時間可以一定程度上保證公平性
*/
try {
Thread.sleep(50, random.nextInt(50000));
} catch (InterruptedException e) {
logger.error("獲取分散式鎖休眠被中斷:", e);
}
}
return locked;
}
/**
* 解鎖
*/
public void unlock() {
// 只有加鎖成功並且鎖還有效才去釋放鎖
if (locked && expires > System.currentTimeMillis()) {
redisTemplate.delete(lockKey);
locked = false;
}
}
public static <T> T lockDistributed(RedisLock locker, Callable<T> callable) {
checkNotNull(locker);
checkNotNull(callable);
try {
if (locker.tryLock()) {
return callable.call();
}
return null;
} catch (Exception e) {
logger.error("execute callable error", e);
return null;
} finally {
locker.unlock();
}
}
}
主方法:
通過建立RedisLock的有參構造設定值,通過呼叫RedisLock的lockDistributed方法去執行tryLock(),當獲取鎖成功則呼叫callable的call()方法執行此次操作,最終釋放鎖。
public static void main(String[] args) {
RedisLock redisLock = new RedisLock(redisTemplate, "key", 5000L);
Integer r = RedisLock.lockDistributed(redisLock, () -> {
//...
return 0;
});
}