1. 程式人生 > >Redis形式的分散式鎖的實現

Redis形式的分散式鎖的實現

Redis形式的分散式鎖

1:為什麼要有分散式鎖

1.1:鎖介紹

執行緒鎖:主要用來給方法、程式碼塊加鎖。當某個方法或程式碼使用鎖,在同一時刻僅有一個執行緒執行該方法或該程式碼段。執行緒鎖只在同一JVM中有效果,因為執行緒鎖的實現在根本上是依靠執行緒之間共享記憶體實現的,比如synchronized是共享物件頭,顯示鎖Lock是共享某個變數(state)。

程序鎖:為了控制同一作業系統中多個程序訪問某個共享資源,因為程序具有獨立性,各個程序無法訪問其他程序的資源,因此無法通過synchronized等執行緒鎖實現程序鎖。

分散式鎖:當多個程序不在同一個系統中,用分散式鎖控制多個程序對資源的訪問。

1.2:分散式鎖使用場景

執行緒間併發問題和程序間併發問題都是可以通過分散式鎖解決的,但不建議這樣做!

例如一個服務之間的多執行緒併發問題的時候,雖然也是可以使用分散式鎖的,但是這個做的話一般比起直接使用執行緒鎖會更加的消耗資源,例如redis形式的分散式鎖的話,就會存在對redis進行操作的一些效能損耗

例如下面的這個場景,執行緒A和執行緒B都共享某個變數X。

如果是單機情況下(單JVM),執行緒之間共享記憶體,只要使用執行緒鎖就可以解決併發問題。

如果是分散式情況下(多JVM),執行緒A和執行緒B很可能不是在同一JVM中,這樣執行緒鎖就無法起到作用了,這時候就要用到分散式鎖來解決。

2:分散式鎖的實現原理

主要就是用到了redis裡面的setnx函式,呼叫這個函式會設定一個鍵值對到redis裡面,如果這個鍵的值在redis中已經存在了,這個時候就不會繼續設定進去返回false,如果沒有的話才會設定進去,然後返回true,而由於redis是單執行緒的,那麼當進行這個操作的時候其他的程序是不能進行操作的,這個時候可以通過判斷返回的值是否是true,來判斷某個程序是否獲取了這個鎖操作許可權,如果true代表某個程序已經對當前的資源進行操作,這個時候其他的程序肯定會返回false,然後對於這些返回false的進行可以再多次嘗試進行所獲取的操作,而對於已經成功的獲取某個資源的操作許可權的程序則可以在獲取鎖的時候進行相應的業務操作,然後等到業務操作完成之後釋放鎖(必須保證鎖被釋放掉),而在釋放鎖之後,那麼其他的程序在呼叫setnx函式的又能返回true,然後接著下一個程序進行獲取鎖,然後再次進行相關的操作

3:具體的實現

3.1:簡答的redis的分散式鎖的實現流程,下面是我現在理解的redis鎖的獲取、釋放鎖的流程

這裡寫圖片描述

3.2:下面這個是進行上鎖以及解鎖操作的實現類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

@Repository
public class RedisLock {
    @Autowired
    RedisServiceInterface redisService;
    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);

    public RedisLock() {
    }

    public boolean tryLock(EnumRedisLockType lockType, String id) {
        try {
            String key = lockType.name() + "_" + id;
            boolean result = this.redisService.exclusiveSetWithExpire(key, 1);

            int count;
            for(count = 1; !result; result = this.redisService.exclusiveSetWithExpire(key, 1)) {
                ++count;
                if (count >= 20) {
                    logger.error(Thread.currentThread().getName() + ", 命令" + key + "獲取鎖失敗,放棄");
                    break;
                }

                logger.error(Thread.currentThread().getName() + ", 命令" + key + "佇列執行嘗試超過指定次數,啟用新一輪重試");
                Thread.sleep(300L);
            }

            if (count < 20) {
                logger.info(Thread.currentThread().getName() + ", 獲取鎖:" + key);
                return true;
            }
        } catch (Exception var6) {
            logger.error("Fatal error in tryLock: ", var6);
        }

        return false;
    }

    public void releaseLock(EnumRedisLockType lockType, String id) {
        String key = lockType.name() + "_" + id;
        logger.info(Thread.currentThread().getName() + ", 釋放鎖:" + key);
        this.redisService.removeValue(key);
    }
}

