1. 程式人生 > 其它 >分散式儲存-Redis實戰&常見問題解決

分散式儲存-Redis實戰&常見問題解決

分散式儲存-Redis實戰&常見問題解決

前面講了一些Redis的使用場景和資料型別,本章會聊到:

  • 一個抽獎的例子來闡述redis如何被應用去解決實際問題(程式碼有點多,不適合放在博文中,如需請留言,我可以傳送),並且會使用到前面併發模組聊的CountDownLatch和springBoot中的事件去非同步快取資料,和非同步等待。
  • 常見的一些使用redis帶來的問題,比如快取穿透、快取雪崩、以及資料庫和redis中資料不一致的問題,和布隆過濾器的一些底層思想(點陣圖)
  • 常用的redis客戶端和他們的底層實現
  • 自己動手實現一個Redisclient

Redis抽獎實現

整體流程:

  設計思路:當一個請求過來的時候,會攜帶一個活動id

  • 快取獎品資訊:我們會使用這個活動id去查詢資料庫,並把查詢到的資料快取在redis中(這個步驟是非同步的,我們用一個CountDownLatch去對他進行控制,快取完成後,給count-1,後續需要redis中資料的流程就可以繼續處理)
  • 開始抽獎:這是一個簡單的區間演算法,在lottery_item中有對於每個獎品的概率比。從redis中拿到所有的獎項,如果沒有則從資料庫中獲取(因為上面快取的那一步驟是非同步的,可能這個時候還有快取成功)。我們根據隨機數去落到我們設定好的概率區間中(區間越大,抽到某個獎品的概率越大
  • 發放獎品:我們的獎品型別不同 (lottery_prize#prize_type)根據不同獎品的型別,走不同的邏輯,比如我們有的獎品要傳送簡訊,有的獎品不要,我們就定一個模板方法,然後不同型別的獎品走不同型別的傳送邏輯。
    • 扣減庫存:
      我們前面已經非同步快取了資料到redis中,那這裡直接使用incur的命令(之前說,這個命令是原子的,所以不會產生安全問題),我們可以先扣減redis中,然後進行資料的內存扣除

SpringBoot中使用方法(Lettuce)

<dependency>
         <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  redis:
    port: 6379
    host: ip
    lettuce:
      pool:
        max-active: -1
        max-idle: 2000
        max-wait: -1
        min-idle: 1
        time-between-eviction-runs: 5000
@Autowired
 RedisTemplate<String,String> redisTemplate;

Redis的客戶端

常見的:Jedis、Redission、Lettuce(上面給的pom)

我們傳送一個命令(set n v)到redis上的時候,使用抓包工具發現

這裡的$*表示key的長度 ,比如:$3表示set的長度是3 *3 表示我們傳遞了三個引數給redis

那解析下來的命令就是:*3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv,** 那是不是證明只要我們符合這樣的編碼協議就可以和redis交流了呢

定義get set 命令

public class CommandConstant {

    public static final String START="*";

    public static final String LENGTH="$";

    public static final String LINE="\r\n";

    //這裡提供兩個命令
    public enum CommandEnum{
        SET,
        GET
    }
}
View Code

封裝api

public class CustomerRedisClient {

    private CustomerRedisClientSocket customerRedisClientSocket;

    //連線的redis地址和埠
    public CustomerRedisClient(String host,int port) {
        customerRedisClientSocket=new CustomerRedisClientSocket(host,port);
    }

    //封裝一個api 傳送指令
    public String set(String key,String value){
        //傳遞給redis,同時格式化成redis認識的資料
        customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.SET,key.getBytes(),value.getBytes()));
        return customerRedisClientSocket.read(); //在等待返回結果的時候,是阻塞的
    }

    //獲取指令
    public String get(String key){
        customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.GET,key.getBytes()));
        return customerRedisClientSocket.read();
    }

    //這裡按照redis的要求格式化 就是前面抓包後拿到的格式 *3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv
    public static String convertToCommand(CommandConstant.CommandEnum commandEnum,byte[]... bytes){
        StringBuilder stringBuilder=new StringBuilder();
        stringBuilder.append(CommandConstant.START).append(bytes.length+1).append(CommandConstant.LINE);
        stringBuilder.append(CommandConstant.LENGTH).append(commandEnum.toString().length()).append(CommandConstant.LINE);
        stringBuilder.append(commandEnum.toString()).append(CommandConstant.LINE);
        for (byte[] by:bytes){
            stringBuilder.append(CommandConstant.LENGTH).append(by.length).append(CommandConstant.LINE);
            stringBuilder.append(new String(by)).append(CommandConstant.LINE);
        }
        return stringBuilder.toString();
    }
}
View Code

