Redis-分散式鎖(解決快取擊穿問題)
一. 簡介
分散式鎖在很多場景中都非常的有用,分散式鎖是一個概念,實現他的方式有很多,本篇文章是基於Redis實現的單機分散式鎖。
主要解決多併發程式設計中由於鎖競爭而帶來的資料不一致的問題。
二. 應用場景
在本篇文章中主要解決Redis中快取擊穿問題。
併發的訪問一條資料,資料庫有,但是快取中不存在(沒人訪問這條資料或者Redis中資料剛好過期),導致一瞬間多個請求訪問資料庫,資料庫壓力增大,這類資料通常為熱點資料。
三. 模擬快取擊穿
以下程式模擬100個執行緒同時去訪問一條沒有快取的資料。
1. 業務程式碼(service層)
@Override public Object listByRedis(String id) { HashMap<Object, Object> result = new HashMap<>(); //通過布隆過濾器 解決快取穿透問題. 會有誤判 但是沒有關係 不會有太多誤判。 if (!bloomFilter.isExist(id)){ result.put("status", 400); result.put("msg", "非法訪問"); return result; } //查詢快取 Object redisData = redisTemplate.opsForValue().get(id); //是否命中 if(redisData != null){ //返回結果 result.put("status", 200); result.put("msg", "快取命中"); result.put("data", redisData); return result; } try { UserInfo userInfo = userInfoMapper.selectById(id); if (userInfo != null){ redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES); result.put("status", 200); result.put("msg", "查詢資料庫"); result.put("data", userInfo); return result; }else{ result.put("status", 200); result.put("msg", "沒有資料"); return result; } }finally { } }
2. 併發模擬
併發訪問id=1096這條資料
public class ReadTest { private static CountDownLatch countDownLatch = new CountDownLatch(99); @Test public void test() throws InterruptedException { TicketsRunBle ticketsRunBle = new TicketsRunBle(); for (int i = 0;i<=99;i++){ new Thread(ticketsRunBle, "視窗"+i).start(); countDownLatch.countDown(); } Thread.currentThread().join(); } public class TicketsRunBle implements Runnable{ @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } RestTemplate restTemplate = new RestTemplate(); R forObject = restTemplate.getForObject("http://localhost:8082/user?id=1096", R.class); System.out.println("結果:" + forObject); } } }
3. 執行結果
擷取部分,都是資料庫查詢出來的,日誌列印也有Mybatis的記錄。
四. 單機Redis分散式鎖的實現
Redis分散式鎖原理上是使用Setnx命令實現:
SET resource_name my_random_value NX PX 30000。
這個命令僅在不存在key的時候才能被執行成功(NX選項),並且這個key有一個30秒的自動失效時間(PX屬性)。這個key的值是“my_random_value”(一個隨機值),這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。
當client嘗試獲取鎖時,我們將事先定義的key設定一個值,之後的client再設定時則會不成功。
釋放鎖時實現:
為什麼value要使用一個唯一的值,主要是為了更安全的釋放鎖,釋放鎖的時候使用指令碼告訴Redis:只有key存在並且儲存的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua指令碼實現:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end使用這種方式可以避免刪除別的Client獲得的鎖。舉個栗子:
客戶端A取得資源鎖,但是緊接著被一個其他操作阻塞了,當客戶端A執行完畢其他操作後要釋放鎖時,原來的鎖早已超時並且被Redis自動釋放,並且在這期間資源鎖又被客戶端B再次獲取到。如果僅使用DEL命令將key刪除,那麼這種情況就會把客戶端B的鎖給刪除掉。使用Lua指令碼就不會存在這種情況,因為指令碼僅會刪除value等於客戶端A的value的key(value相當於客戶端的一個簽名)。
本篇文章獲取鎖有一點不同,上面說的是設定一個固定的key,而本篇文章解決的問題是基於單條資料的一個併發查詢。
所以需要對單條資料的ID作為key進行加鎖,防止查詢同一條資料多次訪問資料庫。
1. 鎖實現:
/**
* 自定義分散式鎖
* 這裡主要實現對單條資料進行加鎖,通過id進行加鎖
* 多個執行緒同時訪問該資料會阻塞
* @author
* @Date 2022/1/6
*/
@Component
public class RedisLock {
private static JedisPool jedisPool;
@Autowired
public void setJedisPool(JedisPool jedisPool){
RedisLock.jedisPool = jedisPool;
}
/**
* 鎖健
*/
private final static String KEY = "lock_key_";
/**
* 鎖過期時間
*/
private final static long LOCK_EXPIRED = 30000;
/**
* 鎖競爭超時時間
*/
private final static long LOCK_WAIT_TIME_OUT = 999999;
/**
* SET命令引數
*/
static SetParams params = SetParams.setParams().nx().px(LOCK_EXPIRED);
/**
* ThreadLocal用於儲存某個執行緒共享變數:對於同一個static ThreadLocal
* 不同的執行緒只能從中get,set到自己執行緒的副本
*/
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();
/**
* 嘗試獲取鎖
* @param key
* @return
*/
public Boolean tryLock(String key){
String value = UUID.randomUUID().toString();
Jedis resource = jedisPool.getResource();
long startTime = System.currentTimeMillis();
try {
for(;;){
//SET命令返回OK,獲取鎖成功
String set = resource.set(KEY.concat(key), value, params);
if ("OK".equals(set)){
threadLocal.set(value);
return true;
}
//增加一個超時時間判斷
if(System.currentTimeMillis() - startTime > LOCK_WAIT_TIME_OUT){
return false;
}
//休眠一段時間 遞迴呼叫
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}finally {
resource.close();
}
}
/**
* 釋放鎖 通過lua指令碼實現
* @param key
* @return
*/
public boolean unLock(String key){
Jedis resource = null;
try {
resource = jedisPool.getResource();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then" +
" return redis.call('del', KEYS[1]) " +
"else" +
" return 0 " +
"end";
Object eval = resource.eval(script, Collections.singletonList(KEY.concat(key)), Collections.singletonList(threadLocal.get()));
if ("1".equals(eval.toString())) {
return true;
}
return false;
}catch (Exception e){
e.printStackTrace();
return false;
}finally {
if (resource != null){
resource.close();
}
}
}
}
2. 業務程式碼(service層)
@Override
public Object listByRedis(String id) {
HashMap<Object, Object> result = new HashMap<>();
//通過布隆過濾器 解決快取穿透問題. 會有誤判 但是沒有關係 不會有太多誤判。
if (!bloomFilter.isExist(id)){
result.put("status", 400);
result.put("msg", "非法訪問");
return result;
}
//查詢快取
Object redisData = redisTemplate.opsForValue().get(id);
//是否命中
if(redisData != null){
//返回結果
result.put("status", 200);
result.put("msg", "快取命中");
result.put("data", redisData);
return result;
}
try {
//新增分散式鎖,進來後在查詢一次快取,如果上一個執行緒已經查詢並且存入快取
Boolean lock = redisLock.tryLock(id);
if (!lock){
result.put("status", 500);
result.put("msg", "訪問超時,稍後再試");
return result;
}
//查詢快取
redisData = redisTemplate.opsForValue().get(id);
//是否命中
if(redisData != null){
//返回結果
result.put("status", 200);
result.put("msg", "快取命中");
result.put("data", redisData);
return result;
}
UserInfo userInfo = userInfoMapper.selectById(id);
if (userInfo != null){
redisTemplate.opsForValue().set(id, userInfo, 10, TimeUnit.MINUTES);
result.put("status", 200);
result.put("msg", "查詢資料庫");
result.put("data", userInfo);
return result;
}else{
result.put("status", 200);
result.put("msg", "沒有資料");
return result;
}
}finally {
redisLock.unLock(id);
}
}
3. 測試結果
列印日誌顯示,只會有一次資料庫查詢。
返回的結果除了第一次是查詢資料庫,後面的都是快取命中
五. 總結
該鎖的實現有很多不足之處,不斷了解學習的一個過程,可使用Redisson中實現的分散式鎖可用性更高。
本文來自部落格園,作者:EchoLv,轉載請註明原文連結:https://www.cnblogs.com/lvdeyinBlog/p/15774412.html