使用redis實現分散式鎖(在叢集環境下讓任務排隊執行)
阿新 • • 發佈:2019-01-23
需求:在叢集環境下,讀寫同一個資料庫表,我們為了保證資料的最終一致性,需要讓任務排隊執行。分散式鎖的實現方式,網上有很多種方式。
1.使用資料庫表實現;
2.使用zookeeper實現;
3.使用redis實現;
這裡講用redis實現的方法,其他兩種實現方式,讀者可以自行百度。
redis是個很好的NoSQL資料庫,多用於快取資料的場景,但同時也可以用來製作一個分散式事務鎖,其實現的原理基於幾個命令:
SETNX key val;
解釋:當且僅當key不存在時,set一個key為val的字串,返回1;若key存在,則什麼都不做,返回0。
expire key timeout;
為key設定一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖。
delete key;
刪除key。
那麼如何實現這個分散式事務鎖呢?
1.新增專案中的redis的jar包依賴;
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId> org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${spring-data-redis.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId >jedis</artifactId>
<version>${redis.version}</version>
</dependency>
2.在專案的Spring配置檔案中配置redis
<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig">
<!-- 最大空閒數 -->
<property name="maxIdle" value="300" />
<!-- 最大連線數 -->
<property name="maxTotal" value="600" />
<!-- 最大建立連線等待時間,單位毫秒-->
<property name="maxWaitMillis" value="200000" />
<!-- 指明是否在從池中取出連線前進行檢驗,如果檢驗失敗,則從池中去除連線並嘗試取出另一個 -->
<property name="testOnBorrow" value="true" />
</bean>
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<property name="hostName" value="119.23.134.93" />
<property name="port" value="6379"/>
<property name="password" value="coeexp123456"/>
<property name="poolConfig" ref="poolConfig" />
<property name="usePool" value="true"/>
</bean>
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
</bean>
3.在實際開發中都是使用Spring的註解,面向切面程式設計,
這裡需要自定義兩個註解:
註解1:用於方法上
/**
* 類說明
* @author ll
* @version 建立時間:2017年10月27日上午10:10:47
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
//@Component
public @interface P4jSyn {
/**
* 鎖的key<br/>
* 如果想增加坑的個數新增非固定鎖,可以在引數上新增@P4jSynKey註解,但是本引數是必寫選項<br/>
* redis key的拼寫規則為 "RedisSyn+" + synKey + @P4jSynKey<br/>
*
*/
String synKey();
/**
* 持鎖時間,超時時間,持鎖超過此時間自動丟棄鎖<br/>
* 單位毫秒,預設20秒<br/>
* 如果為0表示永遠不釋放鎖,在設定為0的情況下toWait為true是沒有意義的<br/>
* 但是沒有比較強的業務要求下,不建議設定為0
*/
long keepMills() default 20 * 1000;
/**
* 當獲取鎖失敗,是繼續等待還是放棄<br/>
* 預設為繼續等待
*/
boolean toWait() default true;
/**
* 沒有獲取到鎖的情況下且toWait()為繼續等待,睡眠指定毫秒數繼續獲取鎖,也就是輪訓獲取鎖的時間<br/>
* 預設為10毫秒
*
* @return
*/
long sleepMills() default 10;
/**
* 鎖獲取超時時間:<br/>
* 沒有獲取到鎖的情況下且toWait()為true繼續等待,最大等待時間,如果超時丟擲
* {@link java.util.concurrent.TimeoutException.TimeoutException}
* ,可捕獲此異常做相應業務處理;<br/>
* 單位毫秒,預設一分鐘,如果設定為0即為沒有超時時間,一直獲取下去;
*
* @return
*/
long maxSleepMills() default 60 * 1000;
}
註解2:用於引數上
/**
* 類說明
* @author ll
* @version 建立時間:2017年10月27日上午10:12:25
*/
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface P4jSynKey {
/**
* key的拼接順序
*
* @return
*/
int index() default 0;
}
4.分散式鎖的切面類
/**
* 類說明
* @author ll
* @version 建立時間:2017年10月27日上午10:14:06
*/
@Order(1)
@Aspect
@Component("redisLockAspect")
public class RedisLockAspect {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);
@Autowired
@Qualifier("redisTemplate")
private RedisTemplate<String, Long> redisTemplate;
@Around("execution(* com.coe..*SycLock(..))")
public Object lock(ProceedingJoinPoint pjp) throws Throwable {
//獲取P4jSyn註解
P4jSyn lockInfo = getLockInfo(pjp);
if (lockInfo == null) {
throw new IllegalArgumentException("配置引數錯誤");
}
String synKey = getSynKey(pjp, lockInfo.synKey());
if (synKey == null || "".equals(synKey)) {
throw new IllegalArgumentException("配置引數synKey錯誤");
}
boolean lock = false; //標誌物,true表示獲取了到了該鎖
Object obj = null;
try {
//超時時間 (60秒),系統當前時間再往後加60秒
long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();
while (!lock) {
//持鎖時間,系統當前時間再往後加20秒
long keepMills = System.currentTimeMillis() + lockInfo.keepMills();
//為key“synKey”設定值keepMills,如果設定成功,則返回true
lock = setIfAbsent(synKey, keepMills);
//lock為true表示得到了鎖,沒有人加過相同的鎖
if(lock){
//如果獲得了該鎖,則呼叫目標方法,執行業務邏輯任務
obj = pjp.proceed();
}
// 鎖設定了沒有超時時間
/**如果沒有通過setIfAbsent拿到資料,然後判斷是否對鎖設定了超時機制
,沒有設定則判斷是否需要繼續等待*/
else if(lockInfo.keepMills() <= 0){
// 繼續等待獲取鎖
if (lockInfo.toWait()) {
// 如果超過最大等待時間丟擲異常
if(lockInfo.maxSleepMills()>0&&System.currentTimeMillis()> maxSleepMills){
throw new TimeoutException("獲取鎖資源等待超時");
}
//只要當前時間沒有大於超時時間,則繼續等待10毫秒,以便繼續嘗試去獲取鎖
TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
}else{
//如果註解上的“toWait()”為false,表示如果當前沒有獲取到鎖,則放棄獲取該鎖,
//即放棄執行此任務
break;
}
}
// 已過期,並且getAndSet後舊的時間戳依然是過期的,可以認為獲取到了鎖
/**
* 1.如果當前執行緒2進入的時候,
* 系統時間已經大於了上個任務的持鎖時間(由於上次任務大導致其執行時間過長),
* 則表示需要強制讓上個任務釋放鎖,讓本任務獲得鎖,以執行本次任務;
* 2.如果執行緒1釋放了鎖,剛好執行緒2過了 if(lock){ //to do something}的判斷,
* 而進入了此處判斷,需要對執行緒2任務加鎖,保證事務不衝突
*/
else if(System.currentTimeMillis()>getLock(synKey)&&(System.currentTimeMillis()> getSet(synKey, keepMills))) {
lock = true; //lock一定要設定成true,不然釋放不了鎖
obj = pjp.proceed();
}
// 沒有得到任何鎖
else {
// 繼續等待獲取鎖
if (lockInfo.toWait()) {
// 如果超過最大等待時間丟擲異常
if (lockInfo.maxSleepMills()>0&&System.currentTimeMillis() maxSleepMills) {
throw new TimeoutException("獲取鎖資源等待超時");
}
//只要當前時間沒有大於超時時間,則繼續等待10毫秒,以便繼續嘗試去獲取鎖
TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());
}else {
// 放棄等待,放棄獲取鎖(放棄本任務的執行)
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
// 如果獲取到了鎖,釋放鎖
if (lock) {
releaseLock(synKey);
}
}
return obj;
}
/**
* 獲取包括方法引數上的key<br/>
* redis key的拼寫規則為 "RedisSyn+" + synKey + @P4jSynKey
*
*/
private String getSynKey(JoinPoint pjp, String synKey) {
try {
synKey = "RedisSyn+" + synKey; //指定synKey的值固定為RedisSyn+synKey
Object[] args = pjp.getArgs(); //獲取切點上的所有引數
if (args != null && args.length > 0) {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();
SortedMap<Integer, String> keys = new TreeMap<Integer, String>();
for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {
P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);
if (p4jSynKey != null) {
Object arg = args[ix];
if (arg != null) {
keys.put(p4jSynKey.index(), arg.toString());
}
}
}
if (keys != null && keys.size() > 0) {
for (String key : keys.values()) {
synKey = synKey + key;
}
}
}
return synKey;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@SuppressWarnings("unchecked")
private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {
if (annotations != null && annotations.length > 0) {
for (final Annotation annotation : annotations) {
if (annotationClass.equals(annotation.annotationType())) {
return (T) annotation;
}
}
}
return null;
}
/**
* 獲取RedisLock註解資訊
*/
private P4jSyn getLockInfo(ProceedingJoinPoint pjp) {
try {
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);
return lockInfo;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public BoundValueOperations<String, Long> getOperations(String key) {
return redisTemplate.boundValueOps(key);
}
/**
* Set {@code value} for {@code key}, only if {@code key} does not exist.
* <p>
* See http://redis.io/commands/setnx
*
* @param key
* must not be {@literal null}.
* @param value
* must not be {@literal null}.
* @return
*/
/**
* 如果key不存在,則為key設定值value,並且返回true,否則返回false
* @param key
* @param value
* @return
*/
public boolean setIfAbsent(String key, Long value) {
return getOperations(key).setIfAbsent(value);
}
/**
* 獲取key上的值
* @param key
* @return
*/
public long getLock(String key) {
Long time = getOperations(key).get();
if (time == null) {
return 0;
}
return time;
}
/**
* 獲取key上的舊值,並且為該key設定新值value,如果舊值不存在則返回0
* @param key
* @param value
* @return
*/
public long getSet(String key, Long value) {
Long time = getOperations(key).getAndSet(value);
if (time == null) {
return 0;
}
return time;
}
/**
* 刪除key
* @param key
*/
public void releaseLock(String key) {
redisTemplate.delete(key);
}
}
鎖寫好之後,編寫測試程式碼:
定義一個成員變數i,啟動100個執行緒同時訪問這個方法,讓i++;
public class LockInfo {
private int i = 0;
@P4jSyn(synKey="getTrackno")
public void addSycLock(@P4jSynKey(index=1)String flag,@P4jSynKey(index=2) String channelCode){
i++;
System.out.println("i =====================" + i);
}
}
開啟100個執行緒同時執行這段程式碼
@Component("lockTest")
public class LockTest {
@Autowired
private LockInfo lockInfo ;
@Scheduled(fixedRate=3600000,initialDelay = 10000)
public void run(){
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
public void run() {
try {
Thread.currentThread().sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lockInfo .addSycLock("11111", "222222");
}
}).start();
}
}
}
測試結果:雖然同時開啟100個執行緒來呼叫這個方法,但是i的值始終是依次遞增,大家可以試試,去掉註解之後再同時開啟100個執行緒來呼叫這個方法,看是不是得到不同的結果