聊聊分散式鎖的實現(一)
在併發程式設計中最常用的手段就是鎖了,比如基於JVM底層的synchronized、基於AQS的ReentrantLock;可是這些鎖都只是侷限於單機,本篇給大家介紹常見的分散式鎖。
為什麼需要分散式鎖
假如你的某個業務中存在某個需求:如果查詢不存在則生成一條記錄插入。那麼你可能會這樣寫程式碼:select --> if not exist --> insert; 這個時候你可能考慮到如果兩條執行緒同時select都拿不到結果會導致插入兩條記錄,這個時候你可能會在這個操作上加鎖保證執行緒安全,當然了具體的不同業務處理方式也有多種。如果是在分散式叢集環境中那麼該如何保證這個執行緒安全呢,這個時候你可以使用分散式鎖來解決這個問題。基於redis實現的分散式鎖
一、setnx key value
基於redis實現的分散式鎖我們可以使用setnx命令,這個命令的作用是如果指定的key不存在時則set一對k-v,這樣同一時刻就只能有一個請求可以set這個key達到加鎖的目的,但是這個命令不能同時設定過期時間,這樣可能會導致死鎖。如圖一,請求A和請求B在T1時刻同時發起setnx命令,請求A成功了,然後在T2設定key的過期時間,如果在這之前請求A所在的服務突然掛了,那這個key就一直存在,這個時候其他請求就無法加鎖。
圖一:setnx實現鎖二、set key value [expiration EX seconds|PX milliseconds] [NX|XX]
使用這個命令可以在設定一個key的同時設定key的過期時間,NX是當key不存在時進行操作,XX是當key存在時進行操作。業務執行完成之後,這個時候需要手動釋放這個鎖;那麼如何保證釋放鎖的安全性呢?首先要確保釋放的鎖是自己的,我們可以利用key對應的value來判斷當前這個key是不是自己設定的,這樣就能保證釋放的鎖是自己的;
private Boolean lock(String key,String value) {
return stringRedisTemplate.opsForValue().setIfAbsent(key,value,10L,TimeUnit.SECONDS);
}
private Boolean unLock(String key,String value) {
String cacheValue = stringRedisTemplate.opsForValue().get(key);
if (!value.equals(cacheValue)) {
return false;
}
return stringRedisTemplate.delete(key);
}
複製程式碼
那上面這段程式碼就能保證釋放鎖的安全性嗎?這個方法存在的問題在於在判斷了key對應的value與自己的value相等之後,如果這個時候key不爭氣的剛好到期失效了,其他執行緒獲取了這個鎖,那麼下面的delete key操作就將其他執行緒的鎖釋放掉了。怎麼就那麼多麼蛾子…… 那麼如何保證釋放鎖的原子性呢?
三、Lua指令碼保證釋放鎖的原子性
Lua指令碼我不過多的介紹,有興趣的同學可以去了解,直接上程式碼
private Boolean luaUnLock(String key,String value) {
ScriptSource lua = new ResourceScriptSource(new ClassPathResource("redisUnLock.lua"));
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(lua);
redisScript.setResultType(Boolean.class);
return stringRedisTemplate.execute(redisScript,Collections.singletonList(key),value);
}
複製程式碼
redisUnLock.lua
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end
複製程式碼
這段指令碼比較簡單,就是比較引數key對應的value是否與引數value相等,相等則刪除這個key,在redis中lua指令碼能夠保證原子性。那麼問題叒來了!這樣就保證了這個分散式鎖的安全性嗎?現在這個分散式鎖的問題在於存在業務時間過長導致鎖過期被其他執行緒獲取的情況,此時需要檢測續租鎖來避免這個問題。
圖二:鎖過期導致業務錯誤四、redisson的watch dog實現續租鎖
4.1、Demo演示
那麼如何續租呢,主要思路就是用一個執行緒檢測當前這個業務是否執行完,鎖還有多久過期;如果鎖即將失效時業務還沒有執行完那麼就給這個鎖重新設定過期時間。這裡我們使用redisson的實現,畢竟自己實現的輪子沒那麼靠譜?。
public class RedissonLockerImpl implements RedissonLocker {
@Resource
private RedissonClient redissonClient;
@Override
public void lock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
}
@Override
public void unlock(String lockKey) {
RLock lock = redissonClient.getLock(lockKey);
lock.unlock();
}
}
public void test() {
CountDownLatch count = new CountDownLatch(2);
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("執行緒1獲取鎖");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("執行緒1釋放鎖");
}
});
count.countDown();
CustomizeThreadPool.threadPool.execute(() -> {
try {
count.await();
redissonLocker.lock(lockKey);
log.info("執行緒2獲取鎖");
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
log.info("執行緒2釋放鎖");
}
});
count.countDown();
}
複製程式碼
public void test2() {
String lockKey = "LOCK_KEY";
CustomizeThreadPool.threadPool.execute(() -> {
redissonLocker.lock(lockKey);
redissonLocker.lock(lockKey);
try {
TimeUnit.SECONDS.sleep(25);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonLocker.unlock(lockKey);
}
});
}
複製程式碼
怎麼樣,這個API是不是相當簡潔呢,可能有同學看到之前的程式碼會問你這裡加鎖的時候為什麼沒有給鎖設定過期時間呢?你加鎖key對應的value呢?通過這兩個例子我們可以看到加鎖key對應的value是一個hash結構,第一個屬性對應的就是我們所說的value,用來判斷是否是自己加的鎖;第二個屬性對應的其實是加鎖的次數,這和Java中的ReentrantLock一樣是可重入鎖,所以第二個例子裡只做了一次unLock沒辦法釋放鎖。至於這個key對應的value和鎖過期時間在下面的原始碼分析介紹。
4.2、原始碼分析
package org.redisson.config;
import ..........
public class Config {
// 其他原始碼省略
private long lockWatchdogTimeout;
public Config() {
// 預設的鎖過期時間
this.lockWatchdogTimeout = 30000L;
}
public Config(Config oldConf) {
this.lockWatchdogTimeout = 30000L;
// 如果有讀取配置檔案修改的鎖過期時間
this.setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
}
}
// 不帶過期時間加鎖
public void lock() {
try {
this.lockInterruptibly();
} catch (InterruptedException var2) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly() throws InterruptedException {
this.lockInterruptibly(-1L,(TimeUnit)null);
}
public void lock(long leaseTime,TimeUnit unit) {
try {
this.lockInterruptibly(leaseTime,unit);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
}
}
public void lockInterruptibly(long leaseTime,TimeUnit unit) throws InterruptedException {
// 獲取當前執行緒的ID
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖返回過期時間
Long ttl = this.tryAcquire(leaseTime,unit,threadId);
// 如果加鎖失敗
if (ttl != null) {
// 訂閱解鎖佇列
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
// 嘗試加鎖
ttl = this.tryAcquire(leaseTime,threadId);
// 加鎖成功則返回
if (ttl == null) {
return;
}
if (ttl >= 0L) {
// 加鎖失敗阻塞
this.getEntry(threadId).getLatch().tryAcquire(ttl,TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 取消訂閱解鎖訊息
this.unsubscribe(future,threadId);
}
}
}
//嘗試加鎖
private Long tryAcquire(long leaseTime,TimeUnit unit,long threadId) {
return (Long)this.get(this.tryAcquireAsync(leaseTime,threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime,final long threadId) {
// 設定了預設的過期時間
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime,threadId,RedisCommands.EVAL_LONG);
} else {
// 如果呼叫的是沒有過期時間的lock,則預設時間為lockWatchdogTimeout
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS,RedisCommands.EVAL_LONG);
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
if (future.isSuccess()) {
Long ttlRemaining = (Long)future.getNow();
// 加鎖成功之後開始一個排程任務
if (ttlRemaining == null) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
// 呼叫lua指令碼非同步加鎖,value由getLockName()生成,uuid+threadId
<T> RFuture<T> tryLockInnerAsync(long leaseTime,long threadId,RedisStrictCommand<T> command) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
// 如果當前不存在key則加鎖,如果當前存在並且是自己的則加鎖次數加一
return this.commandExecutor.evalWriteAsync(this.getName(),LongCodec.INSTANCE,command,"if (redis.call('exists',KEYS[1]) == 0) then redis.call('hset',KEYS[1],ARGV[2],1); redis.call('pexpire',ARGV[1]); return nil; end; if (redis.call('hexists',ARGV[2]) == 1) then redis.call('hincrby',ARGV[1]); return nil; end; return redis.call('pttl',KEYS[1]);",Collections.singletonList(this.getName()),new Object[]{this.internalLockLeaseTime,this.getLockName(threadId)});
}
// 鎖續租定時任務
private void scheduleExpirationRenewal(final long threadId) {
if (!expirationRenewalMap.containsKey(this.getEntryName())) {
// 新增一個回撥任務,每1/3鎖過期時間執行一次
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
// 非同步重置鎖過期時間
RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
public void operationComplete(Future<Boolean> future) throws Exception {
// 從加鎖集合移除 RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
if (!future.isSuccess()) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration",future.cause());
} else {
// 成功重置鎖時間之後再次呼叫
if ((Boolean)future.getNow()) {
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
}
},this.internalLockLeaseTime / 3L,TimeUnit.MILLISECONDS);
// 保證任務不會被重複建立,取消任務
if (expirationRenewalMap.putIfAbsent(this.getEntryName(),new RedissonLock.ExpirationEntry(threadId,task)) != null) {
task.cancel();
}
}
}
複製程式碼
從原始碼中我們可以瞭解redisson實現分散式鎖的大致流程;當我們沒有設定鎖過期時間的時候,redisson會使用lockWatchdogTimeout時間(預設為30s)設定為鎖過期時間;redisson設定的redis資料結構是一個hash,其中一個屬性是鎖的值,由uuid和當前執行緒id組成,另一個屬性是加鎖次數用來實現可重入性;當沒有設定鎖過期時間的時候,redisson會每隔1/3鎖過期時間將鎖過期時間重置為初始值(預設30s時,當過期時間還有20s就會重新設定過期時間為30s)直到釋放鎖;如果設定了過期時間則不會有鎖續租的功能。加鎖的時候如果當前key不存在則直接設定key,如果存在並且是自己的則將加鎖次數加一。加鎖失敗則訂閱釋放鎖redis channel,執行緒進入阻塞。釋放鎖先判斷當前是否是自己的鎖,如果是則將當前加鎖次數減一,如果減一之後為0則刪除key,如果有續租任務則取消續租任務,向redis channel中發一條訊息喚醒被阻塞的執行緒獲取鎖。