redis系列之——分散式鎖
端午節最後一天了,三天假期過得太快了,更壞的訊息是,下週還要上六天班呢!
趁著假期的尾巴,和大家再叨逼叨redis是如何實現分散式鎖的。這期也是為了填之前《redis系列之——快取穿透、快取擊穿、快取雪崩》留下的坑。
這一期不是專門聊分散式鎖的,所以不會涉及到各種分散式鎖實現及相關的比較,只是聊一下如何使用redis實現分散式鎖。感覺上一個坑還沒填完,這裡又挖了一個大坑,各種分散式鎖實現及相關的比較後面有時間我會跪著填上的,請大家給我多一點點時間,多一點點溫柔。
本期是硬核輸出,也是面試高頻,更是實戰必會。
什麼是分散式鎖
先說一個場景,消費者在購物網站上下單或收銀員在POS機上下單,由於網路等問題,在連續點選了兩下,後端網站如何處理,如何響應?對於這個問題,前端需要處理,後端也需要處理。這裡主要說後端,後端不光要處理重複訂單問題,還有處理冪等問題。冪等問題簡單來說就是相同的請求,要有相同的響應結果,這裡就不展開了。重複訂單該如何處理?
對於一個小的訪問量不大的網站,部署了一個tomcat,這個問題可以簡單的通過JVM提供的同步鎖synchronized實現。但是當網站訪問量越來越大時,需要擴充套件機器,synchronized就不能起作用了。相同的下單引數連續兩次請求後端伺服器,可能會被分發到兩個tomcat上,就會出現synchronized失效問題。
分散式鎖要解決的就是多機器部署時,相同請求併發訪問時資源競爭問題。請求到達每個tomcat時,首先要去redis中註冊鎖,註冊成功返回true則說明獲得了鎖,可以繼續處理相關的業務,處理完成後釋放鎖。同一時刻只能有一個tomcat能獲得鎖,其他沒獲得鎖的tomcat則多次嘗試繼續獲得鎖,沒有獲得鎖不能處理業務。獲得鎖的tomcat釋放鎖後,其他的tomcat才能有一個獲得鎖。
這裡是使用redis做外部儲存介質儲存鎖的,使用zookeeper也是類似的。萬變不離其宗,原理都一樣,只是技術選型有差別。
redis實現
廢話就不說了,直接上程式碼。
1.pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
這裡需要注意,我使用的是spring-boot。redis的相關jar使用的是spring-boot-starter-data-redis。
注意版本是2以上,版本1和版本2在redis鎖的實現上有個關鍵的差異,後面會說到。
2.application.yml
server:
port: 8080
spring:
application:
name: java-summary
redis:
database: 10
password: 123456
timeout: 20000
host: 127.0.0.1
port: 6379
pool:
max-idle: 20
min-idle: 20
max-wait: 10000
max-active: 5000
3.RedisConfiguration.java
package com.wuxiaolong.redis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Description:
*
* @author 諸葛小猿
* @date 2020-06-27
*
* 去除redis序列化時,key-value的亂碼問題
*/
@Configuration
public class RedisConfiguration {
@Bean
@Primary
public RedisTemplate setRedisTemplate(RedisTemplate redisTemplate) {
RedisSerializer stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setHashValueSerializer(stringSerializer);
return redisTemplate;
}
}
這個配置類就是為了解決redis的key和value在序列化時的亂碼問題,將序列化的方式設定為string格式
4.RedisLock.java
package com.wuxiaolong.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Description:
*
* @author 諸葛小猿
* @date 2020-06-27
*/
@Component
@Slf4j
public class RedisLock {
@Autowired
private RedisTemplate redisTemplate;
/**
* 新增元素
*
* @param key
* @param value
*/
public void set(String key, String value) {
if (key == null || value == null) {
return;
}
redisTemplate.opsForValue().set(key, value);
}
/**
* 如果已經存在返回false,否則返回true
*
* @param key
* @param value
* @return
*/
public Boolean setNx(String key, String value, Long expireTime, TimeUnit mimeUnit) {
if (key == null || value == null) {
return false;
}
//spiring boot 1.5 版本setIfAbsent只能設定key-value,需要單獨對key設定過期時間,因為是兩步操作,所以不是原子性
//Boolean tf = redisTemplate.opsForValue().setIfAbsent(key, value);
//redisTemplate.expire(key, expireTime, mimeUnit);
// 在spiring boot 2 可以直接使用 redisTemplate的setIfAbsent設定key-value和過期時間,是原子性
Boolean tf =redisTemplate.opsForValue().setIfAbsent(key,value,expireTime, mimeUnit);
return tf;
}
/**
* 獲取資料
*
* @param key
* @return
*/
public Object get(String key) {
if (key == null) {
return null;
}
return redisTemplate.opsForValue().get(key);
}
/**
* 刪除
*
* @param key
* @return
*/
public void remove(Object key) {
if (key == null) {
return ;
}
redisTemplate.delete(key);
}
/**
* 加鎖
*
* @param key
* @param waitTime 等待時間,在這個時間內會多次嘗試獲取鎖,超過這個時間還沒獲得鎖,就返回false
* @param interval 間隔時間,每隔多長時間嘗試一次獲的鎖
* @param expireTime key的過期時間
*/
public Boolean lock(String key, Long waitTime,Long interval, Long expireTime) {
String value = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
Boolean flag = setNx(key, value, expireTime, TimeUnit.MILLISECONDS);
// 嘗試獲取鎖 成功返回
if (flag) {
return flag;
} else {
// 獲取失敗
// 現在時間
long newTime = System.currentTimeMillis();
// 等待過期時間
long loseTime = newTime + waitTime;
// 不斷嘗試獲取鎖成功返回
while (System.currentTimeMillis() < loseTime) {
Boolean testFlag = setNx(key, value, expireTime, TimeUnit.MILLISECONDS);
if (testFlag) {
return testFlag;
}
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
log.error("獲取鎖異常",e);
}
}
}
return false;
}
/**
* 釋放鎖
*
* @param key
* @return
*/
public void unLock(String key) {
remove(key);
}
public Boolean setIfAbsent(String key, String value){
Boolean tf = redisTemplate.opsForValue().setIfAbsent(key, value);
redisTemplate.expire(key, 60, TimeUnit.DAYS);
return tf;
}
}
這個就是redis鎖的核心實現。上面提到的spring-boot版本問題就是這裡的setNx方法中的redisTemplate的setIfAbsent的差異。
setIfAbsent的作用是,在儲存key和value時,如果key不存在則儲存後返回true,說明獲得鎖;如果key存在則儲存後返回false,說明這個鎖正在使用中,不能獲取。
在獲取鎖時,有兩步操作,首先是要儲存這個key-value,然後需要對其設定一個過期時間,防止出現死鎖。在spring-boot的版本為1時,只提供了setIfAbsent(key, value)的API,設定成功後需要呼叫redisTemplate.expire(key, expireTime, mimeUnit)對這個key設定過期時間,這是兩步是非原子性的操作,如果第一步執行成功,第二步執行失敗,就可能出現死鎖,雖然概率很低。在spring-boot的版本為2時,提供了一個原子性的API setIfAbsent(key,value,expireTime, mimeUnit),在儲存key-value的同時設定了過期時間,推薦使用。
4.RedisLockTest.java
package com.wuxiaolong.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* Description:
*
* @author 諸葛小猿
* @date 2020-06-27
*/
@RestController
@Slf4j
public class RedisLockTest {
@Autowired
private RedisLock redisLock;
public static final String ORDER_LOCK_PREFIX = "order:lock:";
@RequestMapping(value = "save/order",method = RequestMethod.POST)
public String saveOrder(@RequestBody Map<String,String> order){
// 獲取商戶號、門店號、收銀機號、收銀單號,組成全域性唯一的訂單號
String orderSn = order.get("merchantSn")+order.get("storeSn")+order.get("workstationSn")+order.get("checkSn");
// 使用全域性唯一的訂單號做分散式鎖的key
String lockKey = ORDER_LOCK_PREFIX + orderSn;
try{
// 加鎖 每隔100毫秒獲取一次鎖;1分鐘內拿不到鎖就返回false;鎖的過期時間5分鐘(防止死鎖)
Boolean tf = redisLock.lock(lockKey,1L * 60 * 1000,100L,5L * 60 * 1000);
if(tf){
// todo 處理拿到鎖時的業務
return "success";
}else {
// todo 處理沒有拿到鎖時的業務
return "fail";
}
}catch (Exception e){
log.error("業務異常",e);
}finally {
// 一定要在finally中釋放鎖
redisLock.unLock(lockKey);
}
return "success";
}
}
這是測試類的例項。首先需要獲得全域性唯一的訂單號,然後使用這個訂單號為key。在try塊的最前面先去redis中獲得鎖,根據是否獲得鎖做不同的操作。需要根據自己的業務定義waitTime,interval, expireTime這三個引數。一定要在最後的finally中釋放鎖。
打卡下班。
關注公眾號,輸入關鍵字"java-summary",可以獲得原始碼。
【傳播知識,共享價值】,感謝小夥伴們的關注和支援,我是【諸葛小猿】,一個彷徨中奮鬥的網際網路民工。留下的坑,跪著我也會填完的!!!
推薦:萬年曆外掛