1. 程式人生 > 資料庫 >springBoot實現redis分散式鎖

springBoot實現redis分散式鎖

 

   參考:https://blog.csdn.net/weixin_44634197/article/details/108308395

、、

 

使用redis的set命令帶NX(not exist)引數實現分散式鎖

NX:只有當不存在時,才可以set;成功set會返回OK,不成功返回null

//分散式鎖
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

    //1、佔分布式鎖。去redis佔坑
    Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
    if(aBoolean){
        //加鎖成功 執行業務
        Map<String, List<Catelog2Vo>> dataFromDB = this.getDataFromDB();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //加鎖失敗   重試 自旋
        return getCatalogJsonFromDBWithRedisLock();
    }
}

階段二 獨立加上分散式鎖的過期時間

//分散式鎖
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

    //1、佔分布式鎖。去redis佔坑
    Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock");
    if(aBoolean){
        //加鎖成功 執行業務
        //2、設定過期時間
        stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        Map<String, List<Catelog2Vo>> dataFromDB = this.getDataFromDB();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //加鎖失敗   重試 自旋
        return getCatalogJsonFromDBWithRedisLock();
    }
}

階段三 原子佔鎖和設定過期時間

//分散式鎖
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

    //1、佔分布式鎖。去redis佔坑  並設定過期時間 必須是同步的 原子的
    Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock", "lock",30,TimeUnit.SECONDS);
    if(aBoolean){
        //加鎖成功 執行業務
        Map<String, List<Catelog2Vo>> dataFromDB = this.getDataFromDB();
        //刪除鎖
        stringRedisTemplate.delete("lock");
        return dataFromDB;
    }else {
        //加鎖失敗   重試 自旋
        return getCatalogJsonFromDBWithRedisLock();
    }
}

階段四 刪鎖進行許可權uuid匹配

//分散式鎖
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

    //1、佔分布式鎖。去redis佔坑  並設定過期時間 必須是同步的 原子的
    String uuid = UUID.randomUUID().toString();
    Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,30,TimeUnit.SECONDS);
    if(aBoolean){
        //加鎖成功 執行業務
        Map<String, List<Catelog2Vo>> dataFromDB = this.getDataFromDB();
        String lock = stringRedisTemplate.opsForValue().get("lock");
        if(uuid.equals(lock)){
            //刪除自己的鎖
            stringRedisTemplate.delete("lock");
        }
        return dataFromDB;
    }else {
        //加鎖失敗   重試 自旋
        return getCatalogJsonFromDBWithRedisLock();
    }
}

階段五 lua指令碼 刪鎖原子操作

//分散式鎖
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

    //1、佔分布式鎖。去redis佔坑  並設定過期時間 必須是同步的 原子的
    String uuid = UUID.randomUUID().toString();
    Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,30,TimeUnit.SECONDS);
    if(aBoolean){
        //加鎖成功 執行業務
        Map<String, List<Catelog2Vo>> dataFromDB = this.getDataFromDB();
        //獲取值 + 對比 + 刪除 必須是原子操作  lua指令碼解鎖
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                "then " +
                "   return  redis.call('del', KEYS[1])" +
                "else " +
                "   return 0 " +
                "end";
        Long result = stringRedisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList("lock"), uuid);
        
        return dataFromDB;
    }else {
        //加鎖失敗   重試 自旋
        return getCatalogJsonFromDBWithRedisLock();
    }
}

階段六 最終結果

不論業務是否正確完成都刪除自己建立的鎖

    //分散式鎖
    public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {

        //1、佔分布式鎖。去redis佔坑  並設定過期時間 必須是同步的 原子的
        String uuid = UUID.randomUUID().toString();
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,300,TimeUnit.SECONDS);
        if(aBoolean){
            //加鎖成功 執行業務
            Map<String, List<Catelog2Vo>> dataFromDB = null;
            try {
                dataFromDB = this.getDataFromDB();
            }finally {
                //獲取值 + 對比 + 刪除 必須是原子操作  lua指令碼解鎖
                String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] " +
                        "then " +
                        "   return  redis.call('del', KEYS[1])" +
                        "else " +
                        "   return 0 " +
                        "end";
                Long result = stringRedisTemplate.execute(new DefaultRedisScript<Long>(luaScript, Long.class), Arrays.asList("lock"), uuid);
            }
            return dataFromDB;
        }else {
            //加鎖失敗   重試 自旋
            //睡眠
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getCatalogJsonFromDBWithRedisLock();
        }
    }

 

  • 本博文通過進階的形式 不斷提出問題以及解決思路,一步一步完善程式碼,實現具有高可靠性的分散式鎖功能.

自己實現redis分散式鎖

 

通過redis實現分散式鎖

public class RedisLockImpl {
    private static final long EXPIRETIME = 3000l;

    public Map<String,Object> getRedisLock() {

        //分散式鎖實現
        if (redisLock.lock("redisKey", EXPIRETIME, 0, 0)) {
            try {
                //續命
               Thread thread = new Thread(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           Thread.sleep(EXPIRETIME/2);//這裡休眠設定的超時時間的一般
                       }catch (InterruptedException e){
                           e.printStackTrace();
                       }

                       //判斷key是否存在 如果存在就重新設定超時時間
                       if (redisLock.hasExists(couponLock)){
                           //續命
                           boolean b = redisLock.setExpireTime(couponLock, EXPIRETIME);
                           System.out.printf("續命"+ b);

                       }
                   }
               });

