1. 程式人生 > 其它 >Redis應用問題解決

Redis應用問題解決

一、快取穿透

1.1 問題描述

​ key對應的資料在資料來源並不存在,每次針對此key的請求從快取獲取不到,請求都會壓到資料來源,從而可能壓垮資料來源。比如用一個不存在的使用者id獲取使用者資訊,不論快取還是資料庫都沒有,若黑客利用此漏洞進行攻擊可能壓垮資料庫。

1.2 解決方案

​ 一個一定不存在快取及查詢不到的資料,由於快取是不命中時被動寫的,並且出於容錯考慮,如果從儲存層查不到資料則不寫入快取,這將導致這個不存在的資料每次請求都要到儲存層去查詢,失去了快取的意義。

解決方案:

(1) 對空值快取:如果一個查詢返回的資料為空(不管是資料是否不存在),我們仍然把這個空結果(null)進行快取,設定空結果的過期時間會很短,最長不超過五分鐘

(2) 設定可訪問的名單(白名單):

使用bitmaps型別定義一個可以訪問的名單,名單id作為bitmaps的偏移量,每次訪問和bitmap裡面的id進行比較,如果訪問id不在bitmaps裡面,進行攔截,不允許訪問。

(3) 採用布隆過濾器:(布隆過濾器(Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進位制向量(點陣圖)和一系列隨機對映函式(雜湊函式)。

布隆過濾器可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的演算法,缺點是有一定的誤識別率和刪除困難。)

將所有可能存在的資料雜湊到一個足夠大的bitmaps中,一個一定不存在的資料會被 這個bitmaps攔截掉,從而避免了對底層儲存系統的查詢壓力。

(4) 進行實時監控:當發現Redis的命中率開始急速降低,需要排查訪問物件和訪問的資料,和運維人員配合,可以設定黑名單限制服務

二、快取擊穿

2.1 問題描述

​ key對應的資料存在,但在redis中過期,此時若有大量併發請求過來,這些請求發現快取過期一般都會從後端DB載入資料並回設到快取,這個時候大併發的請求可能會瞬間把後端DB壓垮。

2.2 解決方案

​ key可能會在某些時間點被超高併發地訪問,是一種非常“熱點”的資料。這個時候,需要考慮一個問題:快取被“擊穿”的 問題。

解決問題:

(1)預先設定熱門資料:**在redis高峰訪問之前,把一些熱門資料提前存入到redis裡面,加大這些熱門資料key的時長

(2)實時調整:**現場監控哪些資料熱門,實時調整key的過期時長

(3)使用鎖:

(1) 就是在快取失效的時候(判斷拿出來的值為空),不是立即去load db。

(2) 先使用快取工具的某些帶成功操作返回值的操作(比如Redis的SETNX)去set一個mutex key

(3) 當操作返回成功時,再進行load db的操作,並回設快取,最後刪除mutex key;

(4) 當操作返回失敗,證明有執行緒在load db,當前執行緒睡眠一段時間再重試整個get快取的方法。

三、快取雪崩

3.1 問題描述

​ key對應的資料存在,但在redis中過期,此時若有大量併發請求過來,這些請求發現快取過期一般都會從後端DB載入資料並回設到快取,這個時候大併發的請求可能會瞬間把後端DB壓垮。
​ 快取雪崩與快取擊穿的區別在於這裡針對很多key快取,前者則是某一個key

正常訪問

快取失效瞬間

3.2 解決方案

快取失效時的雪崩效應對底層系統的衝擊非常可怕!

解決方案:

(1) 構建多級快取架構:nginx快取 + redis快取 +其他快取(ehcache等)

(2) 使用鎖或佇列:

用加鎖或者佇列的方式保證來保證不會有大量的執行緒對資料庫一次性進行讀寫,從而避免失效時大量的併發請求落到底層儲存系統上。不適用高併發情況

(3) 設定過期標誌更新快取:

記錄快取資料是否過期(設定提前量),如果過期會觸發通知另外的執行緒在後臺去更新實際key的快取。

(4) 將快取失效時間分散開:

比如我們可以在原有的失效時間基礎上增加一個隨機值,比如1-5分鐘隨機,這樣每一個快取的過期時間的重複率就會降低,就很難引發集體失效的事件。

四、分散式鎖

4.1 問題描述

​ 隨著業務發展的需要,原單體單機部署的系統被演化成分散式集群系統後,由於分散式系統多執行緒、多程序並且分佈在不同機器上,這將使原單機部署情況下的併發控制鎖策略失效,單純的Java API並不能提供分散式鎖的能力。為了解決這個問題就需要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分散式鎖要解決的問題!

分散式鎖主流的實現方案:

  1. 基於資料庫實現分散式鎖
  2. 基於快取(Redis等)
  3. 基於Zookeeper

每一種分散式鎖解決方案都有各自的優缺點:

  1. 效能:redis最高
  2. 可靠性:zookeeper最高

這裡,我們就基於redis實現分散式鎖。

4.2 解決方案:使用redis實現分散式鎖

redis:命令

# set sku:1:info “OK” NX PX 10000

EX second :設定鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。

PX millisecond :設定鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。

NX :只在鍵不存在時,才對鍵進行設定操作。 SET key value NX 效果等同於 SETNX key value 。

XX :只在鍵已經存在時,才對鍵進行設定操作。

  1. 多個客戶端同時獲取鎖(setnx)
  2. 獲取成功,執行業務邏輯{從db獲取資料,放入快取},執行完成釋放鎖(del)
  3. 其他客戶端等待重試

編寫程式碼:

@GetMapping("testLock")
public void testLock(){
    //1獲取鎖,setne
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    //2獲取鎖成功、查詢num的值
    if(lock){
        Object value = redisTemplate.opsForValue().get("num");
        //2.1判斷num為空return
        if(StringUtils.isEmpty(value)){
            return;
        }
        //2.2有值就轉成成int
        int num = Integer.parseInt(value+"");
        //2.3把redis的num加1
        redisTemplate.opsForValue().set("num", ++num);
        //2.4釋放鎖,del
        redisTemplate.delete("lock");

    }else{
        //3獲取鎖失敗、每隔0.1秒再獲取
        try {
            Thread.sleep(100);
            testLock();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

重啟,服務叢集,通過閘道器壓力測試:

ab -n 1000 -c 100 http://192.168.140.1:8080/test/testLock

檢視redis中num的值:

新問題:setnx剛好獲取到鎖,業務邏輯出現異常,導致鎖無法釋放

解決:設定過期時間,自動釋放鎖。

4.3 優化之設定鎖的過期時間

設定過期時間有兩種方式:

  1. 首先想到通過expire設定過期時間(缺乏原子性:如果在setnx和expire之間出現異常,鎖也無法釋放)
  2. 在set時指定過期時間(推薦)

壓力測試肯定也沒有問題。自行測試

問題:可能會釋放其他伺服器的鎖。

場景:如果業務邏輯的執行時間是7s。執行流程如下

  1. index1業務邏輯沒執行完,3秒後鎖被自動釋放。
  2. index2獲取到鎖,執行業務邏輯,3秒後鎖被自動釋放。
  3. index3獲取到鎖,執行業務邏輯
  4. index1業務邏輯執行完成,開始呼叫del釋放鎖,這時釋放的是index3的鎖,導致index3的業務只執行1s就被別人釋放。

最終等於沒鎖的情況。

解決:setnx獲取鎖時,設定一個指定的唯一值(例如:uuid);釋放前獲取這個值,判斷是否自己的鎖

4.4 優化之UUID防誤刪

問題:刪除操作缺乏原子性。

場景:

  1. index1執行刪除時,查詢到的lock值確實和uuid相等

    uuid=v1

    set(lock,uuid);

  1. index1執行刪除前,lock剛好過期時間已到,被redis自動釋放

    在redis中沒有了lock,沒有了鎖。

  1. index2獲取了lock

    index2執行緒獲取到了cpu的資源,開始執行方法

    uuid=v2

    set(lock,uuid);

  2. index1執行刪除,此時會把index2的lock刪除

    index1 因為已經在方法中了,所以不需要重新上鎖。index1有執行的許可權。index1已經比較完成了,這個時候,開始執行

​ 刪除的index2的鎖!

4.5優化之LUA指令碼保證刪除的原子性

@GetMapping("testLockLua")
public void testLockLua() {
    //1 宣告一個uuid ,將做為一個value 放入我們的key所對應的值中
    String uuid = UUID.randomUUID().toString();
    //2 定義一個鎖:lua 指令碼可以使用同一把鎖,來實現刪除!
    String skuId = "25"; // 訪問skuId 為25號的商品 100008348542
    String locKey = "lock:" + skuId; // 鎖住的是每個商品的資料

    // 3 獲取鎖
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

    // 第一種: lock 與過期時間中間不寫任何的程式碼。
    // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//設定過期時間
    // 如果true
    if (lock) {
        // 執行的業務邏輯開始
        // 獲取快取中的num 資料
        Object value = redisTemplate.opsForValue().get("num");
        // 如果是空直接返回
        if (StringUtils.isEmpty(value)) {
            return;
        }
        // 不是空 如果說在這出現了異常! 那麼delete 就刪除失敗! 也就是說鎖永遠存在!
        int num = Integer.parseInt(value + "");
        // 使num 每次+1 放入快取
        redisTemplate.opsForValue().set("num", String.valueOf(++num));
        /*使用lua指令碼來鎖*/
        // 定義lua 指令碼
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        // 使用redis執行lua執行
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptText(script);
        // 設定一下返回值型別 為Long
        // 因為刪除判斷的時候,返回的0,給其封裝為資料型別。如果不封裝那麼預設返回String 型別,
        // 那麼返回字串與0 會有發生錯誤。
        redisScript.setResultType(Long.class);
        // 第一個要是script 指令碼 ,第二個需要判斷的key,第三個就是key所對應的值。
        redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
    } else {
        // 其他執行緒等待
        try {
            // 睡眠
            Thread.sleep(1000);
            // 睡醒了之後,呼叫方法。
            testLockLua();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

lua指令碼解釋

4.6總結

// 加鎖
// 1. 從redis中獲取鎖,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
      .setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
// 使用lua釋放鎖
// 2. 釋放鎖 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 設定lua指令碼返回的資料型別
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 設定lua指令碼返回型別為Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
// 重試
Thread.sleep(500);
testLock();

為了確保分散式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件

  • - 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  • - 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖。
  • - 解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
  • - 加鎖和解鎖必須具有原子性。