SpringBoot整合Redis正確的實現分散式鎖的示例程式碼
前言
最近在做分塊上傳的業務,使用到了Redis來維護上傳過程中的分塊編號。
每上傳完成一個分塊就獲取一下檔案的分塊集合,加入新上傳的編號,手動介面測試下是沒有問題的,前端通過併發上傳呼叫就出現問題了,併發的get再set,就會存在覆蓋寫現象,導致最後的分塊資料不對,不能觸發分塊合併請求。
遇到併發二話不說先上鎖,針對執行程式碼塊加了一個JVM鎖之後問題就解決了。
仔細一想還是不太對,專案是分散式部署的,做了負載均衡,一個節點的程式碼被鎖住了,請求輪詢到其他節點還是可以進行覆蓋寫,並沒有解決到問題啊
沒辦法,只有用上分散式鎖了。之前對於分散式鎖的理論還是很熟悉的,沒有比較好的應用場景就沒寫過具體程式碼,趁這個機會就學習使用一下分散式鎖。
理論
分散式鎖是控制分散式系統之間同步訪問共享資源的一種方式。是為了解決分散式系統中,不同的系統或是同一個系統的不同主機共享同一個資源的問題,它通常會採用互斥來保證程式的一致性
通常的實現方式有三種:
- 基於 MySQL 的悲觀鎖來實現分散式鎖,這種方式使用的最少,這種實現方式的效能不好,且容易造成死鎖,並且MySQL本來業務壓力就很大了,再做鎖也不太合適
- 基於 Redis 實現分散式鎖,單機版可用setnx實現,多機版建議用Radission
- 基於 ZooKeeper 實現分散式鎖,利用 ZooKeeper 順序臨時節點來實現
為了確保分散式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:
- 互斥性。在任意時刻,只有一個客戶端能持有鎖。
- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖。
- 具有容錯性。只要大部分的Redis節點正常執行,客戶端就可以加鎖和解鎖。
- 解鈴還須繫鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
本文就使用的是Redis的setnx實現,如果Redis是多機版的可以去了解下Radssion,封裝的就特別的好,也是官方推薦的
程式碼
1. 加依賴
引入Spring Boot和Redis整合的快速使用依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2. 加配置
application.properties中加入Redis連線相關配置
spring.redis.host=xxx spring.redis.port=6379 spring.redis.database=0 spring.redis.password=xxx spring.redis.timeout=10000 # 設定jedis連線池 spring.redis.jedis.pool.max-active=50 spring.redis.jedis.pool.min-idle=20
3. 重寫Redis的序列化規則
預設使用的JDK的序列化,不自己設定一下Redis中的資料是看不懂的
/** * @author Chkl * @create 2020/6/7 * @since 1.0.0 */ @Component public class RedisConfig { /** * 改造RedisTemplate,重寫序列化規則,避免存入序列化內容看不懂 * @param connectionFactory * @return */ @Bean public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(connectionFactory); // 設定key和value的序列化規則 redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); redisTemplate.setKeySerializer(new StringRedisSerializer()); return redisTemplate; } }
4. 如何正確的上鎖
直接上程式碼
@Component public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; private long timeout = 3000; /** * 上鎖 * @param key 鎖標識 * @param value 執行緒標識 * @return 上鎖狀態 */ public boolean lock(String key,String value) { long start = System.currentTimeMillis(); while (true) { //檢測是否超時 if (System.currentTimeMillis() - start > timeout) { return false; } //執行set命令 Boolean absent = redisTemplate.opsForValue().setIfAbsent(key,value,timeout,TimeUnit.MILLISECONDS);//1 //是否成功獲取鎖 if (absent) { return true; } return false; } } }
核心程式碼就是
Boolean absent = redisTemplate.opsForValue().setIfAbsent(key,TimeUnit.MILLISECONDS);
setIfAbsent方法就相當於命令列下的Setnx方法,指定的 key 不存在時,為 key 設定指定的值
引數分別是key、value、超時時間和時間單位
- key,表示針對於這段資源的唯一標識
- value,表示針對於這個執行緒的唯一標識。為什麼有了key了還需要設定value呢,就是為了滿足四個條件的最後一個:解鈴還須繫鈴人。只有通過key和value的組合才能保證解鎖時是同一個執行緒來解鎖
- 超時時間,必須和setnx一起進行操作,不能再setnx結束後再執行。如果加鎖成功了,還沒有設定過期時間就宕機了,鎖就永遠不會過期,變成死鎖
5. 如何正確解鎖
@Component public class RedisLock { @Autowired private StringRedisTemplate redisTemplate; @Autowired private DefaultRedisScript<Long> redisScript; private static final Long RELEASE_SUCCESS = 1L; /** * 解鎖 * @param key 鎖標識 * @param value 執行緒標識 * @return 解鎖狀態 */ public boolean unlock(String key,String value) { //使用Lua指令碼:先判斷是否是自己設定的鎖,再執行刪除 Long result = redisTemplate.execute(redisScript,Arrays.asList(key,value)); //返回最終結果 return RELEASE_SUCCESS.equals(result); } /** * @return lua指令碼 */ @Bean public DefaultRedisScript<Long> defaultRedisScript() { DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText("if redis.call('get',KEYS[1]) == KEYS[2] then return redis.call('del',KEYS[1]) else return 0 end"); return defaultRedisScript; } }
解鎖過程需要兩步操作
1.判斷操作執行緒是否是加鎖的執行緒
2.如果是加鎖執行緒,執行解鎖操作
這兩步操作也需要原子的進行操作,但是Redis不支援這兩步的合併的操作,所以,就只有使用lua指令碼實現來保證原子性咯
如果在判斷是加鎖的執行緒之後,並且執行解鎖之前,鎖到期了,被其他執行緒獲得鎖了,這時候再進行解鎖就會解掉其他執行緒的鎖,使得不滿足解鈴還須繫鈴人
6. 實際應用
沒有使用分散式鎖時的儲存檔案分塊的程式碼
/** * 儲存檔案分塊編號到redis * @param chunkNumber 分塊號 * @param identifier 檔案唯一編號 * @return 檔案分塊的大小 */ @Override public Integer saveChunk(Integer chunkNumber,String identifier) { //從Redis獲取已經存在的分塊編號集合 Set<Integer> oldChunkNumber = (Set<Integer>) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),Set.class); //如果不存在分塊集合,建立一個集合 if (Objects.isNull(oldChunkNumber)) { Set<Integer> newChunkNumber = new HashSet<>(); newChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier,JSON.toJSONString(newChunkNumber),36000); return newChunkNumber.size(); //如果分塊集合已經存在了,就新增一個編號 } else { oldChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier,JSON.toJSONString(oldChunkNumber),36000); return oldChunkNumber.size(); } }
存在的問題是:當併發的請求進來之後,可能獲取同一個狀態的集合進行修改,修改後直接寫入,造成同一個狀態獲得的集合操作執行緒覆蓋寫的現象
使用分散式鎖保證同時只能有一個執行緒能獲取到集合並進行修改,避免了覆蓋寫現象
使用分散式鎖程式碼
/** * 儲存檔案分塊編號到redis * @param chunkNumber 分塊號 * @param identifier 檔案唯一編號 * @return 檔案分塊的大小 */ @Override public Integer saveChunk(Integer chunkNumber,String identifier) { //通過UUID生成一個請求執行緒識別標誌作為鎖的value String threadUUID = CoreUtil.getUUID(); //上鎖,以共享資源標識:檔案唯一編號,作為key,以執行緒標識UUID作為value redisLock.lock(identifier,threadUUID); //從Redis獲取已經存在的分塊編號集合 Set<Integer> oldChunkNumber = (Set<Integer>) JSON.parseObject(redisOperator.get("chunkNumberList_"+identifier),36000); //解鎖 redisLock.unlock(identifier,threadUUID); return newChunkNumber.size(); //如果分塊集合已經存在了,就新增一個編號 } else { oldChunkNumber.add(chunkNumber); redisOperator.set("chunkNumberList_"+identifier,threadUUID); return oldChunkNumber.size(); } }
程式碼中使用的共享資源標識是檔案唯一編號identifier,它能標識加鎖程式碼段中的唯一資源,即key為"chunkNumberList_"+identifier
的集合
程式碼中使用的執行緒唯一標識是UUID,能保證加鎖和解鎖時獲取的標識不會重複
到此這篇關於SpringBoot整合Redis正確的實現分散式鎖的示例程式碼的文章就介紹到這了,更多相關SpringBoot整合Redis分散式鎖內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!