1. 程式人生 > >使用redis實現分散式鎖(在叢集環境下讓任務排隊執行)

使用redis實現分散式鎖(在叢集環境下讓任務排隊執行)

需求:在叢集環境下,讀寫同一個資料庫表,我們為了保證資料的最終一致性,需要讓任務排隊執行。分散式鎖的實現方式,網上有很多種方式。
1.使用資料庫表實現;
2.使用zookeeper實現;
3.使用redis實現;
這裡講用redis實現的方法,其他兩種實現方式,讀者可以自行百度。
redis是個很好的NoSQL資料庫,多用於快取資料的場景,但同時也可以用來製作一個分散式事務鎖,其實現的原理基於幾個命令:
SETNX key val;
解釋:當且僅當key不存在時,set一個key為val的字串,返回1;若key存在,則什麼都不做,返回0。
expire key timeout;
為key設定一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖。
delete key;
刪除key。
那麼如何實現這個分散式事務鎖呢?


1.新增專案中的redis的jar包依賴;

        <dependency>
            <groupId>commons-pool</groupId>
            <artifactId>commons-pool</artifactId>
            <version>1.6</version>
        </dependency>

        <dependency>  
            <groupId>
org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>${spring-data-redis.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId
>
jedis</artifactId> <version>${redis.version}</version> </dependency>

2.在專案的Spring配置檔案中配置redis

<bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> 
    <!-- 最大空閒數 -->
    <property name="maxIdle" value="300" /> 
    <!-- 最大連線數 -->
    <property name="maxTotal" value="600" /> 
    <!-- 最大建立連線等待時間,單位毫秒--> 
    <property name="maxWaitMillis" value="200000" /> 
    <!-- 指明是否在從池中取出連線前進行檢驗,如果檢驗失敗,則從池中去除連線並嘗試取出另一個 -->
    <property name="testOnBorrow" value="true" />

  </bean> 

 <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> 
        <property name="hostName" value="119.23.134.93" /> 
        <property name="port" value="6379"/> 
        <property name="password" value="coeexp123456"/>
        <property name="poolConfig" ref="poolConfig" /> 
        <property name="usePool" value="true"/> 
    </bean>

  <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"> 
    <property name="connectionFactory"   ref="jedisConnectionFactory" /> 
  </bean>

3.在實際開發中都是使用Spring的註解,面向切面程式設計,
這裡需要自定義兩個註解:

註解1:用於方法上
/**
 * 類說明
 * @author  ll
 * @version 建立時間:2017年10月27日上午10:10:47
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
//@Component
public @interface P4jSyn {
     /** 
     * 鎖的key<br/> 
     * 如果想增加坑的個數新增非固定鎖,可以在引數上新增@P4jSynKey註解,但是本引數是必寫選項<br/> 
     * redis key的拼寫規則為 "RedisSyn+" + synKey + @P4jSynKey<br/> 
     *  
     */  
    String synKey();  

    /** 
     * 持鎖時間,超時時間,持鎖超過此時間自動丟棄鎖<br/> 
     * 單位毫秒,預設20秒<br/> 
     * 如果為0表示永遠不釋放鎖,在設定為0的情況下toWait為true是沒有意義的<br/> 
     * 但是沒有比較強的業務要求下,不建議設定為0 
     */  
    long keepMills() default 20 * 1000;  

    /** 
     * 當獲取鎖失敗,是繼續等待還是放棄<br/> 
     * 預設為繼續等待 
     */  
    boolean toWait() default true;  

    /** 
     * 沒有獲取到鎖的情況下且toWait()為繼續等待,睡眠指定毫秒數繼續獲取鎖,也就是輪訓獲取鎖的時間<br/> 
     * 預設為10毫秒 
     *  
     * @return 
     */  
    long sleepMills() default 10;  

    /** 
     * 鎖獲取超時時間:<br/> 
     * 沒有獲取到鎖的情況下且toWait()為true繼續等待,最大等待時間,如果超時丟擲 
     * {@link java.util.concurrent.TimeoutException.TimeoutException} 
     * ,可捕獲此異常做相應業務處理;<br/> 
     * 單位毫秒,預設一分鐘,如果設定為0即為沒有超時時間,一直獲取下去; 
     *  
     * @return 
     */  
    long maxSleepMills() default 60 * 1000;  
}

註解2:用於引數上

/**
 * 類說明
 * @author  ll
 * @version 建立時間:2017年10月27日上午10:12:25
 */
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface P4jSynKey {
    /** 
     * key的拼接順序 
     *  
     * @return 
     */  
    int index() default 0; 
}

