1. 程式人生 > 其它 >Redis-分散式鎖(解決快取擊穿問題)

Redis-分散式鎖(解決快取擊穿問題)

一. 簡介

分散式鎖在很多場景中都非常的有用,分散式鎖是一個概念,實現他的方式有很多,本篇文章是基於Redis實現的單機分散式鎖。

主要解決多併發程式設計中由於鎖競爭而帶來的資料不一致的問題。

二. 應用場景

在本篇文章中主要解決Redis中快取擊穿問題。

併發的訪問一條資料,資料庫有,但是快取中不存在(沒人訪問這條資料或者Redis中資料剛好過期),導致一瞬間多個請求訪問資料庫,資料庫壓力增大,這類資料通常為熱點資料。

三. 模擬快取擊穿

以下程式模擬100個執行緒同時去訪問一條沒有快取的資料。

1. 業務程式碼(service層)

    @Override
    public Object listByRedis(String id) {
        HashMap<Object, Object> result = new HashMap<>();
        //通過布隆過濾器 解決快取穿透問題. 會有誤判 但是沒有關係 不會有太多誤判。
        if (!bloomFilter.isExist(id)){
            result.put("status", 400);
            result.put("msg", "非法訪問");
            return result;
        }
        //查詢快取
        Object redisData = redisTemplate.opsForValue().get(id);
        //是否命中
        if(redisData != null){
            //返回結果
            result.put("status", 200);
            result.put("msg", "快取命中");
            result.put("data", redisData);
            return result;
        }
        try {
            UserInfo userInfo = userInfoMapper.selectById(id);
            if (userInfo != null){
                redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
                result.put("status", 200);
                result.put("msg", "查詢資料庫");
                result.put("data", userInfo);
                return result;
            }else{
                result.put("status", 200);
                result.put("msg", "沒有資料");
                return result;
            }
        }finally {

        }
    }

2. 併發模擬

併發訪問id=1096這條資料

public class ReadTest {

    private static CountDownLatch countDownLatch = new CountDownLatch(99);

    @Test
    public void test() throws InterruptedException {
        TicketsRunBle ticketsRunBle = new TicketsRunBle();
        for (int i = 0;i<=99;i++){
            new Thread(ticketsRunBle, "視窗"+i).start();
            countDownLatch.countDown();
        }
        Thread.currentThread().join();
    }

    public class TicketsRunBle implements Runnable{

        @Override
        public void run() {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            RestTemplate restTemplate = new RestTemplate();
            R forObject = restTemplate.getForObject("http://localhost:8082/user?id=1096", R.class);
            System.out.println("結果:" + forObject);
        }
    }

}

3. 執行結果

擷取部分,都是資料庫查詢出來的,日誌列印也有Mybatis的記錄。

四. 單機Redis分散式鎖的實現

Redis分散式鎖原理上是使用Setnx命令實現:

SET resource_name my_random_value NX PX 30000。

這個命令僅在不存在key的時候才能被執行成功(NX選項),並且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是“my_random_value”(一個隨機值),這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。
當client嘗試獲取鎖時,我們將事先定義的key設定一個值,之後的client再設定時則會不成功。

釋放鎖時實現:

為什麼value要使用一個唯一的值,主要是為了更安全的釋放鎖,釋放鎖的時候使用指令碼告訴Redis:只有key存在並且儲存的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua指令碼實現:

if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

使用這種方式可以避免刪除別的Client獲得的鎖。舉個栗子:
客戶端A取得資源鎖,但是緊接著被一個其他操作阻塞了,當客戶端A執行完畢其他操作後要釋放鎖時,原來的鎖早已超時並且被Redis自動釋放,並且在這期間資源鎖又被客戶端B再次獲取到。如果僅使用DEL命令將key刪除,那麼這種情況就會把客戶端B的鎖給刪除掉。使用Lua指令碼就不會存在這種情況,因為指令碼僅會刪除value等於客戶端A的value的key(value相當於客戶端的一個簽名)。

本篇文章獲取鎖有一點不同,上面說的是設定一個固定的key,而本篇文章解決的問題是基於單條資料的一個併發查詢。

所以需要對單條資料的ID作為key進行加鎖,防止查詢同一條資料多次訪問資料庫。

1. 鎖實現:

