分散式儲存-Redis實戰&常見問題解決
阿新 • • 發佈:2021-08-01
分散式儲存-Redis實戰&常見問題解決
前面講了一些Redis的使用場景和資料型別,本章會聊到:
- 一個抽獎的例子來闡述redis如何被應用去解決實際問題(程式碼有點多,不適合放在博文中,如需請留言,我可以傳送),並且會使用到前面併發模組聊的CountDownLatch和springBoot中的事件去非同步快取資料,和非同步等待。
- 常見的一些使用redis帶來的問題,比如快取穿透、快取雪崩、以及資料庫和redis中資料不一致的問題,和布隆過濾器的一些底層思想(點陣圖)
- 常用的redis客戶端和他們的底層實現
- 自己動手實現一個Redisclient
Redis抽獎實現
整體流程:
設計思路:當一個請求過來的時候,會攜帶一個活動id
- 快取獎品資訊:我們會使用這個活動id去查詢資料庫,並把查詢到的資料快取在redis中(這個步驟是非同步的,我們用一個CountDownLatch去對他進行控制,快取完成後,給count-1,後續需要redis中資料的流程就可以繼續處理)
- 開始抽獎:這是一個簡單的區間演算法,在lottery_item中有對於每個獎品的概率比。從redis中拿到所有的獎項,如果沒有則從資料庫中獲取(因為上面快取的那一步驟是非同步的,可能這個時候還有快取成功)。我們根據隨機數去落到我們設定好的概率區間中(區間越大,抽到某個獎品的概率越大)
- 發放獎品:我們的獎品型別不同 (lottery_prize#prize_type)根據不同獎品的型別,走不同的邏輯,比如我們有的獎品要傳送簡訊,有的獎品不要,我們就定一個模板方法,然後不同型別的獎品走不同型別的傳送邏輯。
- 扣減庫存:
我們前面已經非同步快取了資料到redis中,那這裡直接使用incur的命令(之前說,這個命令是原子的,所以不會產生安全問題),我們可以先扣減redis中,然後進行資料的內存扣除
SpringBoot中使用方法(Lettuce)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>redis: port: 6379 host: ip lettuce: pool: max-active: -1 max-idle: 2000 max-wait: -1 min-idle: 1 time-between-eviction-runs: 5000@Autowired RedisTemplate<String,String> redisTemplate;
Redis的客戶端
常見的:Jedis、Redission、Lettuce(上面給的pom)
我們傳送一個命令(set n v)到redis上的時候,使用抓包工具發現
這裡的$*表示key的長度 ,比如:$3表示set的長度是3 *3 表示我們傳遞了三個引數給redis
那解析下來的命令就是:*3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv,** 那是不是證明只要我們符合這樣的編碼協議就可以和redis交流了呢
定義get set 命令
public class CommandConstant { public static final String START="*"; public static final String LENGTH="$"; public static final String LINE="\r\n"; //這裡提供兩個命令 public enum CommandEnum{ SET, GET } }View Code封裝api
public class CustomerRedisClient { private CustomerRedisClientSocket customerRedisClientSocket; //連線的redis地址和埠 public CustomerRedisClient(String host,int port) { customerRedisClientSocket=new CustomerRedisClientSocket(host,port); } //封裝一個api 傳送指令 public String set(String key,String value){ //傳遞給redis,同時格式化成redis認識的資料 customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.SET,key.getBytes(),value.getBytes())); return customerRedisClientSocket.read(); //在等待返回結果的時候,是阻塞的 } //獲取指令 public String get(String key){ customerRedisClientSocket.send(convertToCommand(CommandConstant.CommandEnum.GET,key.getBytes())); return customerRedisClientSocket.read(); } //這裡按照redis的要求格式化 就是前面抓包後拿到的格式 *3\r\n$3\r\nSET\r\n$1\r\nn\r\n$\r\nv public static String convertToCommand(CommandConstant.CommandEnum commandEnum,byte[]... bytes){ StringBuilder stringBuilder=new StringBuilder(); stringBuilder.append(CommandConstant.START).append(bytes.length+1).append(CommandConstant.LINE); stringBuilder.append(CommandConstant.LENGTH).append(commandEnum.toString().length()).append(CommandConstant.LINE); stringBuilder.append(commandEnum.toString()).append(CommandConstant.LINE); for (byte[] by:bytes){ stringBuilder.append(CommandConstant.LENGTH).append(by.length).append(CommandConstant.LINE); stringBuilder.append(new String(by)).append(CommandConstant.LINE); } return stringBuilder.toString(); } }View Code連線redis
public class CustomerRedisClientSocket { //這裡可以使用nio private Socket socket; private InputStream inputStream; private OutputStream outputStream; public CustomerRedisClientSocket(String ip,int port){ try { socket=new Socket(ip,port); inputStream=socket.getInputStream(); outputStream=socket.getOutputStream(); } catch (IOException e) { e.printStackTrace(); } } //傳送指令 public void send(String cmd){ try { outputStream.write(cmd.getBytes()); } catch (IOException e) { e.printStackTrace(); } } //讀取資料 public String read(){ byte[] bytes=new byte[1024]; int count=0; try { count=inputStream.read(bytes); } catch (IOException e) { e.printStackTrace(); } return new String(bytes,0,count); } }View Code測試
public class MainClient { public static void main(String[] args) { CustomerRedisClient customerRedisClient=new CustomerRedisClient("ip",6379); System.out.println(customerRedisClient.set("customer","Define")); System.out.println(customerRedisClient.get("customer")); } }View Code結果 +ok(redis返回的成功報文) $6(返回的value是6的長度)
根據上面我們自己實現的中介軟體,我們可以悟出,從這些層面選擇中介軟體:
- 通訊層面的優化:當我們獲取返回結果的時候是阻塞的(我們自己實現的中介軟體)
- 是否採用非同步通訊:多執行緒(效率高)
- 針對key和value的優化 :傳遞的報文越小,傳遞的速度肯定更快
- 連線的優化(連線池)
我們發現redisson是提供這些功能做的比較好的,整合網上很多例子,這裡不聊了。
使用redis中遇見的問題
【資料庫和redis的資料一致性問題】:實際上很難解決強一致性問題,常見有兩種操作
- 先更新資料庫,再刪除快取(刪除快取就等於更新)推薦
- 更新資料庫成功,但是刪除快取失敗
- 當資料庫更新成功後,把更新redis的訊息放在mq中,這個時候一定能保證都更新成功。
- 解析資料庫的binary log,然後更新快取
- 先刪除快取,再更新資料庫(不推薦)
- 刪除快取成功,更新資料庫失敗(看起來沒有什麼問題,但是看下面的場景):執行緒A先去刪除一個key,執行緒B去獲取這個Key,發現沒有資料,那他就去更新Redis快取,這個時候執行緒A去更新資料庫 。那就會導致資料庫的資料是最新的,但是快取不是最新的
【快取雪崩】
- 【原因】大量的熱點資料同時失效,因為設定了相同的過期時間,剛好這個時候請求量又很大,那這個時候壓力就到了資料庫上,從而就導致了雪崩。
- 【方案】這是幾個設定過期的命令,(我們可以給key設定不同的過期時間,這樣就能有效地避免雪崩,或者熱點資料不設定過期時間)
- expire key seconds # 設定鍵在給定秒後過期 pexpire key milliseconds # 設定鍵在給定毫秒後過期 expireat key timestamp # 到達指定秒數時間戳之後鍵過期 pexpireat key timestamp # 到達指定毫秒數時間戳之後鍵過期
- 【redis key 過期實現原理】想一下redis是如何實現過期的,如果我們儲存的資料庫十分巨大,redis怎麼精確的知道那個key過期了?並且對他進行刪除呢?
- 想法:我們給去key每個key設定一個定時器,一個個進行輪詢。效能太差了!!
- Redis對過期key的做法:
- 儲存:實際上redis使用了一個hash的結構進行儲存,對你設定的過期的key單獨用一個value儲存了一個過期時間
- 刪除:
- 被動刪除:當我們使用get命令的時候,他去查詢他儲存的我們傳遞的過期時間和電腦時間對比,如果過期,則進行刪除
- 主動刪除:隨機抽取20個key,刪除這20key中已經過期的key,如果發現這20個key中有20%的key已經過期了,那麼就再次抽取20個key,用這個方式迴圈。
【快取穿透】:
- 【原因】:Redis和資料庫中都不存在查詢的資料,那這就是一次無效查詢,如果有人偽造了很多請求,那可能會引發資料庫宕機,因為redis中沒有資料,請求肯定就請求到資料庫,這就叫快取穿透
- 【方案】:使用布隆過濾器
- 【流程】:
- 專案在啟動的時候,把所有的資料載入到布隆過濾器中
- 當客戶端有請求過來時,先到布隆過濾器中查詢一下當前訪問的key是否存在,如果布隆過 濾器中沒有該key,則不需要去資料庫查詢直接反饋即可
- 【實現】:
- 使用guava
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>View Code- 程式初始的時候載入資料到布隆過濾器中
@Slf4j @Component public class BloomFilterDataLoadApplicationRunner implements ApplicationRunner { @Autowired ICityService cityService; @Override public void run(ApplicationArguments args) { List<City> cities=cityService.list(); //第一個引數指的是要儲存的資料,第二個資料是允許的容錯率 BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03); cities.parallelStream().forEach(city-> bloomFilter.put(RedisKeyConstants.CITY_KEY+":"+city.getId())); BloomFilterCache.cityBloom=bloomFilter; } }View Code- 客戶端訪問時增加驗證
@GetMapping("/bloom/{id}") public String filter(@PathVariable("id")Integer id){ String key=RedisKeyConstants.CITY_KEY+":"+id; if(BloomFilterCache.cityBloom.mightContain(key)){ return redisTemplate.opsForValue().get(key).toString(); } return "資料不存在"; } public class BloomFilterCache { public static BloomFilter<String> cityBloom; }View Code- 剖析布隆過濾器:布隆過濾器是一種空間利用率極高的資料結構,他的底層實際上並不儲存我們快取的元素的內容,而是儲存快取元素的標記 都是0/1。比如:一個int型別是32位4個位元組,32位意味著我們可以儲存32個0或者1,那我現在如果要儲存32個條資料,只需要一個int型別,到底這是怎麼做到的?底層用到了點陣圖
- 一個例子解釋點陣圖:
- 現在有32位 【0000 0000 0000 00000000 0000 0000 0000】
- 比如儲存5這個數字 ->5 的二進位制是101 【0000 0000 0000 00000000 0000 0010 1000】
- 第二個數字是9 ->9的二進位制是 1001 【0000 0000 0000 00000000 0010 0110 1000】
- 布隆過濾器引入了多個函式去生成hash數值,我們傳入的資料通過這些函式計算,則落入了這32位中。比如 有x y z 三個函式,我們傳入了一個的資料,通過這三個函式進行hash換算,落到了 32位中的5 6 9 ,那就把這5 6 9 這幾個地方變為1,當我們要查詢時候我們之前傳遞的資料是否存在的時候,再次用這些函式進行換算,如果相關位置是1 則說明資料存在,否則資料則不存在。
- 解析布隆過濾器的引數:傳遞的100000000是我們要構建多少個int型別(一個int型別可以儲存32位),0.03是誤判率(誤判率指的就是對我們傳遞的資料進行hash換算的函式)
BloomFilter<String> bloomFilter=BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000000,0.03);- 很多地方都運用了這種思想,比如
- Redis的HyperLogLog bitmap protobuf中的zigzag壓縮演算法 執行緒池中的執行緒狀態和執行緒數量(高低位擴容我們之前聊過) ConcurrentHashMap中的資料遷移的執行緒數量儲存(線性探索我們也聊過)