4.分散式鎖的切面類

/**
* 類說明
* @author   ll
* @version 建立時間:2017年10月27日上午10:14:06
*/
@Order(1)
@Aspect  
@Component("redisLockAspect")
public class RedisLockAspect {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAspect.class);
    @Autowired  
   @Qualifier("redisTemplate")  
   private RedisTemplate<String, Long> redisTemplate;  

   @Around("execution(* com.coe..*SycLock(..))")  
   public Object lock(ProceedingJoinPoint  pjp) throws Throwable {  
    //獲取P4jSyn註解
       P4jSyn lockInfo = getLockInfo(pjp);  
       if (lockInfo == null) {  
           throw new IllegalArgumentException("配置引數錯誤");  
       }  
       String synKey = getSynKey(pjp, lockInfo.synKey());  
       if (synKey == null || "".equals(synKey)) {  
           throw new IllegalArgumentException("配置引數synKey錯誤");  
       }  
       boolean lock = false;  //標誌物,true表示獲取了到了該鎖
       Object obj = null;  
       try {  
           //超時時間 (60秒),系統當前時間再往後加60秒
           long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills();  
           while (!lock) {
            //持鎖時間,系統當前時間再往後加20秒
               long keepMills = System.currentTimeMillis() + lockInfo.keepMills();  
               //為key“synKey”設定值keepMills,如果設定成功,則返回true
               lock = setIfAbsent(synKey, keepMills);  
               //lock為true表示得到了鎖,沒有人加過相同的鎖  
               if(lock){  
                //如果獲得了該鎖,則呼叫目標方法,執行業務邏輯任務
                   obj = pjp.proceed();  
               }  
               // 鎖設定了沒有超時時間  
               /**如果沒有通過setIfAbsent拿到資料,然後判斷是否對鎖設定了超時機制
               ,沒有設定則判斷是否需要繼續等待*/
               else if(lockInfo.keepMills() <= 0){  
                   // 繼續等待獲取鎖  
                   if (lockInfo.toWait()) {  
                       // 如果超過最大等待時間丟擲異常  
                       if(lockInfo.maxSleepMills()>0&&System.currentTimeMillis()> maxSleepMills){  
                           throw new TimeoutException("獲取鎖資源等待超時");  
                       }
                       //只要當前時間沒有大於超時時間,則繼續等待10毫秒,以便繼續嘗試去獲取鎖
                       TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());  
                   }else{ 
                    //如果註解上的“toWait()”為false,表示如果當前沒有獲取到鎖,則放棄獲取該鎖,
                    //即放棄執行此任務
                       break;  
                   }  
               }  
               // 已過期,並且getAndSet後舊的時間戳依然是過期的,可以認為獲取到了鎖  
              /**
               * 1.如果當前執行緒2進入的時候,
               * 系統時間已經大於了上個任務的持鎖時間(由於上次任務大導致其執行時間過長),
               * 則表示需要強制讓上個任務釋放鎖,讓本任務獲得鎖,以執行本次任務;
               * 2.如果執行緒1釋放了鎖,剛好執行緒2過了 if(lock){ //to do something}的判斷,
               * 而進入了此處判斷,需要對執行緒2任務加鎖,保證事務不衝突
               */
               else if(System.currentTimeMillis()>getLock(synKey)&&(System.currentTimeMillis()> getSet(synKey, keepMills))) {  
                   lock = true;             //lock一定要設定成true,不然釋放不了鎖
                   obj = pjp.proceed();  
               }  
               // 沒有得到任何鎖  
               else {  
                   // 繼續等待獲取鎖  
                   if (lockInfo.toWait()) {  
                       // 如果超過最大等待時間丟擲異常  
                       if (lockInfo.maxSleepMills()>0&&System.currentTimeMillis() maxSleepMills) {  
                           throw new TimeoutException("獲取鎖資源等待超時");  
                       }  
                       //只要當前時間沒有大於超時時間,則繼續等待10毫秒,以便繼續嘗試去獲取鎖
                       TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills());  
                   }else {  
                    // 放棄等待,放棄獲取鎖(放棄本任務的執行) 
                       break;  
                   }  
               }  
           }  
       } catch (Exception e) {  
           e.printStackTrace(); 
           throw e;  
       } finally {  
           // 如果獲取到了鎖,釋放鎖  
           if (lock) {  
               releaseLock(synKey);  
           }  
       }  
       return obj;  
   }  

   /** 
    * 獲取包括方法引數上的key<br/> 
    * redis key的拼寫規則為 "RedisSyn+" + synKey + @P4jSynKey 
    *  
    */  
   private String getSynKey(JoinPoint pjp, String synKey) {  
       try {  
           synKey = "RedisSyn+" + synKey;  //指定synKey的值固定為RedisSyn+synKey
           Object[] args = pjp.getArgs();  //獲取切點上的所有引數
           if (args != null && args.length > 0) {  
               MethodSignature methodSignature = (MethodSignature) pjp.getSignature();  
               Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations();  
               SortedMap<Integer, String> keys = new TreeMap<Integer, String>();  
               for (int ix = 0; ix < paramAnnotationArrays.length; ix++) {  
                   P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]);  
                   if (p4jSynKey != null) {  
                       Object arg = args[ix];  
                       if (arg != null) {  
                           keys.put(p4jSynKey.index(), arg.toString());  
                       }  
                   }  
               }  
               if (keys != null && keys.size() > 0) {  
                   for (String key : keys.values()) {  
                       synKey = synKey + key;  
                   }  
               }  
           }  
           return synKey;  
       } catch (Exception e) {  
           e.printStackTrace();  
       }  
       return null;  
   }  

   @SuppressWarnings("unchecked")  
   private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) {  
       if (annotations != null && annotations.length > 0) {  
           for (final Annotation annotation : annotations) {  
               if (annotationClass.equals(annotation.annotationType())) {  
                   return (T) annotation;  
               }  
           }  
       }  
       return null;  
   }  

   /** 
    * 獲取RedisLock註解資訊 
    */  
   private P4jSyn getLockInfo(ProceedingJoinPoint  pjp) {  
       try {  
           MethodSignature methodSignature = (MethodSignature) pjp.getSignature();  
           Method method = methodSignature.getMethod();  
           P4jSyn lockInfo = method.getAnnotation(P4jSyn.class);  
           return lockInfo;  
       } catch (Exception e) {  
           e.printStackTrace(); 
       }  
       return null;  
   }  

   public BoundValueOperations<String, Long> getOperations(String key) {  
       return redisTemplate.boundValueOps(key);  
   }  

   /** 
    * Set {@code value} for {@code key}, only if {@code key} does not exist. 
    * <p> 
    * See http://redis.io/commands/setnx 
    *  
    * @param key 
    *            must not be {@literal null}. 
    * @param value 
    *            must not be {@literal null}. 
    * @return 
    */  
   /**
    * 如果key不存在,則為key設定值value,並且返回true,否則返回false
    * @param key
    * @param value
    * @return
    */
   public boolean setIfAbsent(String key, Long value) {  
       return getOperations(key).setIfAbsent(value);   
   }  

   /**
    * 獲取key上的值
    * @param key
    * @return
    */
   public long getLock(String key) {  
       Long time = getOperations(key).get();  
       if (time == null) {  
           return 0;  
       }  
       return time;  
   }  

   /**
    * 獲取key上的舊值,並且為該key設定新值value,如果舊值不存在則返回0
    * @param key
    * @param value
    * @return
    */
   public long getSet(String key, Long value) {  
       Long time = getOperations(key).getAndSet(value);  
       if (time == null) {  
           return 0;  
       }  
       return time;  
   }  

   /**
    * 刪除key
    * @param key
    */
   public void releaseLock(String key) { 
       redisTemplate.delete(key);  
   } 
}

鎖寫好之後,編寫測試程式碼:
定義一個成員變數i,啟動100個執行緒同時訪問這個方法,讓i++;

public class LockInfo {
private int i = 0;

    @P4jSyn(synKey="getTrackno")
    public void addSycLock(@P4jSynKey(index=1)String flag,@P4jSynKey(index=2) String channelCode){
        i++;
        System.out.println("i =====================" + i);
    }
}

開啟100個執行緒同時執行這段程式碼

@Component("lockTest")
public class LockTest {
    @Autowired
    private LockInfo lockInfo ;

    @Scheduled(fixedRate=3600000,initialDelay = 10000)
    public void run(){
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.currentThread().sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    lockInfo .addSycLock("11111", "222222");
                }
            }).start();
        }
    }
}

測試結果:雖然同時開啟100個執行緒來呼叫這個方法,但是i的值始終是依次遞增,大家可以試試,去掉註解之後再同時開啟100個執行緒來呼叫這個方法,看是不是得到不同的結果
這裡寫圖片描述