/**
 * 自定義分散式鎖
 * 這裡主要實現對單條資料進行加鎖,通過id進行加鎖
 * 多個執行緒同時訪問該資料會阻塞
 * @author
 * @Date 2022/1/6
 */
@Component
public class RedisLock {

    private static JedisPool jedisPool;

    @Autowired
    public void setJedisPool(JedisPool jedisPool){
        RedisLock.jedisPool = jedisPool;
    }

    /**
     * 鎖健
     */
    private final static String KEY = "lock_key_";

    /**
     * 鎖過期時間
     */
    private final static long LOCK_EXPIRED = 30000;

    /**
     * 鎖競爭超時時間
     */
    private final static long LOCK_WAIT_TIME_OUT = 999999;

    /**
     * SET命令引數
     */
    static SetParams params = SetParams.setParams().nx().px(LOCK_EXPIRED);

    /**
     * ThreadLocal用於儲存某個執行緒共享變數:對於同一個static ThreadLocal
     * 不同的執行緒只能從中get,set到自己執行緒的副本
     */
    private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    /**
     * 嘗試獲取鎖
     * @param key
     * @return
     */
    public Boolean tryLock(String key){
        String value = UUID.randomUUID().toString();
        Jedis resource = jedisPool.getResource();
        long startTime = System.currentTimeMillis();
        try {
            for(;;){
                //SET命令返回OK,獲取鎖成功
                String set = resource.set(KEY.concat(key), value, params);
                if ("OK".equals(set)){
                    threadLocal.set(value);
                    return true;
                }
                //增加一個超時時間判斷
                if(System.currentTimeMillis() - startTime > LOCK_WAIT_TIME_OUT){
                    return false;
                }
                //休眠一段時間 遞迴呼叫
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }finally {
            resource.close();
        }
    }

    /**
     * 釋放鎖 通過lua指令碼實現
     * @param key
     * @return
     */
    public boolean unLock(String key){
        Jedis resource = null;
        try {
            resource = jedisPool.getResource();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then" +
                    " return redis.call('del', KEYS[1]) " +
                    "else" +
                    " return 0 " +
                    "end";
            Object eval = resource.eval(script, Collections.singletonList(KEY.concat(key)), Collections.singletonList(threadLocal.get()));
            if ("1".equals(eval.toString())) {
                return true;
            }
            return false;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }finally {
            if (resource != null){
                resource.close();
            }
        }
    }
}

2. 業務程式碼(service層)

    @Override
    public Object listByRedis(String id) {
        HashMap<Object, Object> result = new HashMap<>();
        //通過布隆過濾器 解決快取穿透問題. 會有誤判 但是沒有關係 不會有太多誤判。
        if (!bloomFilter.isExist(id)){
            result.put("status", 400);
            result.put("msg", "非法訪問");
            return result;
        }
        //查詢快取
        Object redisData = redisTemplate.opsForValue().get(id);
        //是否命中
        if(redisData != null){
            //返回結果
            result.put("status", 200);
            result.put("msg", "快取命中");
            result.put("data", redisData);
            return result;
        }
        try {
            //新增分散式鎖,進來後在查詢一次快取,如果上一個執行緒已經查詢並且存入快取
            Boolean lock = redisLock.tryLock(id);
            if (!lock){
                result.put("status", 500);
                result.put("msg", "訪問超時,稍後再試");
                return result;
            }
            //查詢快取
            redisData = redisTemplate.opsForValue().get(id);
            //是否命中
            if(redisData != null){
                //返回結果
                result.put("status", 200);
                result.put("msg", "快取命中");
                result.put("data", redisData);
                return result;
            }
            UserInfo userInfo = userInfoMapper.selectById(id);
            if (userInfo != null){
                redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
                result.put("status", 200);
                result.put("msg", "查詢資料庫");
                result.put("data", userInfo);
                return result;
            }else{
                result.put("status", 200);
                result.put("msg", "沒有資料");
                return result;
            }
        }finally {
            redisLock.unLock(id);
        }
    }

3. 測試結果

列印日誌顯示,只會有一次資料庫查詢。

返回的結果除了第一次是查詢資料庫,後面的都是快取命中


五. 總結

該鎖的實現有很多不足之處,不斷了解學習的一個過程,可使用Redisson中實現的分散式鎖可用性更高。

本文來自部落格園,作者:EchoLv,轉載請註明原文連結:https://www.cnblogs.com/lvdeyinBlog/p/15774412.html