連線redis

public class CustomerRedisClientSocket {

    //這裡可以使用nio
    private Socket socket;

    private InputStream inputStream;

    private OutputStream outputStream;

    public CustomerRedisClientSocket(String ip,int port){
        try {
            socket=new Socket(ip,port);
            inputStream=socket.getInputStream();
            outputStream=socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //傳送指令
    public void send(String cmd){
        try {
            outputStream.write(cmd.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //讀取資料
    public String read(){
        byte[] bytes=new byte[1024];
        int count=0;
        try {
            count=inputStream.read(bytes);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new String(bytes,0,count);
    }
}
View Code

測試

public class MainClient {
    public static void main(String[] args) {
        CustomerRedisClient customerRedisClient=new CustomerRedisClient("ip",6379);

        System.out.println(customerRedisClient.set("customer","Define"));
        System.out.println(customerRedisClient.get("customer"));
    }
}
View Code

結果 +ok(redis返回的成功報文) $6(返回的value是6的長度)

根據上面我們自己實現的中介軟體,我們可以悟出,從這些層面選擇中介軟體:

  • 通訊層面的優化:當我們獲取返回結果的時候是阻塞的(我們自己實現的中介軟體)
  • 是否採用非同步通訊:多執行緒(效率高)
  • 針對key和value的優化 :傳遞的報文越小,傳遞的速度肯定更快
  • 連線的優化(連線池)

我們發現redisson是提供這些功能做的比較好的,整合網上很多例子,這裡不聊了。

使用redis中遇見的問題

資料庫和redis的資料一致性問題:實際上很難解決強一致性問題,常見有兩種操作

  • 先更新資料庫,再刪除快取(刪除快取就等於更新)推薦
    •  更新資料庫成功,但是刪除快取失敗
      • 當資料庫更新成功後,把更新redis的訊息放在mq中,這個時候一定能保證都更新成功。
      • 解析資料庫的binary log,然後更新快取
  • 先刪除快取,再更新資料庫(不推薦)
    • 刪除快取成功,更新資料庫失敗(看起來沒有什麼問題,但是看下面的場景):執行緒A先去刪除一個key,執行緒B去獲取這個Key,發現沒有資料,那他就去更新Redis快取,這個時候執行緒A去更新資料庫 。那就會導致資料庫的資料是最新的,但是快取不是最新的

快取雪崩

  • 原因大量的熱點資料同時失效,因為設定了相同的過期時間,剛好這個時候請求量又很大,那這個時候壓力就到了資料庫上,從而就導致了雪崩
    • 【方案】這是幾個設定過期的命令,(我們可以給key設定不同的過期時間,這樣就能有效地避免雪崩,或者熱點資料不設定過期時間
      • expire key seconds # 設定鍵在給定秒後過期 pexpire key milliseconds # 設定鍵在給定毫秒後過期 expireat key timestamp # 到達指定秒數時間戳之後鍵過期 pexpireat key timestamp # 到達指定毫秒數時間戳之後鍵過期
    • 【redis key 過期實現原理】想一下redis是如何實現過期的,如果我們儲存的資料庫十分巨大,redis怎麼精確的知道那個key過期了?並且對他進行刪除呢?
      • 想法:我們給去key每個key設定一個定時器,一個個進行輪詢。效能太差了!!
      • Redis對過期key的做法:
        • 儲存:實際上redis使用了一個hash的結構進行儲存,對你設定的過期的key單獨用一個value儲存了一個過期時間
        • 刪除
          • 被動刪除:當我們使用get命令的時候,他去查詢他儲存的我們傳遞的過期時間和電腦時間對比,如果過期,則進行刪除
          • 主動刪除:隨機抽取20個key,刪除這20key中已經過期的key,如果發現這20個key中有20%的key已經過期了,那麼就再次抽取20個key,用這個方式迴圈。

【快取穿透】:

  • 【原因】:Redis和資料庫中都不存在查詢的資料,那這就是一次無效查詢,如果有人偽造了很多請求,那可能會引發資料庫宕機,因為redis中沒有資料,請求肯定就請求到資料庫,這就叫快取穿透
  • 【方案】:使用布隆過濾器  
    • 流程】:
      • 專案在啟動的時候,把所有的資料載入到布隆過濾器中
      • 當客戶端有請求過來時,先到布隆過濾器中查詢一下當前訪問的key是否存在,如果布隆過 濾器中沒有該key,則不需要去資料庫查詢直接反饋即可
    • 【實現】:
      • 使用guava  
      • <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>21.0</version>
        </dependency>
        View Code
      • 程式初始的時候載入資料到布隆過濾器中
      • @Slf4j
        @Component
        public class BloomFilterDataLoadApplicationRunner implements ApplicationRunner {
        
            @Autowired
            ICityService cityService;
        
            @Override
            public void run(ApplicationArguments args) {
                List<City> cities=cityService.list();
                //第一個引數指的是要儲存的資料,第二個資料是允許的容錯率
                BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03);
                cities.parallelStream().forEach(city-> bloomFilter.put(RedisKeyConstants.CITY_KEY+":"+city.getId()));
                BloomFilterCache.cityBloom=bloomFilter;
            }
        }
        View Code
      • 客戶端訪問時增加驗證
      •     @GetMapping("/bloom/{id}")
            public String filter(@PathVariable("id")Integer id){
                String key=RedisKeyConstants.CITY_KEY+":"+id;
                if(BloomFilterCache.cityBloom.mightContain(key)){
                    return redisTemplate.opsForValue().get(key).toString();
                }
                return "資料不存在";
            }
            public class BloomFilterCache {
        
            public static BloomFilter<String> cityBloom;
        }
        View Code
    • 剖析布隆過濾器:布隆過濾器是一種空間利用率極高的資料結構,他的底層實際上並不儲存我們快取的元素的內容,而是儲存快取元素的標記 都是0/1。比如:一個int型別是32位4個位元組,32位意味著我們可以儲存32個0或者1,那我現在如果要儲存32個條資料,只需要一個int型別,到底這是怎麼做到的?底層用到了點陣圖
    • 一個例子解釋點陣圖:
      • 現在有32位 【0000 0000 0000 00000000 0000 0000 0000】
      • 比如儲存5這個數字 ->5 的二進位制是101 【0000 0000 0000 00000000 0000 0010 1000】
      • 第二個數字是9 ->9的二進位制是 1001 【0000 0000 0000 00000000 0010 0110 1000】
    • 布隆過濾器引入了多個函式去生成hash數值,我們傳入的資料通過這些函式計算,則落入了這32位中。比如 有x y z 三個函式,我們傳入了一個的資料,通過這三個函式進行hash換算,落到了 32位中的5 6 9 ,那就把這5 6 9 這幾個地方變為1,當我們要查詢時候我們之前傳遞的資料是否存在的時候,再次用這些函式進行換算,如果相關位置是1 則說明資料存在,否則資料則不存在。
    • 解析布隆過濾器的引數:傳遞的100000000是我們要構建多少個int型別(一個int型別可以儲存32位),0.03是誤判率(誤判率指的就是對我們傳遞的資料進行hash換算的函式)
    • BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03);
    • 很多地方都運用了這種思想,比如
      • Redis的HyperLogLog bitmap protobuf中的zigzag壓縮演算法 執行緒池中的執行緒狀態和執行緒數量(高低位擴容我們之前聊過) ConcurrentHashMap中的資料遷移的執行緒數量儲存(線性探索我們也聊過)