Redis秒殺使用
1. 快取穿透
客戶端請求的資料在快取中和資料庫中都不存在,這樣快取永遠不會生效,請求都會直接打到資料庫。
1.1 解決方案
1. 快取Null值 -> 優點:實現方便 缺點: 額外記憶體消耗,可能造成短期的不一致
2. 布隆過濾器 -> 優點: 記憶體佔用較少,沒有多餘的key 缺點: 實現複雜,存在誤判的可能
3. 增強id的複雜度,避免被猜測到id的規律
4. 做好資料的基礎格式校驗
5. 加強使用者許可權校驗
6. 做好熱點引數的限流
2. 快取雪崩
同一時段大量的快取key同時失效或者Redis伺服器宕機,導致大量請求到達資料庫,帶來巨大壓力。
2.1 解決方案
1. 給不同的key設定不同的過期時間 -> 隨機時間
2. 利用redis叢集提高服務的可用性 -> 利用redis哨兵機制
3. 給快取業務新增降級限流策略
4. 給業務新增多級快取
3. 快取擊穿
熱點key問題,就是一個被 高併發訪問 並且 快取重建業務比較複雜 的key突然失效了,無數的請求訪問會瞬間給資料庫帶來巨大的衝擊。
3.1 常見的解決方案
3.1.1 互斥鎖
3.1.2 邏輯過期
3.2 解決方案對比
3.3 基於互斥鎖的方式解決
修改根據id查詢商品的介面,基於互斥鎖方式來解決快取擊穿問題
3.3.1 程式碼
package com.hmdp.service.impl; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.hmdp.dto.Result; import com.hmdp.entity.Shop; import com.hmdp.mapper.ShopMapper; import com.hmdp.service.IShopService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisConstants; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; /** * 根據id查詢商鋪資訊 * @param id * @return */ @Override public Result queryById(Long id) { // 利用空值解決快取穿透問題 // Shop shop = queryWithPassThrough(id); // 互斥鎖解決快取擊穿 Shop shop = queryWithMutex(id); if (shop == null) { return Result.fail("商鋪不存在!"); } // 7. 返回結果 return Result.ok(shop); } /** * 使用互斥鎖解決快取擊穿 * @param id * @return */ public Shop queryWithMutex(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 1. 從 redis 中查詢商品快取 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判斷是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3. 存在, 直接返回結果 return JSONUtil.toBean(shopJson, Shop.class); } // 判斷命中的是否是空值 if(shopJson != null) { // 返回錯誤資訊 return null; } // 4. 實現快取重建 // 4.1 獲取互斥鎖 String lockKey = "lock:shop:" + id; Shop shop = null; try { boolean isLock = tryLock(lockKey); // 4.2 判斷是否獲取成功 if(!isLock) { // 4.3 失敗,則休眠並重試 Thread.sleep(100); return queryWithMutex(id); } // 4.4 成功,根據id查詢資料庫 shop = getById(id); // todo 模擬重建延時 //Thread.sleep(200); // 5. 不存在,返回錯誤 if (shop == null) { // 將空值寫入redis stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回錯誤資訊 return null; } // 6. 存在,寫入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 7. 釋放互斥鎖 unlock(lockKey); } // 8. 返回結果 return shop; } /** * 利用空值解決快取穿透問題 * @param id * @return */ public Shop queryWithPassThrough(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 1. 從 redis 中查詢商品快取 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判斷是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3. 存在, 直接返回結果 return JSONUtil.toBean(shopJson, Shop.class); } // 判斷命中的是否是空值 if(shopJson != null) { // 返回錯誤資訊 return null; } // 4. 不存在,根據id查詢資料庫 Shop shop = getById(id); // 5. 不存在,返回錯誤 if (shop == null) { // 將空值寫入redis stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回錯誤資訊 return null; } // 6. 存在,寫入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); // 7. 返回結果 return shop; } /** * 嘗試獲取鎖 * @param key * @return */ private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } /** * 釋放鎖 * @param key */ private void unlock(String key) { stringRedisTemplate.delete(key); } }
3.4 基於邏輯過期方式解決
修改根據id查詢商品的業務,基於邏輯過期方式來解決快取擊穿問題
3.4.1 程式碼
package com.hmdp.service.impl; import cn.hutool.core.util.BooleanUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.hmdp.dto.Result; import com.hmdp.entity.Shop; import com.hmdp.mapper.ShopMapper; import com.hmdp.service.IShopService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.hmdp.utils.RedisConstants; import com.hmdp.utils.RedisData; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.time.LocalDateTime; import java.util.concurrent.*; @Service public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService { @Resource private StringRedisTemplate stringRedisTemplate; /** * 根據id查詢商鋪資訊 * @param id * @return */ @Override public Result queryById(Long id) { // 利用空值解決快取穿透問題 // Shop shop = queryWithPassThrough(id); // 互斥鎖解決快取擊穿 //Shop shop = queryWithMutex(id); // 使用邏輯過期解決快取擊穿 Shop shop = queryWithLogicalExpire(id); if (shop == null) { return Result.fail("商鋪不存在!"); } // 7. 返回結果 return Result.ok(shop); } /** * 定義執行緒池, 建立10個執行緒 */ private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); /** * 使用邏輯過期解決快取擊穿 * @param id * @return */ public Shop queryWithLogicalExpire(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 1. 從 redis 中查詢商品快取 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判斷是否存在 if (StrUtil.isBlank(shopJson)) { // 3. 不存在, 直接返回 return null; } // 4. 命中,將json反序列化為物件 RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class); Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); // 5. 判斷邏輯過期時間是否過期 if (LocalDateTime.now().isBefore(expireTime)) { // 5.1 未過期,直接返回商鋪資訊 return shop; } // 5.2 已過期,需要快取重建 // 6. 快取重建 // 6.1 獲取互斥鎖 String lockKey = RedisConstants.LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); // 6.2 判斷是否獲取鎖成功 if (isLock) { // 6.3 成功,開啟獨立執行緒,實現快取重建 CACHE_REBUILD_EXECUTOR.submit(() -> { try { // 重建快取 this.saveShop2Redis(id, 20L); } catch (Exception e) { throw new RuntimeException(e); } finally { // 釋放鎖 unlock(lockKey); } }); } // 6.4 返回過期的商鋪資訊 // 7. 返回結果 return shop; } /** * 利用空值解決快取穿透問題 * @param id * @return */ public Shop queryWithPassThrough(Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; // 1. 從 redis 中查詢商品快取 String shopJson = stringRedisTemplate.opsForValue().get(key); // 2. 判斷是否存在 if (StrUtil.isNotBlank(shopJson)) { // 3. 存在, 直接返回結果 return JSONUtil.toBean(shopJson, Shop.class); } // 判斷命中的是否是空值 if(shopJson != null) { // 返回錯誤資訊 return null; } // 4. 不存在,根據id查詢資料庫 Shop shop = getById(id); // 5. 不存在,返回錯誤 if (shop == null) { // 將空值寫入redis stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES); // 返回錯誤資訊 return null; } // 6. 存在,寫入redis stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); // 7. 返回結果 return shop; } /** * 嘗試獲取鎖 * @param key * @return */ private boolean tryLock(String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } /** * 釋放鎖 * @param key */ private void unlock(String key) { stringRedisTemplate.delete(key); } /** * 存入商鋪資訊附帶過期時間到redis * @param id */ public void saveShop2Redis(Long id, Long expireSeconds) { // 1. 查詢店鋪資料 Shop shop = this.getById(id); // todo 模擬重建延時 //try { // Thread.sleep(200); //} catch (InterruptedException e) { // e.printStackTrace(); //} // 2. 封裝邏輯過期時間 RedisData redisData = new RedisData(); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); redisData.setData(shop); // 3. 寫入redis stringRedisTemplate.opsForValue() .set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData)); } }
4. 快取工具封裝
方法1:將任意的java物件序列化為json並存儲在String型別的key中,並且可以設定TLL過期時間
方法2:將任意的java物件序列化為json並存儲在String型別的key中,並且可以設定邏輯過期時間,用於處理快取擊穿問題
方法3:根據指定的key查詢快取,並反序列化為指定型別,利用快取空值的方式解決快取穿透問題
方法4:根據指定的key查詢快取,並反序列化為指定型別,需要利用邏輯時間解決快取擊穿問題
package com.hmdp.utils;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hmdp.entity.Shop;
import com.sun.xml.internal.bind.v2.model.core.ID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.function.Function;
@Slf4j
@Component
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 將任意的java物件序列化為json並存儲在String型別的key中,
* 並且可以設定TLL過期時間
* @param key 快取key
* @param value 快取物件
* @param time 過期時間
* @param timeUnit 時間單位
*/
public void set(String key, Object value, Long time, TimeUnit timeUnit) {
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
}
/**
* 將任意的java物件序列化為json並存儲在String型別的key中,
* 並且可以設定TLL過期時間,並且可以設定邏輯過期時間,用於處理快取擊穿問題
* @param key 快取key
* @param value 快取物件
* @param time 過期時間
* @param timeUnit 時間單位
*/
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit timeUnit) {
// 設定邏輯過期時間
RedisData redisData = new RedisData();
redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
redisData.setData(value);
// 寫入 redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
/**
* 根據指定的key查詢快取,並反序列化為指定型別,
* 利用快取空值的方式解決快取穿透問題
* @param keyPrefix key字首
* @param id 查詢的id
* @param type 返回值型別
* @param dbFallBack 查詢資料庫的回撥函式
* @param time 過期時間
* @param timeUnit 時間單位
* @param <R> 返回值型別
* @param <ID> id型別
* @return
*/
public <R, ID> R queryWithPassThrough(
String keyPrefix,
ID id,
Class<R> type,
Function<ID, R> dbFallBack,
Long time,
TimeUnit timeUnit
) {
String key = keyPrefix + id;
// 1. 從 redis 中查詢快取
String json = stringRedisTemplate.opsForValue().get(key);
// 2. 判斷是否存在
if (StrUtil.isNotBlank(json)) {
// 3. 存在, 直接返回結果
return JSONUtil.toBean(json, type);
}
// 判斷命中的是否是空值
if(json != null) {
// 返回錯誤資訊
return null;
}
// 4. 不存在,根據id查詢資料庫
R r = dbFallBack.apply(id);
// 5. 不存在,返回錯誤
if (r == null) {
// 將空值寫入redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
// 返回錯誤資訊
return null;
}
// 6. 存在,寫入redis
this.set(key, r, time, timeUnit);
// 7. 返回結果
return r;
}
/**
* 定義執行緒池, 建立10個執行緒
*/
private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
/**
* 使用邏輯過期解決快取擊穿問題
* @param keyPrefix key字首
* @param id 查詢的id
* @param type 返回值型別
* @param dbFallBack 查詢資料庫的回撥函式
* @param time 過期時間
* @param timeUnit 時間單位
* @param <R> 返回值型別
* @param <ID> id型別
* @return
*/
public <R, ID> R queryWithLogicalExpire(
String keyPrefix,
ID id,
Class<R> type,
Function<ID, R> dbFallBack,
Long time,
TimeUnit timeUnit
) {
String key = keyPrefix + id;
// 1. 從 redis 中查詢快取
String json = stringRedisTemplate.opsForValue().get(key);
// 2. 判斷是否存在
if (StrUtil.isBlank(json)) {
// 3. 不存在, 直接返回
return null;
}
// 4. 命中,將json反序列化為物件
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
// 5. 判斷邏輯過期時間是否過期
if (LocalDateTime.now().isBefore(expireTime)) {
// 5.1 未過期,直接返回資訊
return r;
}
// 5.2 已過期,需要快取重建
// 6. 快取重建
// 6.1 獲取互斥鎖
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
// 6.2 判斷是否獲取鎖成功
if (isLock) {
// 6.3 成功,開啟獨立執行緒,實現快取重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
// 重建快取
// 1. 查詢資料庫
R r1 = dbFallBack.apply(id);
// 2. 寫入redis
this.setWithLogicalExpire(key, r1, time, timeUnit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 釋放鎖
unlock(lockKey);
}
});
}
// 7. 返回結果
return r;
}
/**
* 嘗試獲取鎖
* @param key
* @return
*/
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
/**
* 釋放鎖
* @param key
*/
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
}
5. 全域性ID生成器
為了增加ID的安全性,可以不直接使用Redis自增的數值,而是拼接一些其他資訊:
5.1 工具類
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
/**
* @Description: Redis id生成器
* @Date: 2022/5/7 13:00
*/
@Component
public class RedisIdWorker {
/**
* 開始時間戳
*/
public static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列號位數
*/
public static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 唯一id生成
* @param keyPrefix 不同業務的key字首
* @return
*/
public long nextId(String keyPrefix) {
// 1. 生成時間戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2. 生成序列號
// 2.1 獲取當前日期, 精確到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2 自增長
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接並返回
return timestamp << COUNT_BITS | count;
}
}
5.2 總結
5.2.1 全域性唯一ID生成策略
1. UUID
2. Redis自增
3. snowflack演算法
4. 資料庫自增
5.2.2 Redis自增ID策略
1. 每天一個key,方便統計訂單量
6. 優惠券秒殺
判斷兩點:
- 秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
- 庫存是否,不足則無法下單
6.1 普通搶購優惠券的程式碼
/**
* 搶購優惠券
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1. 查詢優惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒殺未開始
return Result.fail("秒殺尚未開始");
}
// 3. 判斷秒殺是否結束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒殺已經結束
return Result.fail("秒殺已經結束");
}
// 4. 判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足");
}
// 5. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.set(SeckillVoucher::getStock, voucher.getStock() - 1));
if (!success) {
// 扣減庫存失敗
return Result.fail("扣減庫存失敗");
}
// 6. 建立訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2 使用者id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3 優惠券id
voucherOrder.setVoucherId(voucherId);
// 存庫
this.save(voucherOrder);
// 7. 返回訂單id
return Result.ok(orderId);
}
6.2 出現超賣問題
超賣問題就是典型的多執行緒安全問題
6.3 常見解決方案
加鎖
6.3.1 樂觀鎖解決思路
樂觀鎖的關鍵是 判斷之前查詢得到的資料是否有被修改過。常見的方式有
兩種
:
6.3.2 悲觀鎖
略,加synchronized 或者 Lock即可
6.4 樂觀鎖 (CAS)解決
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
/**
* 搶購優惠券
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1. 查詢優惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒殺未開始
return Result.fail("秒殺尚未開始");
}
// 3. 判斷秒殺是否結束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒殺已經結束
return Result.fail("秒殺已經結束");
}
// 4. 判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足");
}
// 5. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
// 利用CAS 解決超賣問題 where id = ? and stock = ?
//.eq(SeckillVoucher::getStock, voucher.getStock())
// 利用CAS 解決超賣問題 where id = ? and stock > 0
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
return Result.fail("扣減庫存失敗");
}
// 6. 建立訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2 使用者id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3 優惠券id
voucherOrder.setVoucherId(voucherId);
// 存庫
this.save(voucherOrder);
// 7. 返回訂單id
return Result.ok(orderId);
}
}
6.5 總結
解決方案
1. 悲觀鎖: 新增同步鎖,讓執行緒序列執行
- 優點: 簡單粗暴
- 缺點: 效能一般
2. 樂觀鎖:不加鎖,在更新時判斷是否有其他執行緒在修改 => 通過資料庫層面解決 => InnoDB儲存引擎通過多版本併發控制(MVCC,Multiversion Concurrency Control)機制解決併發修改問題。
- 優點: 效能好
- 缺點: 存在成功率低的問題
對資料庫的壓力很大
6.6 一人一單
需求: 修改秒殺業務,要求同一個優惠券,一個使用者只能下一單
6.6.1 解決一個人只能下一單(單機)
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查詢優惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒殺未開始
return Result.fail("秒殺尚未開始");
}
// 3. 判斷秒殺是否結束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒殺已經結束
return Result.fail("秒殺已經結束");
}
// 4. 判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足");
}
return createVoucherOrder(voucherId);
}
/**
* 建立訂單
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5. 一人只能搶一份
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
// 5.1 查詢訂單
long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 5.2 判斷是否已經搶過
if (count > 0) {
// 使用者已經購買過
return Result.fail("使用者已經購買過一次");
}
// 6. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
// 利用CAS 解決超賣問題 where id = ? and stock = ?
//.eq(SeckillVoucher::getStock, voucher.getStock())
// 利用CAS 解決超賣問題 where id = ? and stock > 0
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
return Result.fail("扣減庫存失敗");
}
// 7. 建立訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2 使用者id
voucherOrder.setUserId(userId);
// 7.3 優惠券id
voucherOrder.setVoucherId(voucherId);
// 存庫
this.save(voucherOrder);
// 8. 返回訂單id
return Result.ok(orderId);
}
}
}
6.6.2 一人一單的併發安全問題(叢集)
通過加鎖可以解決在單機情況下的一人一單安全問題,但是在叢集模式下 就不行了.
6.6.2.1 將服務啟動兩份,埠分別為 8081 和 8082
6.6.2.2 修改nginx
的nginx.conf
檔案,配置反向代理和負載均衡
然後執行 ./nginx -s reload
現在,使用者的請求會在這兩個節點上負載均衡
6.6.3 叢集下一人多單併發問題
如果一個使用者同一秒類,傳送兩次以上請求,請求都會進入到建立訂單業務程式碼中,導致一個人可以下多個訂單的併發問題
出現問題的原理圖:
8081:
8082:
導致一人多單的問題
6.6.4 解決方案 -> 分散式鎖
7. 分散式鎖
7.1 分散式鎖原理
7.1.1 什麼是分散式鎖
分散式鎖: 滿足
分散式系統或叢集
模式下多程序可見並且互斥
的鎖
7.1.2 分散式鎖的實現
分散式鎖的核心是 實現多程序之間互斥,而滿足這一點的方式有很多,常見的有三種:
7.2 基於Redis的分散式鎖
實現分散式鎖時需要實現的兩個基本方法:
7.2.1 獲取鎖
- 互斥: 確保只能由一個執行緒獲取鎖
- 非阻塞: 嘗試一次,成功返回true,失敗返回false
# 新增鎖,EX是設定超時間、NX如果不存在才執行(互斥)
SET lock thread1 EX 10 NX
7.2.2 釋放鎖
- 手動釋放
- 超時釋放: 獲取鎖時新增一個超時時間
# 釋放鎖,刪除即可
DEL key
7.2.3 【初級】分散式鎖
需求: 定義一個類,實現下面介面,利用Redis實現分散式鎖功能
package com.hmdp.utils;
/**
* @author codertl
*/
public interface ILock {
/**
* 嘗試獲取鎖
* @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
* @return true代表獲取鎖成功,false代表獲取鎖失敗
*/
boolean tryLock(long timeoutSeconds);
/**
* 釋放鎖
*/
void unlock();
}
實現類:
package com.hmdp.utils;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
/**
* 鎖的key名字
*/
private String name;
private StringRedisTemplate stringRedisTemplate;
/**
* 鎖的字首
*/
public static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 嘗試獲取鎖
* @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
* @return true代表獲取鎖成功,false代表獲取鎖失敗
*/
@Override
public boolean tryLock(long timeoutSeconds) {
// 獲取執行緒標識
long threadId = Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 釋放鎖
*/
@Override
public void unlock() {
// 釋放鎖完成
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
修改建立訂單的邏輯:
/**
* 建立訂單
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5. 一人只能搶一份
Long userId = UserHolder.getUser().getId();
// 建立鎖物件
SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 嘗試獲取鎖
boolean isLock = redisLock.tryLock(1200);
// 判斷是否獲取鎖
if (!isLock) {
// 獲取鎖失敗,直接返回失敗/重試
return Result.fail("不允許重複搶購!!");
}
try {
// 5.1 查詢訂單
long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 5.2 判斷是否已經搶過
if (count > 0) {
// 使用者已經購買過
return Result.fail("使用者已經購買過一次");
}
// 6. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
// 利用CAS 解決超賣問題 where id = ? and stock = ?
//.eq(SeckillVoucher::getStock, voucher.getStock())
// 利用CAS 解決超賣問題 where id = ? and stock > 0
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
return Result.fail("扣減庫存失敗");
}
// 7. 建立訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2 使用者id
voucherOrder.setUserId(userId);
// 7.3 優惠券id
voucherOrder.setVoucherId(voucherId);
// 存庫
this.save(voucherOrder);
// 8. 返回訂單id
return Result.ok(orderId);
} finally {
// 釋放鎖
redisLock.unlock();
}
}
7.2.4 【初級】分散式鎖存在問題
- 業務阻塞導致 鎖被超時自動釋放
- 基於上面的情況,當執行緒1業務執行完成之後,釋放鎖,可能會釋放到其他執行緒的鎖
解決方法
業務邏輯解決流程:
7.2.5 解決分散式鎖->【誤刪】問題
修改之前的分散式鎖實現
- 在獲取鎖時,存入執行緒標識(可以用UUID標識)
- 在釋放鎖時,先獲取鎖中的執行緒標識,判斷是否與當前執行緒標識一致
- 如果一致則釋放鎖
- 如果不一致則不釋放鎖
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock{
/**
* 鎖的key名字
*/
private String name;
private StringRedisTemplate stringRedisTemplate;
/**
* 鎖的字首
*/
public static final String KEY_PREFIX = "lock:";
/**
* 鎖的內容字首
*/
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 嘗試獲取鎖
* @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
* @return true代表獲取鎖成功,false代表獲取鎖失敗
*/
@Override
public boolean tryLock(long timeoutSeconds) {
// 獲取執行緒標識
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 釋放鎖
*/
@Override
public void unlock() {
// 獲取執行緒標識
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖種標識
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判斷標識是否是當前執行緒的標識
if (threadId.equals(id)) {
// 釋放鎖完成
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}
7.2.6 分散式鎖的原子性問題
【注】:當前依舊存在問題,判斷鎖標識和釋放鎖,並不是原子性操作。
1. 如果執行緒1當判斷鎖成功後,在釋放鎖之前阻塞,然後鎖超時釋放了。
2. 這個時候,別的執行緒獲取到鎖,當前的執行緒1阻塞結束,進行釋放鎖, 依舊會存在鎖誤刪的問題
7.2.7 Lua指令碼解決多條命令的原子性問題
Redis提供了Lua指令碼功能,在一個指令碼中編寫多條redis命令,確保多條命令執行時的原子性。
Lua是一種程式語言,它的語法可以參考: https://www.runoob.com/lua/lua-tutorial.html
- 這裡介紹Redis提供的呼叫函式,語法如下:
-- 執行redis命令
redis.call('命令名稱', 'key', '其他引數', ...)
例如:
-- 執行 set name jack
redis.class('set', 'name', 'jack')
例如: 先執行 set name Rose,再執行 get name,則指令碼如下:
-- 先執行 set name jack
redis.call('set', 'name', 'jack')
-- 再執行 get name
local name = redis.call('get', 'name')
-- 返回
return name
寫好指令碼之後,需要使用Redis命令來呼叫指令碼,呼叫指令碼的常見命令如下:
例如,我們要執行 redis.call('set', 'name', 'jack')
這個指令碼,語法如下:
# 呼叫指令碼 ""當中為指令碼內容 0為指令碼需要的key型別的引數個數
EVAL "return redis.call('set', 'name', 'jack')" 0
如果指令碼中的key、value不想寫死,可以作為引數傳遞。key型別引數會放入KEYS
陣列,其他引數會放入ARGV
陣列,在指令碼中可以從KEYS和ARGV陣列獲取這些引數
# 呼叫指令碼 KEYS[1] => name ARGV[1] => Rose 1為指令碼需要的key型別的引數個數
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
7.2.7.1 基於Redis的分散式鎖的Lua指令碼
-- 比較執行緒標識與鎖中的標識是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 釋放鎖 del key
return redis.call('del', KEYS[1])
end
return 0
7.2.8 Java呼叫Lua指令碼改造分散式鎖
提示:RedisTemplate呼叫Lua指令碼的API如下:
修改釋放鎖的實現
指令碼內容在resource目錄下新建unlock.lua檔案:
-- 比較執行緒標識與鎖中的標識是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 釋放鎖 del key
return redis.call('del', KEYS[1])
end
return 0
java程式碼:
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* @Author: tl
* @Description:
* @Date: 2022/5/8
*/
public class SimpleRedisLock implements ILock{
/**
* 鎖的key名字
*/
private String name;
private StringRedisTemplate stringRedisTemplate;
/**
* 鎖的字首
*/
public static final String KEY_PREFIX = "lock:";
/**
* 鎖的內容字首
*/
public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
/**
* 載入Lua指令碼
*/
public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指令碼檔案位置
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指令碼返回值
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 嘗試獲取鎖
* @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
* @return true代表獲取鎖成功,false代表獲取鎖失敗
*/
@Override
public boolean tryLock(long timeoutSeconds) {
// 獲取執行緒標識
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 獲取鎖
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 釋放鎖
*/
@Override
public void unlock() {
// 呼叫 Lua 指令碼釋放鎖
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
}
7.2.9 總結
基於redis的分散式鎖實現思路
1. 利用set nx ex 獲取鎖,並設定過期時間,儲存執行緒標識
2. 釋放鎖先判斷執行緒標識是否與自己一致,一致則刪除鎖
特性
1. 利用 set nx 滿足互斥性
2. 利用 set ex 保證故障時鎖依然能釋放,避免死鎖,提高安全性
3. 利用redis叢集保證高可用和高併發特性
7.3 Redisson實現分散式鎖
基於 set nx 實現的分散式鎖存在下面的問題:
7.3.1 概述
Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory data Grid)。它不僅提供了一系列的分散式的java常用物件,還提供了許多分散式服務,其中就包含了各種分散式鎖的實現。
官方網址: https://redisson.org/
GitHub地址: https://github.com/redisson/redisson
7.3.2 Redisson快速入門
1.引入依賴
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.14.0</version>
</dependency>
2.配置Redisson客戶端
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
// 配置類
Config config = new Config();
// 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123345");
// 建立RedissonClient物件
return Redisson.create(config);
}
}
3.使用Redisson的分散式鎖
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
// 1. 查詢優惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判斷秒殺是否開始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 秒殺未開始
return Result.fail("秒殺尚未開始");
}
// 3. 判斷秒殺是否結束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 秒殺已經結束
return Result.fail("秒殺已經結束");
}
// 4. 判斷庫存是否充足
if (voucher.getStock() < 1) {
// 庫存不足
return Result.fail("庫存不足");
}
return createVoucherOrder(voucherId);
}
/**
* 建立訂單
* @param voucherId
* @return
*/
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5. 一人只能搶一份
Long userId = UserHolder.getUser().getId();
// 使用Redisson建立鎖物件
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 使用Redisson嘗試獲取鎖 引數1: 獲取鎖的最大等待時間(期間會重試) 引數2: 鎖自動釋放的事件 引數3:時間單位
boolean isLock = redisLock.tryLock();
// 判斷是否獲取鎖
if (!isLock) {
// 獲取鎖失敗,直接返回失敗/重試
return Result.fail("不允許重複搶購!!");
}
try {
// 5.1 查詢訂單
long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 5.2 判斷是否已經搶過
if (count > 0) {
// 使用者已經購買過
return Result.fail("使用者已經購買過一次");
}
// 6. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
// 利用CAS 解決超賣問題 where id = ? and stock = ?
//.eq(SeckillVoucher::getStock, voucher.getStock())
// 利用CAS 解決超賣問題 where id = ? and stock > 0
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
return Result.fail("扣減庫存失敗");
}
// 7. 建立訂單
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2 使用者id
voucherOrder.setUserId(userId);
// 7.3 優惠券id
voucherOrder.setVoucherId(voucherId);
// 存庫
this.save(voucherOrder);
// 8. 返回訂單id
return Result.ok(orderId);
} finally {
// 使用Redisson釋放鎖
redisLock.unlock();
}
}
}
7.3.3 Redisson可重入鎖原理
根據重入次數來記錄可重入鎖
核心: 利用redis的Hash結構,記錄獲取鎖的執行緒以及記錄的次數
package com.hmdp;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@Slf4j
@SpringBootTest
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
@Test
void method1() {
// 嘗試獲取鎖
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("獲取鎖失敗 ....1");
return;
}
try {
log.info("獲取鎖成功 ....1");
// 執行業務程式碼
method2();
log.info("執行業務程式碼 ....1");
} finally {
log.warn("準備釋放鎖 ....1");
// 釋放鎖
lock.unlock();
}
}
@Test
void method2() {
// 嘗試獲取鎖
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("獲取鎖失敗 ....2");
return;
}
try {
log.info("獲取鎖成功 ....2");
// 執行業務程式碼
log.info("執行業務程式碼 ....2");
} finally {
log.warn("準備釋放鎖 ....2");
// 釋放鎖
lock.unlock();
}
}
}
7.3.3.1 Redisson獲取鎖核心原始碼
7.3.3.2 Redisson釋放鎖核心原始碼
7.3.4 Redisson的鎖重試和WatchDog機制
7.3.4.1總結
1. 可重入: 利用hash結構記錄執行緒id和重入次數
2. 可重試: 利用訊號量和PubSub功能實現等待、喚醒,獲取鎖失敗的重試機制
3. 超時續約: 利用watchDog,每隔一段時間(releaseTime / 3),重置超時時間
7.3.5 Redisson解決分散式鎖主從一致性問題
使用multiLock 聯鎖解決分散式鎖主從一致性問題
案例:我這裡啟動了三個redis節點
配置:
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: tl
* @Description:
* @Date: 2022/5/8
*/
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient() {
// 配置類
Config config = new Config();
// 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123345");
// 建立RedissonClient物件
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient2() {
// 配置類
Config config = new Config();
// 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("123345");
// 建立RedissonClient物件
return Redisson.create(config);
}
@Bean
public RedissonClient redissonClient3() {
// 配置類
Config config = new Config();
// 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("123345");
// 建立RedissonClient物件
return Redisson.create(config);
}
}
7.3.5.1 總結
1. 不可重入Redis分散式鎖:
1.1: 原理: 利用 set nx 的互斥性;利用ex避免死鎖;釋放鎖時判斷執行緒標識
1.2: 缺陷: 不可重入、無法重試、鎖超時失效
2. 可重入的Redis分散式鎖
1.1: 原理: 利用hash結構,記錄執行緒標識和重入次數;利用watchDog延續鎖時間;利用訊號量控制鎖重試等待
1.2: 缺陷: redis宕機引起鎖失效問題
3. Redisson的multiLock
1.1: 原理: 多個獨立的Redis節點,必須在所有節點都獲取重入鎖,才算獲取鎖成功
1.2: 缺陷: 運維成本高、實現複雜
8. Redis優化秒殺
8.1 改進秒殺業務
提高併發效能:
需求:
- 新增秒殺優惠券的同時,將優惠券資訊儲存到Redis中
- 基於Lua指令碼,判斷秒殺庫存、一人一單,決定使用者是否搶購成功
- 如果搶購成功,將優惠券id和使用者id封裝後存入阻塞佇列
- 開啟執行緒任務,不斷從阻塞佇列中獲取資訊,實現非同步下單功能
8.1.2 Lua指令碼完成秒殺資格判斷
resource資料夾下建立
Seckill.lua
-- 1. 引數列表
-- 1.1 優惠券id
local voucherId = ARGV[1]
-- 1.2 使用者id
local userId = ARGV[2]
-- 2. 資料key
-- 2.1 庫存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order:' .. voucherId
-- 3. 指令碼業務
-- 3.1 判斷庫存是否足夠 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0)then
-- 3.2 庫存不足,返回1
return 1
end
-- 3.2 判斷使用者是否已經下單 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
-- 3.3 重複下單,返回2
return 2
end
-- 3.4 扣除庫存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下單(儲存使用者) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
業務程式碼:
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Collections;
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 載入Lua指令碼
*/
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 指令碼檔案位置
SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
// 指令碼返回值
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1. 執行Lua指令碼
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
// 2. 判斷結果是否為0
// 2.1 不為0,代表沒有購買資格
if (r != 0) {
return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
}
// 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
long orderId = redisIdWorker.nextId("order");
// TODO: 2022/5/9 這裡需要儲存到阻塞佇列
// 3. 返回訂單id
return Result.ok(orderId);
}
}
8.1.3 搶購成功,將優惠券id和使用者id存入阻塞佇列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1. 執行Lua指令碼
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
// 2. 判斷結果是否為0
// 2.1 不為0,代表沒有購買資格
if (r != 0) {
return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
}
// 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4 使用者id
voucherOrder.setUserId(userId);
// 2.5 優惠券id
voucherOrder.setVoucherId(voucherId);
// 2.6 放入阻塞佇列
orderTasks.add(voucherOrder);
// 3. 返回訂單id
return Result.ok(orderId);
}
8.1.4 開啟執行緒任務,不斷從阻塞佇列中獲取資訊,實現非同步下單功能
package com.hmdp.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 載入Lua指令碼
*/
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 指令碼檔案位置
SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
// 指令碼返回值
SECKILL_SCRIPT.setResultType(Long.class);
}
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 1. 獲取佇列中的訂單資訊
VoucherOrder voucherOrder = orderTasks.take();
// 2. 建立訂單
createVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("處理訂單異常", e);
}
}
}
}
/**
* 建立訂單
* @param voucherOrder
* @return
*/
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 5. 一人只能搶一份
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 使用Redisson建立鎖物件
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 使用Redisson嘗試獲取鎖
boolean isLock = redisLock.tryLock();
// 判斷是否獲取鎖
if (!isLock) {
// 獲取鎖失敗,直接返回失敗/重試
log.error("不允許重複搶購!!");
return;
}
try {
// 5.1 查詢訂單
long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 5.2 判斷是否已經搶過
if (count > 0) {
// 使用者已經購買過
log.error("使用者已經購買過!!");
return;
}
// 6. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
log.error("扣減庫存失敗!!");
return ;
}
// 存庫
this.save(voucherOrder);
} finally {
// 使用Redisson釋放鎖
redisLock.unlock();
}
}
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 1. 執行Lua指令碼
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
// 2. 判斷結果是否為0
// 2.1 不為0,代表沒有購買資格
if (r != 0) {
return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
}
// 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3 訂單id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4 使用者id
voucherOrder.setUserId(userId);
// 2.5 優惠券id
voucherOrder.setVoucherId(voucherId);
// 2.6 放入阻塞佇列
orderTasks.add(voucherOrder);
// 3. 返回訂單id
return Result.ok(orderId);
}
}
8.1.5 總結
優化思路
- 先利用Redis完成庫存餘量、一人一單判斷,完成搶單業務
- 再講下單業務放入阻塞佇列,利用獨立執行緒非同步下單
基於阻塞佇列的非同步秒殺存在哪些問題?
- 記憶體限制問題
- 資料安全問題
9. Redis訊息佇列實現非同步秒殺
9.1 什麼是訊息佇列
訊息佇列(Message Queue),字面意思就是存放訊息的佇列。最簡單的訊息佇列模型包括3個角色
- 訊息佇列: 儲存和管理訊息,也被稱為訊息代理 (Message Broker)
- 生產者: 傳送訊息到訊息佇列
- 消費者: 從訊息佇列獲取訊息並處理訊息
9.2 基於List結構模擬訊息佇列
訊息佇列(Message Queue),字面意思就是存放訊息的佇列。而Redis的List資料結構是一個雙向連結串列,很容以模擬出佇列效果。
佇列:是入口和出口不在一遍,因此可以利用: LPUSH結合RPOP、或者RPUSH結合LPOP來實現
要注意的是,當佇列中沒有訊息時 RPOP或LPOP操作會返回null, 並不像JVM的阻塞佇列那樣會阻塞並等待訊息。
因此這裡應該使用 BRPOP 或者 BLPOP 來實現阻塞效果。
9.2.1基於list的訊息佇列的優缺點
9.2.1.1 優點
- 利用redis儲存,不受限與JVM記憶體的上限
- 基於Redis的持久化機制,資料安全性有保證
- 可以滿足訊息有序性
9.2.1.2 缺點
- 無法避免訊息丟失
- 只支援單消費者
9.4 基於PubSub的訊息佇列
PubSub(釋出訂閱)是Redis2.0版本引入的訊息傳遞模型。顧名思義,消費者可以訂閱一個或多個Channel,生產者向對應的Channel傳送訊息後,所有訂閱者都能收到相關訊息。
- SubScribe channel [channel] : 訂閱一個或多個頻道
- Publish channel msg : 向一個頻道傳送訊息
- PsubScribe pattern[pattern] : 訂閱與pattern格式匹配的所有頻道
9.4.1 基於PubSub的優缺點
9.4.1.1 優點
- 採用釋出訂閱模型,支援多生產、多消費
9.4.1.2 缺點
- 不支援資料持久化
- 無法避免訊息丟失
- 訊息堆積有上限,超出時資料丟失
9.6 基於Stream的訊息佇列=>(單消費模式)
Stream是Redis5.0引入的一種新資料型別,可以實現一個功能非常完善的訊息佇列
傳送訊息的命令 xadd
:
讀取訊息的方式之一 xread
:
例如: 使用xread讀取第一個訊息
9.6.1 Xread阻塞方式,讀取訊息
在業務開發中,我們可以迴圈的呼叫Xread阻塞方式來查詢最新訊息,從而實現持續監聽佇列的效果,虛擬碼如下:
9.6.2 Stream型別訊息佇列的Xread命令特點
- 訊息可回溯
- 一條訊息可以被多個消費者讀取
- 可以阻塞讀取
- 有訊息漏讀的風險
9.7 基於Stream的訊息佇列=>(消費組模式)
消費組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個佇列。具備下列特點:
建立消費者組:
其他常見命令
從消費者組讀取訊息
9.7.1 消費者監聽訊息的基本思路
9.7.2 XREADGROUP命令特點
- 訊息可回溯
- 可以多消費者爭搶訊息,加快消費速度
- 可以阻塞讀取
- 沒有訊息漏讀的風險
- 有訊息確認機制,保證訊息至少被消費一次
9.8 Redis訊息佇列
9.9 基於Redis的Stream結構作為訊息佇列,實現非同步秒殺下單
需求:
- 建立一個 Stream型別的訊息佇列命名為
stream.orders
- 修改之前的秒殺下單Lua指令碼,在認定有搶購資格後,直接向
stream.orders
中新增訊息,內容包含voucherId
、userId
、orderId
- 專案啟動時,開啟一個執行緒任務,嘗試獲取stream.orders中的訊息,完成下單
9.9.1 建立stream.orders
9.9.2 修改lua指令碼和搶購優惠券的邏輯
-- 1. 引數列表
-- 1.1 優惠券id
local voucherId = ARGV[1]
-- 1.2 使用者id
local userId = ARGV[2]
-- 1.3 訂單id
local orderId = ARGV[3]
-- 2. 資料key
-- 2.1 庫存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order:' .. voucherId
-- 3. 指令碼業務
-- 3.1 判斷庫存是否足夠 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0)then
-- 3.2 庫存不足,返回1
return 1
end
-- 3.2 判斷使用者是否已經下單 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
-- 3.3 重複下單,返回2
return 2
end
-- 3.4 扣除庫存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下單(儲存使用者) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6 傳送訊息到佇列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1. 執行Lua指令碼
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);
int r = result.intValue();
// 2. 判斷結果是否為0
// 2.1 不為0,代表沒有購買資格
if (r != 0) {
return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
}
// 3. 返回訂單id
return Result.ok(orderId);
}
9.9.3 獲取stream.orders中的訊息,完成下單
package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private RedissonClient redissonClient;
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 載入Lua指令碼
*/
public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 指令碼檔案位置
SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
// 指令碼返回值
SECKILL_SCRIPT.setResultType(Long.class);
}
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
String queueName = "stream.orders";
while (true) {
try {
// 1. 獲取訊息佇列中的訂單資訊 XREADGROUP GROUP G1 C1 COUNT 1 BLOCK 2000 STREAMS S1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2. 判斷訂單資訊是否為空
if (list == null || list.isEmpty()) {
// 2.1 如果為null,說明沒有訊息,繼續下一次迴圈
continue;
}
// 2.2 解析訊息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3. 建立訂單
createVoucherOrder(voucherOrder);
// 4. 確認訊息 XACK s1 g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("處理訂單異常", e);
hanldependdingList();
}
}
}
private void hanldependdingList() {
String queueName = "stream.orders";
while (true) {
try {
// 1. 獲取pending-list中的訂單資訊 XREADGROUP GROUP G1 C1 COUNT 1 BLOCK 2000 STREAMS S1 >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2. 判斷訂單資訊是否為空
if (list == null || list.isEmpty()) {
// 2.1 如果為null,說明pendding-list沒有異常訊息,結束迴圈
break;
}
// 2.2 解析訊息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 3. 建立訂單
createVoucherOrder(voucherOrder);
// 4. 確認訊息 XACK s1 g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("處理pendding訂單異常", e);
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
}
/**
* 建立訂單
* @param voucherOrder
* @return
*/
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 5. 一人只能搶一份
Long userId = voucherOrder.getUserId();
Long voucherId = voucherOrder.getVoucherId();
// 使用Redisson建立鎖物件
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 使用Redisson嘗試獲取鎖
boolean isLock = redisLock.tryLock();
// 判斷是否獲取鎖
if (!isLock) {
// 獲取鎖失敗,直接返回失敗/重試
log.error("不允許重複搶購!!");
return;
}
try {
// 5.1 查詢訂單
long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
.eq(VoucherOrder::getUserId, userId)
.eq(VoucherOrder::getVoucherId, voucherId));
// 5.2 判斷是否已經搶過
if (count > 0) {
// 使用者已經購買過
log.error("使用者已經購買過!!");
return;
}
// 6. 扣減庫存
boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
.eq(SeckillVoucher::getVoucherId, voucherId)
.gt(SeckillVoucher::getStock, 0)
.setSql("stock = stock - 1"));
if (!success) {
// 扣減庫存失敗
log.error("扣減庫存失敗!!");
return ;
}
// 存庫
this.save(voucherOrder);
} finally {
// 使用Redisson釋放鎖
redisLock.unlock();
}
}
/**
* 搶購優惠券
*
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
// 1. 執行Lua指令碼
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);
int r = result.intValue();
// 2. 判斷結果是否為0
// 2.1 不為0,代表沒有購買資格
if (r != 0) {
return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
}
// 3. 返回訂單id
return Result.ok(orderId);
}
}