Redis Template實現分散式鎖的例項程式碼
阿新 • • 發佈:2020-01-09
前言
分散式鎖一般有三種實現方式:1. 資料庫樂觀鎖;2. 基於Redis的分散式鎖;3. 基於ZooKeeper的分散式鎖。本篇部落格將介紹第二種方式,基於Redis實現分散式鎖。雖然網上已經有各種介紹Redis分散式鎖實現的部落格,然而他們的實現卻有著各種各樣的問題,為了避免誤人子弟,本篇部落格將詳細介紹如何正確地實現Redis分散式鎖。
可靠性
首先,為了確保分散式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:
1.互斥性。在任意時刻,只有一個客戶端能持有鎖。
2.不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他客戶端能加鎖。
3.具有容錯性。只要大部分的Redis節點正常執行,客戶端就可以加鎖和解鎖。
使用Redis的SETNX命令獲取分散式鎖的步驟:
•C1和C2執行緒同時檢查時間戳獲取鎖,執行SETNX命令並都返回0,此時鎖仍被C3持有,並且C3已經崩潰
•C1 DEL鎖
•C1 使用SETNX命令獲取鎖,並且成功
•C2 DEL鎖
•C2 使用SETNX命令獲取鎖,並且成功
•ERROR : 由於競態條件,C1和C2都獲取到了鎖
幸運的是,以下面的步驟完全可以避免這種情況發生,看看C4執行緒如何操作
•C4使用SETNX命令獲取鎖
•C4使用GET命令獲取鎖並檢查鎖是否已經過期,如果沒有過期,則繼續等待一段時間並重新重試
•如果鎖已經過期,C4嘗試 GETSET lock.foo <current Unix timestamp + lock timeout + 1>
•利用GETSET語法,C4可以檢查舊時間是否仍然是過期時間,如果是,則獲取鎖
•如果另一個客戶端C5率先獲取到鎖,C4執行GETSET命令後將返回非過期時間,然後C4繼續從頭開始重新嘗試獲取鎖。此操作C4將延長一點C5獲取到的鎖的過期時間,不過這不是什麼大問題。
接下來我們用程式碼的形式展現:
package com.shuige.components.cache.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; import java.util.Objects; import java.util.concurrent.TimeUnit; /** * Description: 通用Redis幫助類 * User: zhouzhou * Date: 2018-09-05 * Time: 15:39 */ @Component public class CommonRedisHelper { public static final String LOCK_PREFIX = "redis_lock"; public static final int LOCK_EXPIRE = 300; // ms @Autowired RedisTemplate redisTemplate; /** * 最終加強分散式鎖 * * @param key key值 * @return 是否獲取到 */ public boolean lock(String key){ String lock = LOCK_PREFIX + key; // 利用lambda表示式 return (Boolean) redisTemplate.execute((RedisCallback) connection -> { long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1; Boolean acquire = connection.setNX(lock.getBytes(),String.valueOf(expireAt).getBytes()); if (acquire) { return true; } else { byte[] value = connection.get(lock.getBytes()); if (Objects.nonNull(value) && value.length > 0) { long expireTime = Long.parseLong(new String(value)); if (expireTime < System.currentTimeMillis()) { // 如果鎖已經過期 byte[] oldValue = connection.getSet(lock.getBytes(),String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes()); // 防止死鎖 return Long.parseLong(new String(oldValue)) < System.currentTimeMillis(); } } } return false; }); } /** * 刪除鎖 * * @param key */ public void delete(String key) { redisTemplate.delete(key); } }
如何使用呢,匯入工具類後:
boolean lock = redisHelper.lock(key); if (lock) { // 執行邏輯操作 redisHelper.delete(key); } else { // 設定失敗次數計數器,當到達5次時,返回失敗 int failCount = 1; while(failCount <= 5){ // 等待100ms重試 try { Thread.sleep(100l); } catch (InterruptedException e) { e.printStackTrace(); } if (redisHelper.lock(key)){ // 執行邏輯操作 redisHelper.delete(key); }else{ failCount ++; } } throw new RuntimeException("現在建立的人太多了,請稍等再試"); }
加鎖成功執行完邏輯後,必須解鎖,否則只能靠鎖機制來解鎖了不建議這麼做
總結
以上所述是小編給大家介紹的Redis Template實現分散式鎖的例項程式碼,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回覆大家的。在此也非常感謝大家對我們網站的支援!