【redis】基於redis實現分布式並發鎖
基於redis實現分布式並發鎖(註解實現)
說明
前提, 應用服務是分布式或多服務, 而這些"多"有共同的"redis";
GitHub: https://github.com/vergilyn/SpringBootDemo
代碼結構:
一、分布式並發鎖的幾種可行方案
(具體實現思路參考: 分布式鎖的實現、如何用消息系統避免分布式事務?)
1、基於數據庫
可以用數據庫的行鎖for update
, 或專門新建一張鎖控制表
來實現.
過於依賴數據庫, 且健壯性也不是特別好, 完全可以把此種方案舍棄.
(話說都涉及到分布式或多服務器,基本主要還是用redis、memcached或其他緩存服務實現並發鎖)
並未去研究, 參考上面的博客鏈接.
3、基於redis實現
redis實現的復雜度不算高, 只是需要註意一些實現細節. 健壯性貌似只比zookeeper差點, 但完全可接受.
(ps: 感覺redis官方jar中完全可以提供實現, 我們直接用就可以, 是我沒找到嗎?)
二、redis實現分布式並發鎖
2.1 實現思路
1、主要的redis核心命令: 利用redis是單線程的特性, 用setnx、getset、time來實現.
2、思路: redis的key-value就代表一個對象鎖, 當此key存在說明鎖已被獲取, 其余相同對象操作則需要等待獲取鎖.
1) 鎖的釋放, 要特別避免死鎖出現, 主要是特殊情況下如何釋放鎖.
2) 等待獲取鎖的線程, 最好有超時機制.
3) 註意多服務器之間的時間是否同步.
4) 註意獲取鎖操作別占用或創建太多的連接(即使及時關閉了連接), 很影響系統的性能.
2.2 redis並發鎖的2種策略說明
2.2.1 key代表鎖對象, value無意義
/**
* 鎖的策略參考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基於redis分布式鎖實現“秒殺”</a>
* FIXME 此方式加鎖策略存在一定缺陷: 在setIfAbsent()之後expire()執行之前程序異常 鎖不會被釋放. 雖然出現幾率極低
*
* @param timeout timeout的時間範圍內輪詢鎖, 單位: 秒
* @param expire 設置鎖超時時間
* @return true, 獲取鎖成功; false, 獲取鎖失敗.
*/
public boolean lock(long timeout, long expire, final TimeUnit unit) {
long beginTime = System.nanoTime(); // 用nanos、mills具體看需求.
timeout = TimeUnit.SECONDS.toNanos(timeout);
try {
// 在timeout的時間範圍內不斷輪詢鎖
while (System.nanoTime() - beginTime < timeout) {
// 鎖不存在的話,設置鎖並設置鎖過期時間,即加鎖
if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
this.redisClient.expire(key, expire, unit);//設置鎖失效時間, 防止永久阻塞
this.lock = true;
return true;
}
// 短暫休眠後輪詢,避免可能的活鎖
System.out.println("get lock waiting...");
Thread.sleep(30, RANDOM.nextInt(30));
}
} catch (Exception e) {
throw new RuntimeException("locking error", e);
}
return false;
}
以上鎖策略已經很完美, 1) 指定了獲取鎖的超時時間; 2) 設置了鎖的失效, 防止永久阻塞;
但可能有極端情況, 即setIfAbsent()
成功, expire()
執行之前, 如果出現異常情況, 導致expire()
沒有執行, 所以此時會出現永久阻塞. (道理是很難遇到這情況)
2.2.2 key代表鎖對象, value表示鎖超時時間
/**
* 特別註意: 如果多服務器之間存在時間差, 並不建議用System.nanoTime()、System.currentTimeMillis().
* 更好的是統一用redis-server的時間, 但只能獲取到milliseconds.
* 鎖的策略參考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis實現分布式鎖</a>
*
* @param timeout 獲取鎖超時, 單位: 毫秒
* @param expire 鎖失效時常, 單位: 毫秒
* @return true, 獲取鎖成功; false, 獲取鎖失敗.
*/
public boolean lockB(long timeout, long expire) {
long bt = System.currentTimeMillis();
long lockVal;
String lockExpireTime;
try {
while (!this.lock) {
if(System.currentTimeMillis() - bt > timeout){
throw new RedisLockException("get lock timeout!");
}
// 鎖的鍵值: {當前時間} + {失效時常} = {鎖失效時間}
lockVal = getRedisTime() + expire;
// 1. 嘗試獲取鎖
boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
if (ifAbsent) { // 設置成功, 表示獲得鎖
// 這種策略下, 是否設置key失效不太重要. 因為, 正常流程中最後會釋放鎖(del-key); 如果是異常情況下未釋放鎖, 後面的代碼也會判斷鎖是否失效.
// 設置的好處: 能減少redis的內存消耗, 及時清理無效的key(暫時只想到這)
// this.redisClient.expire(key, timeout, TimeUnit.SECONDS);
this.lock = true;
return true;
}
lockExpireTime = this.redisClient.opsForValue().get(this.key);
long curTime = getRedisTime();
// curTime > expireVal: 表示此鎖已無效
/* 在鎖無效的前提下, 嘗試獲取鎖: (一定要用)getAndSet()
*
* 假設鎖已失效, 且未正常expire. 此時C1、C2同時執行到此, C2先執行getAndSet(key, time-02), C2獲取到鎖
* 此時C1.getAndSet(key, time-01)返回的是time-02, 顯然curTime > time-02: false.
* 所以, C1並未獲取到鎖. 但C1修改了key的值為: time-01.
* 但因為C1、C2是同時執行到此, 所以time-01、time-02的值近視相等.
* (若多服務器存在時間差, 那這個差值有問題, 所以服務器時間如果不同步則不能用System.nanoTime()、System.currentTimeMillis(), 該用redis-server time.)
*/
if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
// getset必須在{curTime > expireVal} 判斷之後; 否則, 可能出現死循環
lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");
if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
// this.redisClient.expire(key, timeout, TimeUnit.SECONDS); // 是否設置失效不重要, 理由同上.
this.lock = true;
return true;
}
}
// 鎖被占用, 短暫休眠等待輪詢
System.out.println(this + ": get lock waiting...");
Thread.sleep(40);
}
} catch (Exception e) {
e.printStackTrace();
throw new RedisLockException("locking error", e);
}
System.out.println(this + ": get lock error.");
return false;
}
此種鎖策略特別要註意:
1) 如果多服務器之間時間不同步, 那麽可以用redis-server的時間.
2) getset的調用必須在curTime > lockExpireTime
的前提下, 否則會出現死循環.
3) 並發時getset產生的誤差, 完全可忽略.
4) 特別要註意redis連接的釋放, 否則很容易占用過多的redis連接數.
三、完整實現代碼 (只是簡單實現, 性能有問題)
1. 核心redis鎖策略
public class RedisLock {
private String key;
private boolean lock = false;
private final StringRedisTemplate redisClient;
private final RedisConnection redisConnection;
/**
* @param purpose 鎖前綴
* @param key 鎖定的ID等東西
*/
public RedisLock(String purpose, String key, StringRedisTemplate redisClient) {
if (redisClient == null) {
throw new IllegalArgumentException("redisClient 不能為null!");
}
this.key = purpose + "_" + key + "_redis_lock";
this.redisClient = redisClient;
this.redisConnection = redisClient.getConnectionFactory().getConnection();
}
/**
* 鎖的策略參考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基於redis分布式鎖實現“秒殺”</a>
* FIXME 此方式加鎖策略存在一定缺陷: 在setIfAbsent()之後expire()執行之前程序異常 鎖不會被釋放. 雖然出現幾率極低
*
* @param timeout timeout的時間範圍內輪詢鎖, 單位: 秒
* @param expire 設置鎖超時時間
* @return true, 獲取鎖成功; false, 獲取鎖失敗.
*/
public boolean lockA(long timeout, long expire, final TimeUnit unit) {
long beginTime = System.nanoTime(); // 用nanos、mills具體看需求.
timeout = unit.toNanos(timeout);
try {
// 在timeout的時間範圍內不斷輪詢鎖
while (System.nanoTime() - beginTime < timeout) {
// 鎖不存在的話,設置鎖並設置鎖過期時間,即加鎖
if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
this.redisClient.expire(key, expire, unit);//設置鎖失效時間, 防止永久阻塞
this.lock = true;
return true;
}
// 短暫休眠後輪詢,避免可能的活鎖
System.out.println("get lock waiting...");
Thread.sleep(30);
}
} catch (Exception e) {
throw new RedisLockException("locking error", e);
}
return false;
}
/**
* 特別註意: 如果多服務器之間存在時間差, 並不建議用System.nanoTime()、System.currentTimeMillis().
* 更好的是統一用redis-server的時間, 但只能獲取到milliseconds.
* 鎖的策略參考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis實現分布式鎖</a>
*
* @param timeout 獲取鎖超時, 單位: 毫秒
* @param expire 鎖失效時常, 單位: 毫秒
* @return true, 獲取鎖成功; false, 獲取鎖失敗.
*/
public boolean lockB(long timeout, long expire) {
long bt = System.currentTimeMillis();
long lockVal;
String lockExpireTime;
try {
while (!this.lock) {
if(System.currentTimeMillis() - bt > timeout){
throw new RedisLockException("get lock timeout!");
}
// 鎖的鍵值: {當前時間} + {失效時常} = {鎖失效時間}
lockVal = getRedisTime() + expire;
// 1. 嘗試獲取鎖
boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
if (ifAbsent) { // 設置成功, 表示獲得鎖
this.lock = true;
return true;
}
lockExpireTime = this.redisClient.opsForValue().get(this.key);
long curTime = getRedisTime();
if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");
if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
this.lock = true;
return true;
}
}
// 鎖被占用, 短暫休眠等待輪詢
System.out.println(this + ": get lock waiting...");
Thread.sleep(40);
}
} catch (Exception e) {
e.printStackTrace();
throw new RedisLockException("locking error", e);
}
System.out.println(this + ": get lock error.");
return false;
}
/**
* @return current redis-server time in milliseconds.
*/
private long getRedisTime() {
return this.redisConnection.time();
}
private void closeConnection(){
if(!this.redisConnection.isClosed()){
this.redisConnection.close();
}
}
/** 釋放鎖 */
public void unlock() {
if (this.lock) {
redisClient.delete(key);
}
}
public boolean isLock() {
return lock;
}
}
2. 註解部分
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLockedKey {
/**
* 復雜對象中需要加鎖的成員變量
*/
String field() default "";
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisDistributedLock {
/** 鎖key的前綴 */
String lockedPrefix() default "";
/** 輪詢鎖的時間超時時常, 單位: ms */
long timeout() default 2000;
/** redis-key失效時常, 單位: ms */
int expireTime() default 1000;
}
@Component
@Aspect
public class RedisDistributedLockAop {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 定義緩存邏輯
*/
@Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)")
public void cache(ProceedingJoinPoint pjp) {
Method method = getMethod(pjp);
RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class);
String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs());
RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate);
// boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime());
boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS);
if (isLock) {
try {
pjp.proceed();
return;
} catch (Throwable e) {
e.printStackTrace();
} finally {
redisLock.unlock();
}
}
System.out.println("執行方法失敗");
}
/**
* 獲取被攔截的方法對象
*/
private Method getMethod(ProceedingJoinPoint pjp) {
Object[] args = pjp.getArgs();
Class[] argTypes = new Class[pjp.getArgs().length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
}
Method method = null;
try {
method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes);
} catch (NoSuchMethodException | SecurityException e) {
e.printStackTrace();
}
return method;
}
private String getRedisKey(Annotation[][] annotations, Object[] args){
if (null == args || args.length == 0) {
throw new RedisLockException("方法參數為空,沒有被鎖定的對象");
}
if (null == annotations || annotations.length == 0) {
throw new RedisLockException("沒有被註解的參數");
}
// 只支持第一個註解為RedisLockedKey的參數
for (int i = 0; i < annotations.length; i++) {
for (int j = 0; j < annotations[i].length; j++) {
if (annotations[i][j] instanceof RedisLockedKey) { //註解為LockedComplexObject
RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j];
String field = redisLockedKey.field();
try {
// field存在, 表示取參數對象的相應field;
if(StringUtils.isBlank(field)){
return args[i].toString();
}else {
return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
}
} catch (NoSuchFieldException | SecurityException e) {
e.printStackTrace();
throw new RedisLockException("註解對象中不存在屬性: " + redisLockedKey.field());
}
}
}
}
throw new RedisLockException("未找到註解對象!");
}
}
public class RedisLockException extends RuntimeException{
public RedisLockException(String msg, Throwable throwable) {
super(msg, throwable);
}
public RedisLockException(String msg) {
super(msg);
}
}
3.測試部分
#### 視情況調整
# 部分redis配置
spring.redis.database=0
spring.redis.host=127.0.0.1
# spring.redis.password=
spring.redis.port=6379
# 連接池最大連接數(使用負值表示沒有限制)
spring.redis.pool.max-active=1
spring.redis.pool.max-wait=-1
# 連接池中的最大空閑連接
spring.redis.pool.max-idle=4
# 連接池中的最小空閑連接
spring.redis.pool.min-idle=0
spring.redis.timeout=2000
@SpringBootApplication
@EnableCaching
public class DistributedLockApplication implements CommandLineRunner{
@Autowired
StringRedisTemplate redisTemplate;
@Autowired
LockService lockService;
@Autowired
ThreadPoolTaskExecutor executor;
@Bean
public ThreadPoolTaskExecutor myExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數
executor.setCorePoolSize(8);
// 最大線程數
executor.setMaxPoolSize(12);
// 運行線程滿時,等待隊列的大小
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("vl-thread-");
// 池和隊列滿的策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 空閑線程清除時間
executor.setKeepAliveSeconds(60);
// 是否允許釋放核心線程
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
public static void main(String[] args) {
SpringApplication application = new SpringApplication(DistributedLockApplication.class);
application.setAdditionalProfiles("redis");
application.run(args);
}
@Override
public void run(String... args) throws Exception {
System.out.println("run....");
for (int i = 0; i < 2; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// lockService.lockMethod(new LockBean(1L));
lockService.lockMethod("arg1", 1L);
}
});
}
System.out.println(executor.getThreadPoolExecutor().getTaskCount());
}
}
public interface LockService {
public void lockMethod(String arg1,Long arg2);
public void lockMethod(LockBean lockBean);
}
@Service
public class LockServiceImpl implements LockService {
public static Map<Long, Integer> goods;
static{
goods = new HashMap<>();
goods.put(1L, 100);
goods.put(2L, 200);
}
@Override
@RedisDistributedLock(lockedPrefix="TEST_PREFIX")
public void lockMethod(String arg1, @RedisLockedKey Long arg2) {
//最簡單的秒殺,這裏僅作為demo示例
System.out.println("lockMethod, goods: " + reduceInventory(arg2));
}
@Override
@RedisDistributedLock(lockedPrefix="TEST_PREFIX")
public void lockMethod(@RedisLockedKey(field = "idic")LockBean lockBean) {
System.out.println("lockMethod bean, goods: " + reduceInventory(lockBean.getIdic()));
}
// 模擬秒殺操作,姑且認為一個秒殺就是將庫存減一
private Integer reduceInventory(Long commodityId){
goods.put(commodityId, goods.get(commodityId) - 1);
return goods.get(commodityId);
}
}
public class LockBean {
private Long idic;
public LockBean(){}
public LockBean(long idic) {
this.idic = idic;
}
public Long getIdic() {
return idic;
}
public void setIdic(Long idic) {
this.idic = idic;
}
}
以上只是簡單實現代碼, 如果用於實際項目中, 以上代碼存在很多性能問題, 具體性能問題:
1) 太頻繁的獲取redis連接、關閉連接.
lockA
: 每次while必定有一次setIfAbsent
, 可能會有expire
, 然後釋放鎖有delete
. 所以一次正常的流程就需要3個連接. 如果是並發同時競爭等待獲取鎖, 那麽性能影響也蠻大的.
lockB
: 這種策略要用到的連接更多, 並且如果是this.redisClient.getConnectionFactory().getConnection().time()
還要註意要手動釋放這個連接.
針對此問題, (個人)想到的可能的代碼改進方案, 每個RedisLock
中用一個redisConnection
, 把所有的StringRedisTemplate
命令換成更底層的redisConnection
命令:
public class RedisLock {
private String key;
private boolean lock = false;
private final RedisConnection redisConnection;
public RedisLock(String purpose, String key, RedisConnection redisConnection) {
if (redisConnection == null) {
throw new IllegalArgumentException("redisConnection 不能為null!");
}
this.key = purpose + "_" + key + "_redis_lock";
this.redisConnection = redisConnection;
}
public boolean lockAc(long timeout, long expire, final TimeUnit unit) {
long beginTime = System.nanoTime();
timeout = unit.toNanos(timeout);
try {
while (System.nanoTime() - beginTime < timeout) {
if (this.redisConnection.setNX(this.key.getBytes(), "1".getBytes())) {
this.redisConnection.expire(key.getBytes(), unit.toSeconds(expire));
this.lock = true;
return true;
}
System.out.println("lockAc get lock waiting...");
Thread.sleep(30);
}
} catch (Exception e) {
throw new RedisLockException("locking error", e);
}
return false;
}
private long getRedisTime() {
return this.redisConnection.time();
}
private void closeConnection(){
if(!this.redisConnection.isClosed()){
this.redisConnection.close();
}
}
public void unlock() {
if (this.lock) {
this.redisConnection.delete(key);
}
closeConnection(); // 用完一定要關閉, 這個位置不一定好, 可能在Aop調用unlock的finally處更好
}
public boolean isLock() {
return lock;
}
}
以上改進代碼依然可能存在的問題:
1) 連接很可能沒有正常關閉.
2) 連接依然過多, 假設並發有1000個, 那一樣會產生1000個連接, 且這些連接只會在競爭獲取鎖完後才會釋放.(且產生了1000個RedisLock對象)
3) 是否可以緩存註解對象?
針對問題2)
, 主要想達到怎麽盡可能減少redis連接?
比如: 有1000個並發, 其中200個是兌換商品A, 其中300個是兌換商品B, 其中500個是兌換商品C.
1、是否可以用單例模式
來實現RedisLock
?
對單例
、多線程
還是很混亂, 不好說. 但如果可行, 會否太影響獲取鎖的性能?
比如兌換商品A的200個並發共用一個redisConnection, 感覺還是合理的, 畢竟互相之間是競爭關系.
但商品A、商品B、商品C如果也共用一個redisConnection, 是不是完全不合理?
他們之間根本是"並行"的, 相互之間沒有一點聯系.
2、所以, 是否更進一步的實現是: 同一個鎖競爭用相同的RedisLock
對象和RedisConnection
連接.
即競爭商品A的200個並發用同一個"redisConnection_A"、"redisLock_A", 商品B的300個並發用同一個"redisConnection_B"、"redisLock_B"?
針對問題3)
, 在代碼RedisDistributedLockAop
中, 每次都會:
1) getMethod(pjp)
: 獲取攔截方法.
2) 通過攔截方法解析出getRedisKey
.
是不是可以這麽實現, 相同的攔截方法只有第一次需要通過反射獲取. 之後直接從緩存(如map)中獲取到method
, 且因為同一個方法, 所能取field
也是一樣的.
比如, 有一下幾個方法都需要用到分布式並發鎖:
@RedisDistributedLock(lockedPrefix="TEST_PREFIX")
public void a(String arg1, @RedisLockedKey Long arg2) {
// ...
}
@RedisDistributedLock(lockedPrefix="TEST_PREFIX")
public void b(@RedisLockedKey(field = "idic")LockBean lockBean) {
// ...
}
@RedisDistributedLock(lockedPrefix="TEST_PREFIX")
public void c(@RedisLockedKey(field = "xx")LockBean lockBean) {
// ...
}
// key: 完整方法名, 要唯一正確找到; value: 緩存的method
Map<String, Method> methodCache = new HashMap<>;
methodCache.put("com.service.aa.a()", method);
methodCache.put("com.service.aa.b()", method);
methodCache.put("com.service.bb.b()", method);
// 然後, 同一個方法的註解內容相同, 所以完全可以直接調用, 省略RedisLockedKey的邏輯判斷
if(StringUtils.isBlank(field)){
return args[i].toString();
}else {
return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
}
以上只是自己的構想, 這些構想的可行性, 代碼的具體實現還很難說...
但現在覺得, 基於redis的分布式並發鎖完全可以有位大神寫個開源的實現啊, 我們這些小白直接拿到系統中用就好了. 我也不知道怎麽去找, 不知道github有沒有.
【redis】基於redis實現分布式並發鎖