使用Redisson實現分布式鎖,Spring AOP簡化之
源碼
Redisson概述
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還提供了許多分布式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關註分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
Redisson底層采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。
關於Redisson更多詳細介紹,可參考Redssion概述
Redisson提供的分布式鎖
可重入鎖
Redisson的分布式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過期解鎖。下面是RLock的基本使用方法:
RLock lock = redisson.getLock("anyLock"); // 最常見的使用方法 lock.lock(); // 支持過期解鎖功能 // 10秒鐘以後自動解鎖 // 無需調用unlock方法手動解鎖lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
Redisson同時還為分布式鎖提供了異步執行的相關方法:
RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
公平鎖
Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。
RLock fairLock = redisson.getFairLock("anyLock"); // 最常見的使用方法 fairLock.lock(); // 支持過期解鎖功能 // 10秒鐘以後自動解鎖 // 無需調用unlock方法手動解鎖 fairLock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); ... fairLock.unlock();
其他鎖
Redisson還提供了其他機制的鎖,如聯鎖(MultiLock)、紅鎖(RedLock)等。詳細可參考:分布式鎖和同步器
使用Redisson實現分布式鎖
- 定義回調接口
/** * 分布式鎖回調接口 */ public interface DistributedLockCallback<T> { /** * 調用者必須在此方法中實現需要加分布式鎖的業務邏輯 * * @return */ public T process(); /** * 得到分布式鎖名稱 * * @return */ public String getLockName(); }
- 定義分布式鎖模板
/** * 分布式鎖操作模板 */ public interface DistributedLockTemplate { long DEFAULT_WAIT_TIME = 30; long DEFAULT_TIMEOUT = 5; TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS; /** * 使用分布式鎖,使用鎖默認超時時間。 * @param callback * @param fairLock 是否使用公平鎖 * @return */ <T> T lock(DistributedLockCallback<T> callback, boolean fairLock); /** * 使用分布式鎖。自定義鎖的超時時間 * * @param callback * @param leaseTime 鎖超時時間。超時後自動釋放鎖。 * @param timeUnit * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock); /** * 嘗試分布式鎖,使用鎖默認等待時間、超時時間。 * @param callback * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock); /** * 嘗試分布式鎖,自定義等待時間、超時時間。 * @param callback * @param waitTime 獲取鎖最長等待時間 * @param leaseTime 鎖超時時間。超時後自動釋放鎖。 * @param timeUnit * @param fairLock 是否使用公平鎖 * @param <T> * @return */ <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock); }
- 實現分布式鎖模板
public class SingleDistributedLockTemplate implements DistributedLockTemplate { private RedissonClient redisson; public SingleDistributedLockTemplate() { } public SingleDistributedLockTemplate(RedissonClient redisson) { this.redisson = redisson; } @Override public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) { return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(redisson, callback.getLockName(), fairLock); try { lock.lock(leaseTime, timeUnit); return callback.process(); } finally { if (lock != null) { lock.unlock(); } } } @Override public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) { return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock); } @Override public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) { RLock lock = getLock(redisson, callback.getLockName(), fairLock); try { if (lock.tryLock(waitTime, leaseTime, timeUnit)) { return callback.process(); } } catch (InterruptedException e) { } finally { if (lock != null) { lock.unlock(); } } return null; } private RLock getLock(RedissonClient redisson, String lockName, boolean fairLock) { RLock lock; if (fairLock) { lock = redisson.getFairLock(lockName); } else { lock = redisson.getLock(lockName); } return lock; } public void setRedisson(RedissonClient redisson) { this.redisson = redisson; } }
- 使用SingleDistributedLockTemplate
DistributedLockTemplate lockTemplate = ...; final String lockName = ...; lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { //do some business return null; } @Override public String getLockName() { return lockName; } }, false);
但是每次使用分布式鎖都要寫類似上面的重復代碼,有沒有什麽方法可以只關註核心業務邏輯代碼的編寫,即上面的"do some business"。下面介紹如何使用Spring AOP來實現這一目標。
使用Spring AOP簡化分布式鎖
- 定義註解@DistributedLock
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface DistributedLock { String lockName() default ""; String lockNamePre() default ""; //lockName後綴 String lockNamePost() default ""; //lockName後綴 String param() default ""; //獲取註解的方法第一個參數對象的某個屬性值來作為lockName。因為有時候lockName是不固定的。 boolean fairLock() default false; //是否使用公平鎖。 boolean tryLock() default false; //是否使用嘗試鎖。 long waitTime() default 30L; long leaseTime() default 5L; TimeUnit timeUnit() default TimeUnit.SECONDS; }
- 定義切面織入的代碼
@Aspect @Component public class DistributedLockAspect { @Autowired private DistributedLockTemplate lockTemplate; @Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)") public void DistributedLockAspect() {} @Around(value = "DistributedLockAspect()") public Object doAround(ProceedingJoinPoint pjp) throws Throwable { //切點所在的類名 String targetName = pjp.getTarget().getClass().getName(); //使用了註解的方法 String methodName = pjp.getSignature().getName(); Class targetClass = Class.forName(targetName); Method[] methods = targetClass.getMethods(); Object[] arguments = pjp.getArgs(); //得到使用註解的方法。可使用Method.getAnnotation(Class<T> annotationClass)獲取指定的註解,然後可獲得註解的屬性 Optional<Method> optional = Arrays.stream(methods) .parallel() .filter(method -> method.getName().equals(methodName)) .findAny(); if (optional.isPresent()) { Method m = optional.get(); final String lockName = getLockName(m, arguments); return lock(pjp, m, lockName); } return null; } public String getLockName(Method method, Object[] args) throws Throwable { DistributedLock annotation = method.getAnnotation(DistributedLock.class); String lockName = annotation.lockName(), param = annotation.param(); if (StringUtils.isEmpty(lockName)) { if (!StringUtils.isEmpty(param)) { if (args.length > 0) { Object arg = args[0]; lockName = String.valueOf(getParam(arg, param)); String preLockName = annotation.lockNamePre(), postLockName = annotation.lockNamePost(); lockName = preLockName + lockName + postLockName; return lockName; } } } else { return lockName; } throw new IllegalArgumentException("lockName can‘t be empty!"); } /** * 從方法參數獲取數據 * * @param param * @param arg 方法的參數數組 * @return */ public Object getParam(Object arg, String param) throws Throwable { if (!StringUtils.isEmpty(param) && arg != null) { Object result = PropertyUtils.getProperty(arg, param); return result; } return null; } public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) { DistributedLock annotation = method.getAnnotation(DistributedLock.class); boolean fairLock = annotation.fairLock(); boolean tryLock = annotation.tryLock(); if (tryLock) { return tryLock(pjp, annotation, lockName, fairLock); } else { return lock(pjp,lockName, fairLock); } } public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) { return lockTemplate.lock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, fairLock); } public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) { long waitTime = annotation.waitTime(), leaseTime = annotation.leaseTime(); TimeUnit timeUnit = annotation.timeUnit(); return lockTemplate.tryLock(new DistributedLockCallback<Object>() { @Override public Object process() { return proceed(pjp); } @Override public String getLockName() { return lockName; } }, waitTime, leaseTime, timeUnit, fairLock); } public Object proceed(ProceedingJoinPoint pjp) { try { return pjp.proceed(); } catch (Throwable throwable) { throwable.printStackTrace(); } return null; } }
- 使用註解@DistributedLock實現分布式鎖
有了上面兩段代碼,以後需要用到分布式鎖,只需在核心業務邏輯方法添加註解@DistributedLock,並設置LockName、fairLock等即可。
@Service public class DistributionService { @Autowired private RedissonClient redissonClient; @DistributedLock(lockName = "lock", lockNamePost = ".lock") public Integer aspect() { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } @DistributedLock(param = "id", lockNamePost = ".lock") public Integer aspect(Person person) { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); Integer count = map.get("count"); if (count > 0) { count = count - 1; map.put("count", count); } return count; } }
- 測試
定義一個Worker類:
public class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; private final DistributionService service; public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service) { this.startSignal = startSignal; this.doneSignal = doneSignal; this.service = service; } @Override public void run() { try { startSignal.await(); System.out.println(Thread.currentThread().getName() + " start"); //Integer count = service.aspect(); Integer count = service.aspect(new Person(1, "張三")); System.out.println(Thread.currentThread().getName() + ": count = " + count); doneSignal.countDown(); } catch (InterruptedException ex) { System.out.println(ex); } } }
定義Controller類:
@RestController @RequestMapping("/distributedLockTest") public class DistributedLockTestController { private int count = 10; @Autowired private RedissonClient redissonClient; @Autowired private DistributionService service; @RequestMapping(method = RequestMethod.GET) public String distributedLockTest() throws Exception { RMap<String, Integer> map = redissonClient.getMap("distributionTest"); map.put("count", 8); CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(count); for (int i = 0; i < count; ++i) { // create and start threads new Thread(new Worker(startSignal, doneSignal, service)).start(); } startSignal.countDown(); // let all threads proceed doneSignal.await(); System.out.println("All processors done. Shutdown connection"); return "finish"; } }
Redisson基本配置:
singleServerConfig: idleConnectionTimeout: 10000 pingTimeout: 1000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 reconnectionTimeout: 3000 failedAttempts: 3 password: subscriptionsPerConnection: 5 clientName: null address: "redis://127.0.0.1:6379" subscriptionConnectionMinimumIdleSize: 1 subscriptionConnectionPoolSize: 50 connectionMinimumIdleSize: 10 connectionPoolSize: 64 database: 0 dnsMonitoring: false dnsMonitoringInterval: 5000 threads: 0 nettyThreads: 0 codec: !<org.redisson.codec.JsonJacksonCodec> {} useLinuxNativeEpoll: false
工程中需要註入的對象:
@Value("classpath:/redisson-conf.yml") Resource configFile; @Bean(destroyMethod = "shutdown") RedissonClient redisson() throws IOException { Config config = Config.fromYAML(configFile.getInputStream()); return Redisson.create(config); } @Bean DistributedLockTemplate distributedLockTemplate(RedissonClient redissonClient) { return new SingleDistributedLockTemplate(redissonClient); }
需要引入的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.3</version> </dependency> <dependency> <groupId>commons-beanutils</groupId> <artifactId>commons-beanutils</artifactId> <version>1.8.3</version> </dependency>
最後啟動工程,然後訪問localhost:8080/distributedLockTest,可以看到如下結果:
觀察結果,可以看出,10個線程中只有8個線程能執行count減1操作,而且多個線程是依次執行的。也就是說分布式鎖起作用了。
至此,使用Redisson實現分布式鎖,然後使用Spring AOP簡化分布式鎖介紹完畢。
若有什麽地方有錯誤的或需要改進的,歡迎留言一起討論交流。
使用Redisson實現分布式鎖,Spring AOP簡化之