redis 分散式鎖的 5個坑,真是又大又深
引言
最近專案上線的頻率頗高,連著幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前只能硬著頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫程式碼,可以直接叫做Bug
。我就熬夜寫了一個bug
被罵慘了。
由於是做商城業務,要頻繁的對商品庫存進行扣減,應用是叢集部署,為避免併發造成庫存超買超賣
等問題,採用 redis
分散式鎖加以控制。本以為給扣庫存的程式碼加上鎖lock.tryLock
就萬事大吉了
/** * @author xiaofu * @description 扣減庫存 * @date 2020/4/21 12:10 */ public String stockLock() { RLock lock = redissonClient.getLock("stockLock"); try { /** * 獲取鎖 */ if (lock.tryLock(10, TimeUnit.SECONDS)) { /** * 查詢庫存數 */ Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount")); /** * 扣減庫存 */ if (stock > 0) { stock = stock - 1; stringRedisTemplate.opsForValue().set("stockCount", stock.toString()); LOGGER.info("庫存扣減成功,剩餘庫存數量:{}", stock); } else { LOGGER.info("庫存不足~"); } } else { LOGGER.info("未獲取到鎖業務結束.."); } } catch (Exception e) { LOGGER.info("處理異常", e); } finally { lock.unlock(); } return "ok"; }
結果業務程式碼執行完以後我忘了釋放鎖lock.unlock()
,導致redis
執行緒池被打滿,redis
服務大面積故障,造成庫存資料扣減混亂,被領導一頓臭罵,這個月績效~ 哎·~。
隨著 使用redis
鎖的時間越長,我發現 redis
鎖的坑遠比想象中要多。就算在面試題當中redis
分散式鎖的出鏡率也比較高,比如:“用鎖遇到過哪些問題?” ,“又是如何解決的?” 基本都是一套連招問出來的。
今天就分享一下我用redis
分散式鎖的踩坑日記,以及一些解決方案,和大家一起共勉。
一、鎖未被釋放
這種情況是一種低階錯誤,就是我上邊犯的錯,由於當前執行緒 獲取到redis
鎖,處理完業務後未及時釋放鎖,導致其它執行緒會一直嘗試獲取鎖阻塞,例如:用Jedis
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
redis執行緒池
已經沒有空閒執行緒來處理客戶端命令。
解決的方法也很簡單,只要我們細心一點,拿到鎖的執行緒處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,執行緒可以釋放當前連線並且sleep
一段時間。
public void lock() { while (true) { boolean flag = this.getLock(key); if (flag) { TODO ......... } else { // 釋放當前redis連線 redis.close(); // 休眠1000毫秒 sleep(1000); } } }
二、B的鎖被A給釋放了
我們知道Redis
實現鎖的原理在於 SETNX
命令。當 key
不存在時將 key
的值設為 value
,返回值為 1
;若給定的 key
已經存在,則 SETNX
不做任何動作,返回值為 0
。
SETNX key value
我們來設想一下這個場景:A
、B
兩個執行緒來嘗試給key
myLock
加鎖,A執行緒
先拿到鎖(假如鎖3秒
後過期),B執行緒
就在等待嘗試獲取鎖,到這一點毛病沒有。
那如果此時業務邏輯比較耗時,執行時間已經超過redis
鎖過期時間,這時A執行緒
的鎖自動釋放(刪除key
),B執行緒
檢測到myLock
這個key
不存在,執行 SETNX
命令也拿到了鎖。
但是,此時A執行緒
執行完業務邏輯之後,還是會去釋放鎖(刪除key
),這就導致B執行緒
的鎖被A執行緒
給釋放了。
為避免上邊的情況,一般我們在每個執行緒加鎖時要帶上自己獨有的value
值來標識,只釋放指定value
的key
,否則就會出現釋放鎖混亂的場景。
三、資料庫事務超時
emm~ 聊redis
鎖咋還扯到資料庫事務上來了?彆著急往下看,看下邊這段程式碼:
@Transaction
public void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
}
}
}
給這個方法新增一個@Transaction
註解開啟事務,如程式碼中丟擲異常進行回滾,要知道資料庫事務可是有超時時間限制的,並不會無條件的一直等一個耗時的資料庫操作。
比如:我們解析一個大檔案,再將資料存入到資料庫,如果執行時間太長,就會導致事務超時自動回滾。
一旦你的key
長時間獲取不到鎖,獲取鎖等待的時間
遠超過資料庫事務超時時間
,程式就會報異常。
一般為解決這種問題,我們就需要將資料庫事務改為手動提交、回滾事務。
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Transaction
public void lock() {
//手動開啟事務
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
//手動提交事務
dataSourceTransactionManager.commit(transactionStatus);
}
}
} catch (Exception e) {
//手動回滾事務
dataSourceTransactionManager.rollback(transactionStatus);
}
}
四、鎖過期了,業務還沒執行完
這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。
同樣是redis
分散式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把redis
鎖的過期時間再弄長點不就解決了嗎?
那還是有問題,我們可以在加鎖的時候,手動調長redis
鎖的過期時間,可這個時間多長合適?業務邏輯的執行時間是不可控的,調的過長又會影響操作效能。
要是redis
鎖的過期時間能夠自動續期就好了。
為了解決這個問題我們使用redis
客戶端redisson
,redisson
很好的解決了redis
在分散式環境下的一些棘手問題,它的宗旨就是讓使用者減少對Redis
的關注,將更多精力用在處理業務邏輯上。
redisson
對分散式鎖做了很好封裝,只需呼叫API
即可。
RLock lock = redissonClient.getLock("stockLock");
redisson
在加鎖成功後,會註冊一個定時任務監聽這個鎖,每隔10秒就去檢視這個鎖,如果還持有鎖,就對過期時間
進行續期。預設過期時間30秒。這個機制也被叫做:“看門狗
”,這名字。。。
舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。
通過分析下邊redisson
的原始碼實現可以發現,不管是加鎖
、解鎖
、續約
都是客戶端把一些複雜的業務邏輯,通過封裝在Lua
指令碼中傳送給redis
,保證這段複雜業務邏輯執行的原子性
。
@Slf4j
@Service
public class RedisDistributionLockPlus {
/**
* 加鎖超時時間,單位毫秒, 即:加鎖時間內執行完操作,如果未完成會有並發現象
*/
private static final long DEFAULT_LOCK_TIMEOUT = 30;
private static final long TIME_SECONDS_FIVE = 5 ;
/**
* 每個key的過期時間 {@link LockContent}
*/
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/**
* redis執行成功的返回
*/
private static final Long EXEC_SUCCESS = 1L;
/**
* 獲取鎖lua指令碼, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:超時時間
*/
private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
"if redis.call('exists', KEYS[1]) == 0 then " +
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
"for k, v in pairs(t) do " +
"if v == 'OK' then return tonumber(ARGV[2]) end " +
"end " +
"return 0 end";
/**
* 釋放鎖lua指令碼, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:業務耗時 arg3: 業務開始設定的timeout
*/
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"local ctime = tonumber(ARGV[2]) " +
"local biz_timeout = tonumber(ARGV[3]) " +
"if ctime > 0 then " +
"if redis.call('exists', KEYS[2]) == 1 then " +
"local avg_time = redis.call('get', KEYS[2]) " +
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
"else redis.call('del', KEYS[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
"end " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
/**
* 續約lua指令碼
*/
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplate redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
ScheduleTask task = new ScheduleTask(this, lockContentMap);
// 啟動定時任務
ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}
/**
* 加鎖
* 取到鎖加鎖,取不到鎖一直等待知道獲得鎖
*
* @param lockKey
* @param requestId 全域性唯一
* @param expire 鎖過期時間, 單位秒
* @return
*/
public boolean lock(String lockKey, String requestId, long expire) {
log.info("開始執行加鎖, lockKey ={}, requestId={}", lockKey, requestId);
for (; ; ) {
// 判斷是否已經有執行緒持有鎖,減少redis的壓力
LockContent lockContentOld = lockContentMap.get(lockKey);
boolean unLocked = null == lockContentOld;
// 如果沒有被鎖,就獲取鎖
if (unLocked) {
long startTime = System.currentTimeMillis();
// 計算超時時間
long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
String lockKeyRenew = lockKey + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
if (null != lockExpire && lockExpire > 0) {
// 將鎖放入map
LockContent lockContent = new LockContent();
lockContent.setStartTime(startTime);
lockContent.setLockExpire(lockExpire);
lockContent.setExpireTime(startTime + lockExpire * 1000);
lockContent.setRequestId(requestId);
lockContent.setThread(Thread.currentThread());
lockContent.setBizExpire(bizExpire);
lockContent.setLockCount(1);
lockContentMap.put(lockKey, lockContent);
log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId);
return true;
}
}
// 重複獲取鎖,線上程池中由於執行緒複用,執行緒相等並不能確定是該執行緒的鎖
if (Thread.currentThread() == lockContentOld.getThread()
&& requestId.equals(lockContentOld.getRequestId())){
// 計數 +1
lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
return true;
}
// 如果被鎖或獲取鎖失敗,則等待100毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// 這裡用lombok 有問題
log.error("獲取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e);
return false;
}
}
}
/**
* 解鎖
*
* @param lockKey
* @param lockValue
*/
public boolean unlock(String lockKey, String lockValue) {
String lockKeyRenew = lockKey + "_renew";
LockContent lockContent = lockContentMap.get(lockKey);
long consumeTime;
if (null == lockContent) {
consumeTime = 0L;
} else if (lockValue.equals(lockContent.getRequestId())) {
int lockCount = lockContent.getLockCount();
// 每次釋放鎖, 計數 -1,減到0時刪除redis上的key
if (--lockCount > 0) {
lockContent.setLockCount(lockCount);
return false;
}
consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
} else {
log.info("釋放鎖失敗,不是自己的鎖。");
return false;
}
// 刪除已完成key,先刪除本地快取,減少redis壓力, 分散式鎖,只有一個,所以這裡不加鎖
lockContentMap.remove(lockKey);
RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
Long.toString(lockContent.getBizExpire()));
return EXEC_SUCCESS.equals(result);
}
/**
* 續約
*
* @param lockKey
* @param lockContent
* @return true:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放 2、不是自己的鎖,3、續約期間鎖過期了(未解決))
*/
public boolean renew(String lockKey, LockContent lockContent) {
// 檢測執行業務執行緒的狀態
Thread.State state = lockContent.getThread().getState();
if (Thread.State.TERMINATED == state) {
log.info("執行業務的執行緒已終止,不再續約 lockKey ={}, lockContent={}", lockKey, lockContent);
return false;
}
String requestId = lockContent.getRequestId();
long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
log.info("續約結果,True成功,False失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
return EXEC_SUCCESS.equals(result);
}
static class ScheduleExecutor {
public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
long delay = unit.toMillis(initialDelay);
long period_ = unit.toMillis(period);
// 定時執行
new Timer("Lock-Renew-Task").schedule(task, delay, period_);
}
}
static class ScheduleTask extends TimerTask {
private final RedisDistributionLockPlus redisDistributionLock;
private final Map<String, LockContent> lockContentMap;
public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
this.redisDistributionLock = redisDistributionLock;
this.lockContentMap = lockContentMap;
}
@Override
public void run() {
if (lockContentMap.isEmpty()) {
return;
}
Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
for (Map.Entry<String, LockContent> entry : entries) {
String lockKey = entry.getKey();
LockContent lockContent = entry.getValue();
long expireTime = lockContent.getExpireTime();
// 減少執行緒池中任務數量
if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
//執行緒池非同步續約
ThreadPool.submit(() -> {
boolean renew = redisDistributionLock.renew(lockKey, lockContent);
if (renew) {
long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
lockContent.setExpireTime(expireTimeNew);
} else {
// 續約失敗,說明已經執行完 OR redis 出現問題
lockContentMap.remove(lockKey);
}
});
}
}
}
}
}
五、redis主從複製的坑
redis
高可用最常見的方案就是主從複製
(master-slave),這種模式也給redis分散式鎖
挖了一坑。
redis cluster
叢集環境下,假如現在A客戶端
想要加鎖,它會根據路由規則選擇一臺master
節點寫入key
mylock
,在加鎖成功後,master
節點會把key
非同步複製給對應的slave
節點。
如果此時redis master
節點宕機,為保證叢集可用性,會進行主備切換
,slave
變為了redis master
。B客戶端
在新的master
節點上加鎖成功,而A客戶端
也以為自己還是成功加了鎖的。
此時就會導致同一時間內多個客戶端對一個分散式鎖完成了加鎖,導致各種髒資料的產生。
至於解決辦法嘛,目前看還沒有什麼根治的方法,只能儘量保證機器的穩定性,減少發生此事件的概率。
總結
上面就是我在使用Redis
分散式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,只不過是在權衡利弊後,選一個在接受範圍內的折中方案而已。
小福利:
有一些付費課程 ,噓~,免費 送給小夥伴們。關注我的公號,回覆【666】,無套路自行領取哦
相關推薦
redis 分散式鎖的 5個坑,真是又大又深
引言 最近專案上線的頻率頗高,連著幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前只能硬著頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫程式碼,可以直接叫做Bug。我就熬夜寫了一個bug被罵慘了。 由於是做商城業務,要頻繁的對商品庫存進行扣減,應用是叢集部署,為避免併發造成庫存超買超賣等
【項目管理】經驗之談 | 資深項目經理都避免的5個坑,你中招了嗎?
尊重 最終 fail 同方 快速 這就是 tro 理解 動力 哈嘍!大家好! 那天看到最有趣的一句話就是 為了填坑,一位項目經理胖了20斤 。。。。。 今天就給大家介紹一下 項目經理要註意的那些“坑” 項目經理“誤踩雷區” 1 未告知成員工作目標 作為項目經理
拜託,面試請不要再問我Redis分散式鎖的實現原理!【石杉的架構筆記】
歡迎關注個人公眾號:石杉的架構筆記(ID:shishan100) 週一至五早8點半!精品技術文章準時送上! 目錄 一、寫在前面 二、Redisson實現Redis分散式鎖的底層原理 (1)加鎖機制 (2)鎖互斥機制  
拜託,面試請不要再問我Redis分散式鎖的實現原理!
目錄 一、寫在前面 二、Redisson實現Redis分散式鎖的底層原理 (1)加鎖機制 (2)鎖互斥機制 (3)watch dog自動延期機制 &nbs
Java架構-拜託,面試請不要再問我Redis分散式鎖的實現原理
一、寫在前面 現在面試,一般都會聊聊分散式系統這塊的東西。通常面試官都會從服務框架(Spring Cloud、Dubbo)聊起,一路聊到分散式事務、分散式鎖、ZooKeeper等知識。 所以咱們這篇文章就來聊聊分散式鎖這塊知識,具體的來看看Redis分散式鎖的實現原理。 說實
Redis系列-生產應用篇-分散式鎖(5)-單程序Redis分散式鎖的Java實現(Redisson使用與底層實現)-原子鎖類
Redisson單程序Redis分散式樂觀鎖的使用與實現 本文基於Redisson 3.7.5 4. 原子鎖類 Redisson中實現了兩種原子鎖類:RAtomicLong和RAtomicDouble,還有RLongAdder和RDoubleAdder RA
Redis分散式鎖----樂觀鎖的實現,以秒殺系統為例
摘要:本文使用redis來實現樂觀鎖,並以秒殺系統為例項來講解整個過程。 樂觀鎖 大多數是基於資料版本(version)的記錄機制實現的。即為資料增加一個版本標識,在基於資料庫表的版本解決方案中,一般是通過為資料庫表增加一個”version”欄位來
Redis 分散式鎖:樂觀鎖的實現,以秒殺系統為例
樂觀鎖大多數是基於資料版本(version)的記錄機制實現的。即為資料增加一個版本標識,在基於資料庫表的版本解決方案中,一般是通過為資料庫表增加一個”version”欄位來實現讀取出資料時,將此版本號一同讀出,之後更新時,對此版本號加1。此時,將提交資料的版本號與資料庫表對應
redis分散式鎖,無須設定有效期,自動檢測hold鎖的節點是否存活
1.有一個獨立的keeplive守護執行緒保證節點存活,頻率是n。 2.節點存活資訊由固定前置+mac+程序id+程序啟動時間,保證節點重啟問題。 3. 鎖的資訊由固定前置+mac+程序id+程序啟動時間。 4. 具體鎖的邏輯參考lock方法。 package six.c
分散式鎖原理的一些學習與思考redis分散式鎖,zookeeper分散式鎖
開發十年,就只剩下這套架構體系了! >>>
面試官問我,Redis分散式鎖如何續期?懵了。
開發十年,就只剩下這套架構體系了! >>>
5、redis分散式鎖
開發十年,就只剩下這套架構體系了! >>>
專案中用到了Redis分散式鎖,瞭解一下背後的原理
前言 以前在學校做小專案的時候,用到Redis,基本也只是用來當作快取。現在博主在某金融平臺實習,發現Redis在生產中並不只是當作快取這麼簡單。在我接觸到的專案中,Redis起到了一個分散式鎖的作用,具體情況是這樣的: 該專案在金融平臺中負責某塊業務,是一個分散式系統,線上大概跑著10個左右的例項。其中有一
關於Redis分散式鎖這一篇應該是講的最好的了,先收藏起來再看!
## 前言 在Java併發程式設計中,我們通常使用到`synchronized` 、`Lock`這兩個執行緒鎖,Java中的鎖,只能保證對同一個JVM中的執行緒有效。而在分散式叢集環境,這個時候我們就需要使用到分散式鎖。 **實現分散式鎖的方案** * 基於資料庫實現分散式鎖 * 基於快取R
手撕redis分散式鎖,隔壁張小帥都看懂了!
### 前言 上一篇老貓和小夥伴們分享了為什麼要使用分散式鎖以及分散式鎖的實現思路原理,目前我們主要採用第三方的元件作為分散式鎖的工具。上一篇運用了Mysql中的select ...for update實現了分散式鎖,但是我們說這種實現方式並不常用,因為當大併發量的時候,會給資料庫帶來比較大的壓力。當然也有
Java踩坑筆記:ObjectIOStream與IOStream的各種裝飾器(先挖個坑,以後再來詳細填)
ted objects lose val read thread 環境 valid 序列化對象 Java的序列化和ObjectStream真是一個大坑。。 先不說多線程環境下的問題,在單線程裏,一個Socket只能保持一個ObjectOutputStream,原因好像是
挖個坑,寫一個Spring+SpringMVC+Mybatis的項目
pri 自己 什麽 空間 ati 並且 servle 用戶註冊 留言板 想挖個坑督促自己練技術,有時候想到一個項目,大概想了一些要實現的功能,怎麽實現。現在覺得自己差不多能完成QQ空間的主要功能了。準備立個牌坊,寫一個類似功能的網站。並且把進度放到這裏來。 初步計劃
中國歷史上5個謠言,單是第1條就騙了不少人!
內部 說明 道理 技術分享 -i 取代 統一 三國 並不是 謠言1:嘉慶要殺和珅,和珅拿出乾隆留下的保命密信,打開一看卻傻了眼,上邊寫著三個字:“留全屍”。 【真相】:這個橋段其實是小說和電視劇裏杜撰的,只是為了增加劇情的需要,但卻被當成了歷史上的真事。身為清朝第一
學習程式設計的25個“坑”,你踩到了嗎?
0、忽視了程式設計終究是以人為本的。是的,機器的確會執行你的程式碼,但程式設計是為人們解決問題的,將他們的需求轉換為由其他人(或你自己線下)讀取,維護和修改的程式碼。 1、未能花充足的時間練習程式設計。 根據工作或日程安排,你可能會花費大量時間在與程式設計相關的任務上,但實際上這些並非程式設計本