3.3:下面的這個則是redis操作的一些額外的封裝,基於spring的RedisTemplate進行進一步封裝

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisScriptingCommands;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * 預設redis操作服務類
 */
@Repository
public class DefaultRedisService implements RedisServiceInterface {
    private final RedisTemplate<String, String> redisTemplate;
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    public DefaultRedisService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 在某段時間後失效
     * @param key    key
     * @param v    value
     * @param time time
     * @return boolean
     */
    @Override
    public boolean expireValue(String key, String v, long time) {
        try {
            ValueOperations<String, String> valueOps = redisTemplate.opsForValue();
            valueOps.set(key, v);
            if (time > 0) {
                redisTemplate.expire(key, time, TimeUnit.SECONDS);
                return true;
            }
        } catch (Throwable t) {
            logger.error("快取[" + key + "]失敗, value[" + v + "]", t);
        }
        return false;
    }

    @Override
    public boolean exclusiveSetWithExpire(String key, Object value) {
        return exclusiveSetWithExpire(key, value, 10, 9);
    }


    /**
     * 快取一個key-value鍵值對,如果key已經存在,休眠5ms重試retryCount次,
     * 如果成功,設定該值的超時時間為expiredSeconds
     * 預設重試10次,如果始終失敗,該函式需要佔用執行緒約100ms時間
     * @param key
     * @param value
     * @param expiredSeconds 等待超時的時間
     * @return 是否設定成功
     */
    @Override
    public boolean exclusiveSetWithExpire(String key, Object value, int retryCount, int expiredSeconds) {
        if (retryCount <= 0) {
            retryCount = 10;
        }

        int count = 0;
        boolean result = false;
        try {
            ValueOperations<String, String> valueOps = redisTemplate.opsForValue();
            while (count <= retryCount) {
                // Redis一個來回大約佔用5ms
                result = valueOps.setIfAbsent(key, value.toString());
                if (result) {
                    break;
                }

                // 如果沒有成功,休眠5ms(相當於另一個操作例項操作redis的時間)
                Thread.sleep(5);
                count++;
            }
            if (result) {
                valueOps.getOperations().expire(key, expiredSeconds, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            logger.error("redis exception :", e);
        }
        return result;
    }

    /**
     * 快取value操作
     * @param key key
     * @param v value
     * @return boolean
     */
    @Override
    public boolean cacheValue(String key, String v) {
        return expireValue(key, v, -1);
    }

    @Override
    public boolean containsKey(String key) {
        try {
            return redisTemplate.hasKey(key);
        } catch (Throwable t) {
            logger.error("判斷快取存在失敗key[" + key + ", Codeor[" + t + "]");
        }
        return false;
    }

    /**
     * 獲取快取
     *
     * @param key key
     * @return string
     */
    @Override
    public String getValue(String key) {
        try {
            ValueOperations<String, String> valueOps = redisTemplate.opsForValue();
            return valueOps.get(key);
        } catch (Throwable t) {
            logger.error("獲取快取失敗key[" + key + ", Codeor[" + t + "]");
        }
        return null;
    }

    /**
     * 移除快取
     *
     * @param key key
     * @return boolean
     */
    @Override
    public boolean removeValue(String key) {
        return remove(key);
    }

    @Override
    public boolean removeSet(String key) {
        return remove(key);
    }

    @Override
    public boolean removeList(String key) {
        return remove(key);
    }


    /**
     * 如果time>0 在某段時間後失效
     * @param key    key
     * @param v    value
     * @param time time
     * @return boolean
     */
    @Override
    public boolean expireSet(String key, String v, long time) {
        try {
            SetOperations<String, String> valueOps = redisTemplate.opsForSet();
            valueOps.add(key, v);
            if (time > 0) redisTemplate.expire(key, time, TimeUnit.SECONDS);
            return true;
        } catch (Throwable t) {
            logger.error("快取[" + key + "]失敗, value[" + v + "]", t);
        }
        return false;
    }

    /**
     * 快取set
     *
     * @param key key
     * @param v value
     * @return boolean
     */
    @Override
    public boolean cacheSet(String key, String v) {
        return expireSet(key, v, -1);
    }

    /**
     * 快取set
     *
     * @param key    key
     * @param v    value
     * @param time time
     * @return boolean
     */
    @Override
    public boolean expireSet(String key, Set<String> v, long time) {
        try{
            SetOperations<String, String> setOps = redisTemplate.opsForSet();
            setOps.add(key, v.toArray(new String[v.size()]));
            if (time > 0) redisTemplate.expire(key, time, TimeUnit.SECONDS);
            return true;
        } catch (Throwable t) {
            logger.error("快取[" + key + "]失敗, value[" + v + "]", t);
        }
        return false;
    }

    /**
     * 快取set
     *
     * @param key key
     * @param v value
     * @return boolean
     */
    @Override
    public boolean cacheSet(String key, Set<String> v) {
        return expireSet(key, v, -1);
    }

    /**
     * 獲取快取set資料
     *
     * @param key key
     * @return set
     */
    @Override
    public Set<String> getSet(String key) {
        try {
            SetOperations<String, String> setOps = redisTemplate.opsForSet();
            return setOps.members(key);
        } catch (Throwable t) {
            logger.error("獲取set快取失敗key[" + key + ", Codeor[" + t + "]");
        }
        return null;
    }

    /**
     * list快取
     *
     * @param key    key
     * @param v    value
     * @param time time
     * @return boolean
     */
    @Override
    public boolean expireList(String key, String v, long time) {
        try {
            ListOperations<String, String> listOps = redisTemplate.opsForList();
            listOps.rightPush(key, v);
            if (time > 0) redisTemplate.expire(key, time, TimeUnit.SECONDS);
            return true;
        } catch (Throwable t) {
            logger.error("快取[" + key + "]失敗, value[" + v + "]", t);
        }
        return false;
    }

    /**
     * 快取list
     *
     * @param key key
     * @param v value
     * @return boolean
     */
    @Override
    public boolean cacheList(String key, String v) {
        return expireList(key, v, -1);
    }

    /**
     * 快取list
     *
     * @param key    key
     * @param v    value
     * @param time time
     * @return boolean
     */
    @Override
    public boolean expireList(String key, List<String> v, long time) {
        try {
            ListOperations<String, String> listOps = redisTemplate.opsForList();
            listOps.rightPushAll(key, v);
            if (time > 0) redisTemplate.expire(key, time, TimeUnit.SECONDS);
            return true;
        } catch (Throwable t) {
            logger.error("快取[" + key + "]失敗, value[" + v + "]", t);
        }
        return false;
    }

    /**
     * 快取list
     *
     * @param key key
     * @param v value
     * @return boolean
     */
    @Override
    public boolean cacheList(String key, List<String> v) {
        return expireList(key, v, -1);
    }

    /**
     * 獲取list快取
     *
     * @param key     key
     * @param start start
     * @param end   end
     * @return list
     */
    @Override
    public List<String> getList(String key, long start, long end) {
        try {
            ListOperations<String, String> listOps = redisTemplate.opsForList();
            return listOps.range(key, start, end);
        } catch (Throwable t) {
            logger.error("獲取list快取失敗key[" + key + ", Codeor[" + t + "]");
        }
        return null;
    }

    /**
     * 獲取總條數, 可用於分頁
     *
     * @param key key
     * @return long
     */
    @Override
    public long getListSize(String key) {
        try {
            ListOperations<String, String> listOps = redisTemplate.opsForList();
            return listOps.size(key);
        } catch (Throwable t) {
            logger.error("獲取list長度失敗key[" + key + "], Codeor[" + t + "]");
        }
        return 0;
    }

    /**
     * 獲取總條數, 可用於分頁
     *
     * @param listOps listOps
     * @param key     key
     * @return long
     */
    @Override
    public long getListSize(ListOperations<String, String> listOps, String key) {
        try {
            return listOps.size(key);
        } catch (Throwable t) {
            logger.error("獲取list長度失敗key[" + key + "], Codeor[" + t + "]");
        }
        return 0;
    }

    /**
     * 移除list快取
     *
     * @param key key
     * @return boolean
     */
    @Override
    public boolean removeOneOfList(String key) {
        try {
            ListOperations<String, String> listOps = redisTemplate.opsForList();
            listOps.rightPop(key);
            return true;
        } catch (Throwable t) {
            logger.error("移除list快取失敗key[" + key + ", Codeor[" + t + "]");
        }
        return false;
    }

    /**
     * 移除快取
     *
     * @param key key
     * @return boolean
     */
    private boolean remove(String key) {
        try {
            redisTemplate.delete(key);
            return true;
        } catch (Throwable t) {
            logger.error("獲取快取失敗key[" + key + ", Codeor[" + t + "]");
        }
        return false;
    }
}