Redis分布式鎖解決搶購問題
首先分享一個業務場景-搶購。一個典型的高並發問題,所需的最關鍵字段就是庫存,在高並發的情況下每次都去數據庫查詢顯然是不合適的,因此把庫存信息存入Redis中,利用redis的鎖機制來控制並發訪問,是一個不錯的解決方案。
首先是一段業務代碼:
@Transactional
public void orderProductMockDiffUser(String productId){
//1.查庫存
int stockNum = stock.get(productId);
if(stocknum == 0){
throw new SellException(ProductStatusEnum.STOCK_EMPTY);
//這裏拋出的異常要是運行時異常,否則無法進行數據回滾,這也是spring中比較基礎的
}else{
//2.下單
orders.put(KeyUtil.genUniqueKey(),productId);//生成隨機用戶id模擬高並發
sotckNum = stockNum-1;
try{
Thread.sleep(100);
} catch (InterruptedExcption e){
e.printStackTrace();
}
stock.put(productId,stockNum);
}
}
這裏有一種比較簡單的解決方案,就是synchronized關鍵字。
public synchronized void orderProductMockDiffUser(String productId)
這就是java自帶的一種鎖機制,簡單的對函數加鎖和釋放鎖。但問題是這個實在是太慢了,感興趣的可以可以寫個接口用apache ab壓測一下。
ab -n 500 -c 100 http://localhost:8080/xxxxxxx
下面就是redis分布式鎖的解決方法。首先要了解兩個redis指令
SETNX 和 GETSET,可以在redis中文網上找到詳細的介紹。
SETNX就是set if not exist的縮寫,如果不存在就返回保存value並返回1,如果存在就返回0。
GETSET其實就是兩個指令GET和SET,首先會GET到當前key的值並返回,然後在設置當前Key為要設置Value。
首先我們先新建一個RedisLock類:
@Slf4j
@Component
public class RedisService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
/***
* 加鎖
* @param key
* @param value 當前時間+超時時間
* @return 鎖住返回true
*/
public boolean lock(String key,String value){
if(stringRedisTemplate.opsForValue().setIfAbsent(key,value)){//setNX 返回boolean
return true;
}
//如果鎖超時 ***
String currentValue = stringRedisTemplate.opsForValue().get(key);
if(!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue)<System.currentTimeMillis()){
//獲取上一個鎖的時間
String oldvalue = stringRedisTemplate.opsForValue().getAndSet(key,value);
if(!StringUtils.isEmpty(oldvalue)&&oldvalue.equals(currentValue)){
return true;
}
}
return false;
}
/***
* 解鎖
* @param key
* @param value
* @return
*/
public void unlock(String key,String value){
try {
String currentValue = stringRedisTemplate.opsForValue().get(key);
if(!StringUtils.isEmpty(currentValue)&¤tValue.equals(value)){
stringRedisTemplate.opsForValue().getOperations().delete(key);
}
} catch (Exception e) {
log.error("解鎖異常");
}
}
}
這個項目是springboot的項目。首先要加入redis的pom依賴,該類只有兩個功能,加鎖和解鎖,解鎖比較簡單,就是刪除當前key的鍵值對。我們主要來說一說加鎖這個功能。
首先,鎖的value值是當前時間加上過期時間的時間戳,Long類型。首先看到用setiFAbsent方法也就是對應的SETNX,在沒有線程獲得鎖的情況下可以直接拿到鎖,並返回true也就是加鎖,最後沒有獲得鎖的線程會返回false。 最重要的是中間對於鎖超時的處理,如果沒有這段代碼,當秒殺方法發生異常的時候,後續的線程都無法得到鎖,也就陷入了一個死鎖的情況。我們可以假設CurrentValue為A,並且在執行過程中拋出了異常,這時進入了兩個value為B的線程來爭奪這個鎖,也就是走到了註釋*的地方。currentValue==A,這時某一個線程執行到了getAndSet(key,value)函數(某一時刻一定只有一個線程執行這個方法,其他要等待)。這時oldvalue也就是之前的value等於A,在方法執行過後,oldvalue會被設置為當前的value也就是B。這時繼續執行,由於oldValue==currentValue所以該線程獲取到鎖。而另一個線程獲取的oldvalue是B,而currentValue是A,所以他就獲取不到鎖啦。多線程還是有些亂的,需要好好想一想。
接下來就是在業務代碼中加鎖啦:首要要@Autowired註入剛剛RedisLock類,不要忘記對這個類加一個@Component註解否則無法註入
private static final int TIMEOUT= 10*1000;
@Transactional
public void orderProductMockDiffUser(String productId){
long time = System.currentTimeMillions()+TIMEOUT;
if(!redislock.lock(productId,String.valueOf(time)){
throw new SellException(101,"換個姿勢再試試")
}
//1.查庫存
int stockNum = stock.get(productId);
if(stocknum == 0){
throw new SellException(ProductStatusEnum.STOCK_EMPTY);
//這裏拋出的異常要是運行時異常,否則無法進行數據回滾,這也是spring中比較基礎的
}else{
//2.下單
orders.put(KeyUtil.genUniqueKey(),productId);//生成隨機用戶id模擬高並發
sotckNum = stockNum-1;
try{
Thread.sleep(100);
} catch (InterruptedExcption e){
e.printStackTrace();
}
stock.put(productId,stockNum);
}
redisLock.unlock(productId,String.valueOf(time));
}
Redis分布式鎖解決搶購問題