                //執行業務
                businessWork();

            }  finally {
                //無論成功失敗都取解鎖
                boolean b = redisLock.releaseLock(couponLock);
                System.out.printf("解鎖"+ b);
            }

        } else {
            //加鎖失敗 重試 自旋
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return getRedisLock();
        }

    }

    public void businessWork(){
        System.out.printf("這裡執行業務程式碼!");
    }


}

 

 

redis工具類

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author yuerli
 * @Date 2020/7/7
 * 通過給set NX(一個有鎖,其他執行緒不能再獲取鎖),PX(設定鎖的自動過期時間) 進行保證redis的值以及過期時間的原子性
 * 通過給鎖設定一個擁有者的標識,即每次在獲取鎖的時候,生成一個隨機不唯一的串放入當前執行緒,釋放鎖的時候先去判斷對應的值是否和執行緒中的值相同(使用lua指令碼)
 * 避免刪除了其他鎖
 */

@Component
public class RedisLock {
    private RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();

    private String UNLOCK_LUA;

    private ThreadLocal<String> lockFlag = new ThreadLocal<String>();

    @Autowired
    public RedisLock(RedisTemplate<Object, Object> redisTemplate)
    {
        // 通過Lua指令碼來達到釋放鎖的原子性
        if("".equals(this.UNLOCK_LUA) || this.UNLOCK_LUA==null )
        {
            StringBuilder sb = new StringBuilder();
            sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call(\"del\",KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
            this.UNLOCK_LUA = sb.toString();
        }
        this.redisTemplate=redisTemplate;
    }

    public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
        boolean result = setRedis(key, expire);
        // 如果獲取鎖失敗,按照傳入的重試次數進行重試
        while((!result) && retryTimes--> 0){
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                return false;
            }
            result = setRedis(key, expire);
        }
        return result;
    }

    private boolean setRedis(String key, long expire) {
        //為了保證設定鎖和過期時間的兩個操作原子性 spring data 的 RedisTemplate當中沒有這樣的方法,但是jedis當中有這樣的原子操作的方法
        //需要通過RedisTemplate的execute方法獲取jedis裡操作命令物件
        // NX:表示只有當鎖定資源不存在的時候才能set成功。利用Redis的原子性,保證了只有第一個請求的執行緒才能獲得鎖,而後其他執行緒在鎖定資源釋放前都不能獲取鎖
        // PX:expire表示鎖定的資源的自動過期時間,單位是毫秒。具體過期時間根據實際場景而定。

        //通過set NX,PX的命令設定保證了Redis值和自動過期時間的原子性,避免在呼叫setIfAbsent方法的時候執行緒掛掉,沒有設定過期時間而導致死鎖,使得鎖不能釋放
        try {
            String result = redisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                    String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid); // 鎖定的資源

                    return commands.set(key, uuid, "NX", "PX", expire);
                }
            });
            return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return false;
    }

    /*上面的方法通過設定set的NX,PX命令保證了Redis值和自動過期時間的原子性,但是還有一個問題是如果執行緒T1獲取鎖,但是在處理T1的業務時候,
    由於某些原因阻塞了較長時間,這個時候設定的過期時間到了,執行緒T2獲取了鎖,執行緒T1操作完後釋放了鎖(釋放了T2的鎖)
    所以也就是說T2的執行緒上面沒有提供鎖的保護機制。因此需要給鎖定一個擁有者的標識,即每次在獲取鎖的時候,生成一個隨機不唯一的串放入當前執行緒,
    釋放鎖的時候先去判斷對應的值是否和執行緒中的值相同。*/
    public boolean releaseLock(String key) {
        // 釋放鎖的時候,有可能因為持鎖之後方法執行時間大於鎖的有效期,此時有可能已經被另外一個執行緒持有鎖,所以不能直接刪除
        try {
            List<String> keys = new ArrayList<String>();
            keys.add(key);
            List<String> args = new ArrayList<String>();
            args.add(lockFlag.get());
            // 使用lua指令碼刪除redis中匹配value的key,可以避免由於方法執行時間過長而redis鎖自動過期失效的時候誤刪其他執行緒的鎖
            // spring自帶的執行指令碼方法中,叢集模式直接丟擲不支援執行指令碼的異常,所以只能拿到原redis的connection來執行指令碼
            Long result = redisTemplate.execute(new RedisCallback<Long>() {
                public Long doInRedis(RedisConnection connection) throws DataAccessException {
                    Object nativeConnection = connection.getNativeConnection();
                    // 叢集模式和單機模式雖然執行指令碼的方法一樣,但是沒有共同的介面,所以只能分開執行
                    // 叢集模式
                    if (nativeConnection instanceof JedisCluster) {
                        return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    // 單機模式
                    else if (nativeConnection instanceof Jedis) {
                        return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    }
                    return 0L;
                }
            });
            return result != null && result > 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 獲取過期時間
     * @param key
     * @return
     */
    public Long getExpireTime(String key) {
        Long expire1 = redisTemplate.getExpire(key);
        return expire1;
    }

    /**
     * 重新設定過期時間
     * @param key
     * @return
     */
    public boolean setExpireTime(String key, long expire) {

        Boolean expire1 = redisTemplate.expire(key, expire, TimeUnit.MILLISECONDS);
        return expire1;
    }


    /**
     * 判斷是否存在
     * @param key
     * @return
     */
    public boolean hasExists(String key) {
        boolean exists = redisTemplate.hasKey(key);
        return exists;
    }

}

 

redis做冪等

//利用redis做冪等 
//同一個key2秒之內只能觸發一次
if(!redisLock.lock("redisKey",2000,0,0) ) {
     System.out.printf("請勿重複提交");
 }

 

寫的有錯的地方,請大家指正,多多包涵!