1. 程式人生 > 其它 >Redis秒殺使用

Redis秒殺使用

1. 快取穿透

客戶端請求的資料在快取中和資料庫中都不存在,這樣快取永遠不會生效,請求都會直接打到資料庫。

1.1 解決方案

1. 快取Null值   -> 優點:實現方便  缺點: 額外記憶體消耗,可能造成短期的不一致
2. 布隆過濾器    -> 優點: 記憶體佔用較少,沒有多餘的key    缺點: 實現複雜,存在誤判的可能
3. 增強id的複雜度,避免被猜測到id的規律
4. 做好資料的基礎格式校驗
5. 加強使用者許可權校驗
6. 做好熱點引數的限流

2. 快取雪崩

同一時段大量的快取key同時失效或者Redis伺服器宕機,導致大量請求到達資料庫,帶來巨大壓力。

2.1 解決方案

1. 給不同的key設定不同的過期時間  -> 隨機時間
2. 利用redis叢集提高服務的可用性  -> 利用redis哨兵機制
3. 給快取業務新增降級限流策略
4. 給業務新增多級快取

3. 快取擊穿

熱點key問題,就是一個被 高併發訪問 並且 快取重建業務比較複雜 的key突然失效了,無數的請求訪問會瞬間給資料庫帶來巨大的衝擊。

3.1 常見的解決方案

3.1.1 互斥鎖

3.1.2 邏輯過期

3.2 解決方案對比

3.3 基於互斥鎖的方式解決

修改根據id查詢商品的介面,基於互斥鎖方式來解決快取擊穿問題

3.3.1 程式碼

package com.hmdp.service.impl;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.Shop;
import com.hmdp.mapper.ShopMapper;
import com.hmdp.service.IShopService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisConstants;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;


@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 根據id查詢商鋪資訊
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        // 利用空值解決快取穿透問題
        // Shop shop = queryWithPassThrough(id);

        // 互斥鎖解決快取擊穿
        Shop shop = queryWithMutex(id);
        if (shop == null) {
            return Result.fail("商鋪不存在!");
        }
        // 7. 返回結果
        return Result.ok(shop);
    }

    /**
     * 使用互斥鎖解決快取擊穿
     * @param id
     * @return
     */
    public Shop queryWithMutex(Long id) {
        String key = RedisConstants.CACHE_SHOP_KEY + id;
        // 1. 從 redis 中查詢商品快取
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3. 存在, 直接返回結果
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        // 判斷命中的是否是空值
        if(shopJson != null) {
            // 返回錯誤資訊
            return null;
        }
        // 4. 實現快取重建
        // 4.1 獲取互斥鎖
        String lockKey = "lock:shop:" + id;
        Shop shop = null;
        try {
            boolean isLock = tryLock(lockKey);
            // 4.2 判斷是否獲取成功
            if(!isLock) {
                // 4.3 失敗,則休眠並重試
                Thread.sleep(100);
                return queryWithMutex(id);
            }
            // 4.4 成功,根據id查詢資料庫
            shop = getById(id);
            // todo 模擬重建延時
            //Thread.sleep(200);
            // 5. 不存在,返回錯誤
            if (shop == null) {
                // 將空值寫入redis
                stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
                // 返回錯誤資訊
                return null;
            }
            // 6. 存在,寫入redis
            stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 7. 釋放互斥鎖
            unlock(lockKey);
        }
        // 8. 返回結果
        return shop;
    }

    /**
     * 利用空值解決快取穿透問題
     * @param id
     * @return
     */
    public Shop queryWithPassThrough(Long id) {
        String key = RedisConstants.CACHE_SHOP_KEY + id;
        // 1. 從 redis 中查詢商品快取
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3. 存在, 直接返回結果
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        // 判斷命中的是否是空值
        if(shopJson != null) {
            // 返回錯誤資訊
            return null;
        }

        // 4. 不存在,根據id查詢資料庫
        Shop shop = getById(id);
        // 5. 不存在,返回錯誤
        if (shop == null) {
            // 將空值寫入redis
            stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回錯誤資訊
            return null;
        }
        // 6. 存在,寫入redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        // 7. 返回結果
        return shop;
    }

    /**
     * 嘗試獲取鎖
     * @param key
     * @return
     */
    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 釋放鎖
     * @param key
     */
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

3.4 基於邏輯過期方式解決

修改根據id查詢商品的業務,基於邏輯過期方式來解決快取擊穿問題

3.4.1 程式碼

package com.hmdp.service.impl;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.Shop;
import com.hmdp.mapper.ShopMapper;
import com.hmdp.service.IShopService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisConstants;
import com.hmdp.utils.RedisData;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.concurrent.*;

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 根據id查詢商鋪資訊
     * @param id
     * @return
     */
    @Override
    public Result queryById(Long id) {
        // 利用空值解決快取穿透問題
        // Shop shop = queryWithPassThrough(id);

        // 互斥鎖解決快取擊穿
        //Shop shop = queryWithMutex(id);

        // 使用邏輯過期解決快取擊穿
        Shop shop = queryWithLogicalExpire(id);
        if (shop == null) {
            return Result.fail("商鋪不存在!");
        }
        // 7. 返回結果
        return Result.ok(shop);
    }

    /**
     * 定義執行緒池, 建立10個執行緒
     */
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    /**
     * 使用邏輯過期解決快取擊穿
     * @param id
     * @return
     */
    public Shop queryWithLogicalExpire(Long id) {
        String key = RedisConstants.CACHE_SHOP_KEY + id;
        // 1. 從 redis 中查詢商品快取
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isBlank(shopJson)) {
            // 3. 不存在, 直接返回
            return null;
        }
        // 4. 命中,將json反序列化為物件
        RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
        Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5. 判斷邏輯過期時間是否過期
        if (LocalDateTime.now().isBefore(expireTime)) {
            // 5.1 未過期,直接返回商鋪資訊
            return shop;
        }
        // 5.2 已過期,需要快取重建
        // 6. 快取重建
        // 6.1 獲取互斥鎖
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2 判斷是否獲取鎖成功
        if (isLock) {
            // 6.3 成功,開啟獨立執行緒,實現快取重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 重建快取
                    this.saveShop2Redis(id, 20L);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // 釋放鎖
                    unlock(lockKey);
                }
            });
        }
        // 6.4 返回過期的商鋪資訊
        // 7. 返回結果
        return shop;
    }

   
    /**
     * 利用空值解決快取穿透問題
     * @param id
     * @return
     */
    public Shop queryWithPassThrough(Long id) {
        String key = RedisConstants.CACHE_SHOP_KEY + id;
        // 1. 從 redis 中查詢商品快取
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isNotBlank(shopJson)) {
            // 3. 存在, 直接返回結果
            return JSONUtil.toBean(shopJson, Shop.class);
        }
        // 判斷命中的是否是空值
        if(shopJson != null) {
            // 返回錯誤資訊
            return null;
        }

        // 4. 不存在,根據id查詢資料庫
        Shop shop = getById(id);
        // 5. 不存在,返回錯誤
        if (shop == null) {
            // 將空值寫入redis
            stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回錯誤資訊
            return null;
        }
        // 6. 存在,寫入redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
        // 7. 返回結果
        return shop;
    }

    /**
     * 嘗試獲取鎖
     * @param key
     * @return
     */
    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 釋放鎖
     * @param key
     */
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }

    /**
     * 存入商鋪資訊附帶過期時間到redis
     * @param id
     */
    public void saveShop2Redis(Long id, Long expireSeconds) {
        // 1. 查詢店鋪資料
        Shop shop = this.getById(id);
        // todo 模擬重建延時
        //try {
        //    Thread.sleep(200);
        //} catch (InterruptedException e) {
        //    e.printStackTrace();
        //}
        // 2. 封裝邏輯過期時間
        RedisData redisData = new RedisData();
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
        redisData.setData(shop);
        // 3. 寫入redis
        stringRedisTemplate.opsForValue()
                .set(RedisConstants.CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
    }
}

4. 快取工具封裝

方法1:將任意的java物件序列化為json並存儲在String型別的key中,並且可以設定TLL過期時間

方法2:將任意的java物件序列化為json並存儲在String型別的key中,並且可以設定邏輯過期時間,用於處理快取擊穿問題

方法3:根據指定的key查詢快取,並反序列化為指定型別,利用快取空值的方式解決快取穿透問題

方法4:根據指定的key查詢快取,並反序列化為指定型別,需要利用邏輯時間解決快取擊穿問題

package com.hmdp.utils;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hmdp.entity.Shop;
import com.sun.xml.internal.bind.v2.model.core.ID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.concurrent.*;
import java.util.function.Function;


@Slf4j
@Component
public class CacheClient {

    private final StringRedisTemplate stringRedisTemplate;

    public CacheClient(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 將任意的java物件序列化為json並存儲在String型別的key中,
     * 並且可以設定TLL過期時間
     * @param key 快取key
     * @param value 快取物件
     * @param time 過期時間
     * @param timeUnit 時間單位
     */
    public void set(String key, Object value, Long time, TimeUnit timeUnit) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, timeUnit);
    }

    /**
     * 將任意的java物件序列化為json並存儲在String型別的key中,
     * 並且可以設定TLL過期時間,並且可以設定邏輯過期時間,用於處理快取擊穿問題
     * @param key 快取key
     * @param value 快取物件
     * @param time 過期時間
     * @param timeUnit 時間單位
     */
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit timeUnit) {
        // 設定邏輯過期時間
        RedisData redisData = new RedisData();
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
        redisData.setData(value);
        // 寫入 redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
    }


    /**
     * 根據指定的key查詢快取,並反序列化為指定型別,
     * 利用快取空值的方式解決快取穿透問題
     * @param keyPrefix key字首
     * @param id 查詢的id
     * @param type 返回值型別
     * @param dbFallBack 查詢資料庫的回撥函式
     * @param time 過期時間
     * @param timeUnit 時間單位
     * @param <R> 返回值型別
     * @param <ID> id型別
     * @return
     */
    public <R, ID> R queryWithPassThrough(
            String keyPrefix,
            ID id,
            Class<R> type,
            Function<ID, R> dbFallBack,
            Long time,
            TimeUnit timeUnit
    ) {
        String key = keyPrefix + id;
        // 1. 從 redis 中查詢快取
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isNotBlank(json)) {
            // 3. 存在, 直接返回結果
            return JSONUtil.toBean(json, type);
        }
        // 判斷命中的是否是空值
        if(json != null) {
            // 返回錯誤資訊
            return null;
        }

        // 4. 不存在,根據id查詢資料庫
        R r = dbFallBack.apply(id);
        // 5. 不存在,返回錯誤
        if (r == null) {
            // 將空值寫入redis
            stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
            // 返回錯誤資訊
            return null;
        }
        // 6. 存在,寫入redis
        this.set(key, r, time, timeUnit);
        // 7. 返回結果
        return r;
    }

    /**
     * 定義執行緒池, 建立10個執行緒
     */
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

    /**
     * 使用邏輯過期解決快取擊穿問題
     * @param keyPrefix key字首
     * @param id 查詢的id
     * @param type 返回值型別
     * @param dbFallBack 查詢資料庫的回撥函式
     * @param time 過期時間
     * @param timeUnit 時間單位
     * @param <R> 返回值型別
     * @param <ID> id型別
     * @return
     */
    public <R, ID> R queryWithLogicalExpire(
            String keyPrefix,
            ID id,
            Class<R> type,
            Function<ID, R> dbFallBack,
            Long time,
            TimeUnit timeUnit
    ) {
        String key = keyPrefix + id;
        // 1. 從 redis 中查詢快取
        String json = stringRedisTemplate.opsForValue().get(key);
        // 2. 判斷是否存在
        if (StrUtil.isBlank(json)) {
            // 3. 不存在, 直接返回
            return null;
        }
        // 4. 命中,將json反序列化為物件
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
        LocalDateTime expireTime = redisData.getExpireTime();
        // 5. 判斷邏輯過期時間是否過期
        if (LocalDateTime.now().isBefore(expireTime)) {
            // 5.1 未過期,直接返回資訊
            return r;
        }
        // 5.2 已過期,需要快取重建
        // 6. 快取重建
        // 6.1 獲取互斥鎖
        String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
        boolean isLock = tryLock(lockKey);
        // 6.2 判斷是否獲取鎖成功
        if (isLock) {
            // 6.3 成功,開啟獨立執行緒,實現快取重建
            CACHE_REBUILD_EXECUTOR.submit(() -> {
                try {
                    // 重建快取
                    // 1. 查詢資料庫
                    R r1 = dbFallBack.apply(id);
                    // 2. 寫入redis
                    this.setWithLogicalExpire(key, r1, time, timeUnit);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // 釋放鎖
                    unlock(lockKey);
                }
            });
        }
        // 7. 返回結果
        return r;
    }


    /**
     * 嘗試獲取鎖
     * @param key
     * @return
     */
    private boolean tryLock(String key) {
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    /**
     * 釋放鎖
     * @param key
     */
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

5. 全域性ID生成器

為了增加ID的安全性,可以不直接使用Redis自增的數值,而是拼接一些其他資訊:

5.1 工具類

package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * @Description: Redis id生成器
 * @Date: 2022/5/7 13:00
 */
@Component
public class RedisIdWorker {

    /**
     * 開始時間戳
     */
    public static final long BEGIN_TIMESTAMP = 1640995200L;

    /**
     * 序列號位數
     */
    public static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 唯一id生成
     * @param keyPrefix 不同業務的key字首
     * @return
     */
    public long nextId(String keyPrefix) {
        // 1. 生成時間戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        // 2. 生成序列號
        // 2.1 獲取當前日期, 精確到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2 自增長
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        // 3. 拼接並返回
        return timestamp << COUNT_BITS | count;
    }
}

5.2 總結

5.2.1 全域性唯一ID生成策略

1. UUID
2. Redis自增
3. snowflack演算法
4. 資料庫自增

5.2.2 Redis自增ID策略

1. 每天一個key,方便統計訂單量

6. 優惠券秒殺

判斷兩點:

  1. 秒殺是否開始或結束,如果尚未開始或已經結束則無法下單
  2. 庫存是否,不足則無法下單

6.1 普通搶購優惠券的程式碼

/**
 * 搶購優惠券
 * @param voucherId
 * @return
 */
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
    // 1. 查詢優惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    // 2. 判斷秒殺是否開始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        // 秒殺未開始
        return Result.fail("秒殺尚未開始");
    }
    // 3. 判斷秒殺是否結束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        // 秒殺已經結束
        return Result.fail("秒殺已經結束");
    }
    // 4. 判斷庫存是否充足
    if (voucher.getStock() < 1) {
        // 庫存不足
        return Result.fail("庫存不足");
    }
    // 5. 扣減庫存
    boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                                                   .eq(SeckillVoucher::getVoucherId, voucherId)
                                                   .set(SeckillVoucher::getStock, voucher.getStock() - 1));
    if (!success) {
        // 扣減庫存失敗
        return Result.fail("扣減庫存失敗");
    }
    // 6. 建立訂單
    VoucherOrder voucherOrder = new VoucherOrder();
    // 6.1 訂單id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 6.2 使用者id
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    // 6.3 優惠券id
    voucherOrder.setVoucherId(voucherId);
    // 存庫
    this.save(voucherOrder);
    // 7. 返回訂單id
    return Result.ok(orderId);
}

6.2 出現超賣問題

超賣問題就是典型的多執行緒安全問題

6.3 常見解決方案

加鎖

6.3.1 樂觀鎖解決思路

樂觀鎖的關鍵是 判斷之前查詢得到的資料是否有被修改過。常見的方式有兩種:

6.3.2 悲觀鎖

略,加synchronized 或者 Lock即可

6.4 樂觀鎖 (CAS)解決

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 搶購優惠券
     * @param voucherId
     * @return
     */
    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        // 1. 查詢優惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2. 判斷秒殺是否開始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 秒殺未開始
            return Result.fail("秒殺尚未開始");
        }
        // 3. 判斷秒殺是否結束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 秒殺已經結束
            return Result.fail("秒殺已經結束");
        }
        // 4. 判斷庫存是否充足
        if (voucher.getStock() < 1) {
            // 庫存不足
            return Result.fail("庫存不足");
        }
        // 5. 扣減庫存
        boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                .eq(SeckillVoucher::getVoucherId, voucherId)
                // 利用CAS 解決超賣問題   where id = ? and stock = ?
                //.eq(SeckillVoucher::getStock, voucher.getStock())
                // 利用CAS 解決超賣問題   where id = ? and stock > 0
                .gt(SeckillVoucher::getStock, 0)
                .setSql("stock = stock - 1"));
        if (!success) {
            // 扣減庫存失敗
            return Result.fail("扣減庫存失敗");
        }
        // 6. 建立訂單
        VoucherOrder voucherOrder = new VoucherOrder();
        // 6.1 訂單id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 6.2 使用者id
        Long userId = UserHolder.getUser().getId();
        voucherOrder.setUserId(userId);
        // 6.3 優惠券id
        voucherOrder.setVoucherId(voucherId);
        // 存庫
        this.save(voucherOrder);
        // 7. 返回訂單id
        return Result.ok(orderId);
    }
}

6.5 總結

解決方案

1. 悲觀鎖: 新增同步鎖,讓執行緒序列執行
 - 優點: 簡單粗暴
 - 缺點: 效能一般
2. 樂觀鎖:不加鎖,在更新時判斷是否有其他執行緒在修改 => 通過資料庫層面解決 => InnoDB儲存引擎通過多版本併發控制(MVCC,Multiversion Concurrency Control)機制解決併發修改問題。
 - 優點: 效能好
 - 缺點: 存在成功率低的問題
       對資料庫的壓力很大

6.6 一人一單

需求: 修改秒殺業務,要求同一個優惠券,一個使用者只能下一單

6.6.1 解決一個人只能下一單(單機)

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 搶購優惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. 查詢優惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2. 判斷秒殺是否開始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 秒殺未開始
            return Result.fail("秒殺尚未開始");
        }
        // 3. 判斷秒殺是否結束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 秒殺已經結束
            return Result.fail("秒殺已經結束");
        }
        // 4. 判斷庫存是否充足
        if (voucher.getStock() < 1) {
            // 庫存不足
            return Result.fail("庫存不足");
        }
        return createVoucherOrder(voucherId);
    }

    /**
     * 建立訂單
     * @param voucherId
     * @return
     */
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 5. 一人只能搶一份
        Long userId = UserHolder.getUser().getId();

        synchronized (userId.toString().intern()) {
            // 5.1 查詢訂單
            long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                    .eq(VoucherOrder::getUserId, userId)
                    .eq(VoucherOrder::getVoucherId, voucherId));
            // 5.2 判斷是否已經搶過
            if (count > 0) {
                // 使用者已經購買過
                return Result.fail("使用者已經購買過一次");
            }
            // 6. 扣減庫存
            boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                    .eq(SeckillVoucher::getVoucherId, voucherId)
                    // 利用CAS 解決超賣問題   where id = ? and stock = ?
                    //.eq(SeckillVoucher::getStock, voucher.getStock())
                    // 利用CAS 解決超賣問題   where id = ? and stock > 0
                    .gt(SeckillVoucher::getStock, 0)
                    .setSql("stock = stock - 1"));
            if (!success) {
                // 扣減庫存失敗
                return Result.fail("扣減庫存失敗");
            }
            // 7. 建立訂單
            VoucherOrder voucherOrder = new VoucherOrder();
            // 7.1 訂單id
            long orderId = redisIdWorker.nextId("order");
            voucherOrder.setId(orderId);
            // 7.2 使用者id
            voucherOrder.setUserId(userId);
            // 7.3 優惠券id
            voucherOrder.setVoucherId(voucherId);
            // 存庫
            this.save(voucherOrder);
            // 8. 返回訂單id
            return Result.ok(orderId);
        }
    }
}

6.6.2 一人一單的併發安全問題(叢集)

通過加鎖可以解決在單機情況下的一人一單安全問題,但是在叢集模式下 就不行了.

6.6.2.1 將服務啟動兩份,埠分別為 8081 和 8082
6.6.2.2 修改nginxnginx.conf檔案,配置反向代理和負載均衡

然後執行 ./nginx -s reload

現在,使用者的請求會在這兩個節點上負載均衡

6.6.3 叢集下一人多單併發問題

如果一個使用者同一秒類,傳送兩次以上請求,請求都會進入到建立訂單業務程式碼中,導致一個人可以下多個訂單的併發問題

出現問題的原理圖:

8081:

8082:

導致一人多單的問題

6.6.4 解決方案 -> 分散式鎖

7. 分散式鎖

7.1 分散式鎖原理

7.1.1 什麼是分散式鎖

分散式鎖: 滿足分散式系統或叢集模式下 多程序可見並且互斥的鎖

7.1.2 分散式鎖的實現

分散式鎖的核心是 實現多程序之間互斥,而滿足這一點的方式有很多,常見的有三種:

7.2 基於Redis的分散式鎖

實現分散式鎖時需要實現的兩個基本方法:

7.2.1 獲取鎖

- 互斥: 確保只能由一個執行緒獲取鎖
- 非阻塞: 嘗試一次,成功返回true,失敗返回false
# 新增鎖,EX是設定超時間、NX如果不存在才執行(互斥)
SET lock thread1 EX 10 NX

7.2.2 釋放鎖

- 手動釋放
- 超時釋放: 獲取鎖時新增一個超時時間
# 釋放鎖,刪除即可
DEL key

7.2.3 【初級】分散式鎖

需求: 定義一個類,實現下面介面,利用Redis實現分散式鎖功能

package com.hmdp.utils;

/**
 * @author codertl
 */
public interface ILock {

    /**
     * 嘗試獲取鎖
     * @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
     * @return true代表獲取鎖成功,false代表獲取鎖失敗
     */
    boolean tryLock(long timeoutSeconds);

    /**
     * 釋放鎖
     */
    void unlock();
}

實現類:

package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.concurrent.TimeUnit;


public class SimpleRedisLock implements ILock{

    /**
     * 鎖的key名字
     */
    private String name;

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 鎖的字首
     */
    public static final String KEY_PREFIX = "lock:";

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 嘗試獲取鎖
     * @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
     * @return true代表獲取鎖成功,false代表獲取鎖失敗
     */
    @Override
    public boolean tryLock(long timeoutSeconds) {
        // 獲取執行緒標識
        long threadId = Thread.currentThread().getId();
        // 獲取鎖
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 釋放鎖
     */
    @Override
    public void unlock() {
        // 釋放鎖完成
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

修改建立訂單的邏輯:

/**
 * 建立訂單
 * @param voucherId
 * @return
 */
@Transactional
public Result createVoucherOrder(Long voucherId) {
    // 5. 一人只能搶一份
    Long userId = UserHolder.getUser().getId();

    // 建立鎖物件
    SimpleRedisLock redisLock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    // 嘗試獲取鎖
    boolean isLock = redisLock.tryLock(1200);
    // 判斷是否獲取鎖
    if (!isLock) {
        // 獲取鎖失敗,直接返回失敗/重試
        return Result.fail("不允許重複搶購!!");
    }

    try {
        // 5.1 查詢訂單
        long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                                .eq(VoucherOrder::getUserId, userId)
                                .eq(VoucherOrder::getVoucherId, voucherId));
        // 5.2 判斷是否已經搶過
        if (count > 0) {
            // 使用者已經購買過
            return Result.fail("使用者已經購買過一次");
        }
        // 6. 扣減庫存
        boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                                                       .eq(SeckillVoucher::getVoucherId, voucherId)
                                                       // 利用CAS 解決超賣問題   where id = ? and stock = ?
                                                       //.eq(SeckillVoucher::getStock, voucher.getStock())
                                                       // 利用CAS 解決超賣問題   where id = ? and stock > 0
                                                       .gt(SeckillVoucher::getStock, 0)
                                                       .setSql("stock = stock - 1"));
        if (!success) {
            // 扣減庫存失敗
            return Result.fail("扣減庫存失敗");
        }
        // 7. 建立訂單
        VoucherOrder voucherOrder = new VoucherOrder();
        // 7.1 訂單id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 7.2 使用者id
        voucherOrder.setUserId(userId);
        // 7.3 優惠券id
        voucherOrder.setVoucherId(voucherId);
        // 存庫
        this.save(voucherOrder);
        // 8. 返回訂單id
        return Result.ok(orderId);
    } finally {
        // 釋放鎖
        redisLock.unlock();
    }
}

7.2.4 【初級】分散式鎖存在問題

  1. 業務阻塞導致 鎖被超時自動釋放
  2. 基於上面的情況,當執行緒1業務執行完成之後,釋放鎖,可能會釋放到其他執行緒的鎖

解決方法

業務邏輯解決流程:

7.2.5 解決分散式鎖->【誤刪】問題

修改之前的分散式鎖實現

  1. 在獲取鎖時,存入執行緒標識(可以用UUID標識)
  2. 在釋放鎖時,先獲取鎖中的執行緒標識,判斷是否與當前執行緒標識一致
    1. 如果一致則釋放鎖
    2. 如果不一致則不釋放鎖
package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;


public class SimpleRedisLock implements ILock{

    /**
     * 鎖的key名字
     */
    private String name;

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 鎖的字首
     */
    public static final String KEY_PREFIX = "lock:";

    /**
     * 鎖的內容字首
     */
    public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 嘗試獲取鎖
     * @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
     * @return true代表獲取鎖成功,false代表獲取鎖失敗
     */
    @Override
    public boolean tryLock(long timeoutSeconds) {
        // 獲取執行緒標識
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 獲取鎖
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 釋放鎖
     */
    @Override
    public void unlock() {
        // 獲取執行緒標識
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 獲取鎖種標識
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判斷標識是否是當前執行緒的標識
        if (threadId.equals(id)) {
            // 釋放鎖完成
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

7.2.6 分散式鎖的原子性問題

【注】:當前依舊存在問題,判斷鎖標識和釋放鎖,並不是原子性操作。

1. 如果執行緒1當判斷鎖成功後,在釋放鎖之前阻塞,然後鎖超時釋放了。

2. 這個時候,別的執行緒獲取到鎖,當前的執行緒1阻塞結束,進行釋放鎖, 依舊會存在鎖誤刪的問題

7.2.7 Lua指令碼解決多條命令的原子性問題

Redis提供了Lua指令碼功能,在一個指令碼中編寫多條redis命令,確保多條命令執行時的原子性。

Lua是一種程式語言,它的語法可以參考: https://www.runoob.com/lua/lua-tutorial.html

- 這裡介紹Redis提供的呼叫函式,語法如下:
-- 執行redis命令
redis.call('命令名稱', 'key', '其他引數', ...)

例如:

-- 執行 set name jack
redis.class('set', 'name', 'jack')

例如: 先執行 set name Rose,再執行 get name,則指令碼如下:

-- 先執行 set name jack
redis.call('set', 'name', 'jack')
-- 再執行 get name
local name = redis.call('get', 'name')
-- 返回
return name

寫好指令碼之後,需要使用Redis命令來呼叫指令碼,呼叫指令碼的常見命令如下:

例如,我們要執行 redis.call('set', 'name', 'jack')這個指令碼,語法如下:

# 呼叫指令碼  ""當中為指令碼內容  0為指令碼需要的key型別的引數個數
EVAL "return redis.call('set', 'name', 'jack')" 0

如果指令碼中的key、value不想寫死,可以作為引數傳遞。key型別引數會放入KEYS陣列,其他引數會放入ARGV陣列,在指令碼中可以從KEYS和ARGV陣列獲取這些引數

# 呼叫指令碼 KEYS[1] => name  ARGV[1] => Rose   1為指令碼需要的key型別的引數個數
EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose
7.2.7.1 基於Redis的分散式鎖的Lua指令碼
-- 比較執行緒標識與鎖中的標識是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
  -- 釋放鎖 del key
  return redis.call('del', KEYS[1])
end
return 0

7.2.8 Java呼叫Lua指令碼改造分散式鎖

提示:RedisTemplate呼叫Lua指令碼的API如下:

修改釋放鎖的實現

指令碼內容在resource目錄下新建unlock.lua檔案:

-- 比較執行緒標識與鎖中的標識是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 釋放鎖 del key
    return redis.call('del', KEYS[1])
end
return 0

java程式碼:

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * @Author: tl
 * @Description:
 * @Date: 2022/5/8
 */
public class SimpleRedisLock implements ILock{

    /**
     * 鎖的key名字
     */
    private String name;

    private StringRedisTemplate stringRedisTemplate;

    /**
     * 鎖的字首
     */
    public static final String KEY_PREFIX = "lock:";

    /**
     * 鎖的內容字首
     */
    public static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    /**
     * 載入Lua指令碼
     */
    public static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        // 指令碼檔案位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        // 指令碼返回值
        UNLOCK_SCRIPT.setResultType(Long.class);
    }


    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 嘗試獲取鎖
     * @param timeoutSeconds 鎖持有的超時時間,過期後自動釋放鎖
     * @return true代表獲取鎖成功,false代表獲取鎖失敗
     */
    @Override
    public boolean tryLock(long timeoutSeconds) {
        // 獲取執行緒標識
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 獲取鎖
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSeconds, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    /**
     * 釋放鎖
     */
    @Override
    public void unlock() {
       // 呼叫 Lua 指令碼釋放鎖
       stringRedisTemplate.execute(
               UNLOCK_SCRIPT,
               Collections.singletonList(KEY_PREFIX + name),
               ID_PREFIX + Thread.currentThread().getId()
       );
    }
}

7.2.9 總結

基於redis的分散式鎖實現思路

1. 利用set nx ex 獲取鎖,並設定過期時間,儲存執行緒標識
2. 釋放鎖先判斷執行緒標識是否與自己一致,一致則刪除鎖

特性

1. 利用 set nx 滿足互斥性
2. 利用 set ex 保證故障時鎖依然能釋放,避免死鎖,提高安全性
3. 利用redis叢集保證高可用和高併發特性

7.3 Redisson實現分散式鎖

基於 set nx 實現的分散式鎖存在下面的問題:

7.3.1 概述

Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory data Grid)。它不僅提供了一系列的分散式的java常用物件,還提供了許多分散式服務,其中就包含了各種分散式鎖的實現。

官方網址: https://redisson.org/

GitHub地址: https://github.com/redisson/redisson

7.3.2 Redisson快速入門

1.引入依賴

<!-- redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.14.0</version>
</dependency>

2.配置Redisson客戶端

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        // 配置類
        Config config = new Config();
        // 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123345");
        // 建立RedissonClient物件
        return Redisson.create(config);
    }
}

3.使用Redisson的分散式鎖

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    /**
     * 搶購優惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. 查詢優惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        // 2. 判斷秒殺是否開始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            // 秒殺未開始
            return Result.fail("秒殺尚未開始");
        }
        // 3. 判斷秒殺是否結束
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            // 秒殺已經結束
            return Result.fail("秒殺已經結束");
        }
        // 4. 判斷庫存是否充足
        if (voucher.getStock() < 1) {
            // 庫存不足
            return Result.fail("庫存不足");
        }
        return createVoucherOrder(voucherId);
    }

    /**
     * 建立訂單
     * @param voucherId
     * @return
     */
    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        // 5. 一人只能搶一份
        Long userId = UserHolder.getUser().getId();

        // 使用Redisson建立鎖物件
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 使用Redisson嘗試獲取鎖  引數1: 獲取鎖的最大等待時間(期間會重試) 引數2: 鎖自動釋放的事件 引數3:時間單位
        boolean isLock = redisLock.tryLock();
        // 判斷是否獲取鎖
        if (!isLock) {
            // 獲取鎖失敗,直接返回失敗/重試
            return Result.fail("不允許重複搶購!!");
        }

        try {
            // 5.1 查詢訂單
            long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                    .eq(VoucherOrder::getUserId, userId)
                    .eq(VoucherOrder::getVoucherId, voucherId));
            // 5.2 判斷是否已經搶過
            if (count > 0) {
                // 使用者已經購買過
                return Result.fail("使用者已經購買過一次");
            }
            // 6. 扣減庫存
            boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                    .eq(SeckillVoucher::getVoucherId, voucherId)
                    // 利用CAS 解決超賣問題   where id = ? and stock = ?
                    //.eq(SeckillVoucher::getStock, voucher.getStock())
                    // 利用CAS 解決超賣問題   where id = ? and stock > 0
                    .gt(SeckillVoucher::getStock, 0)
                    .setSql("stock = stock - 1"));
            if (!success) {
                // 扣減庫存失敗
                return Result.fail("扣減庫存失敗");
            }
            // 7. 建立訂單
            VoucherOrder voucherOrder = new VoucherOrder();
            // 7.1 訂單id
            long orderId = redisIdWorker.nextId("order");
            voucherOrder.setId(orderId);
            // 7.2 使用者id
            voucherOrder.setUserId(userId);
            // 7.3 優惠券id
            voucherOrder.setVoucherId(voucherId);
            // 存庫
            this.save(voucherOrder);
            // 8. 返回訂單id
            return Result.ok(orderId);
        } finally {
            // 使用Redisson釋放鎖
            redisLock.unlock();
        }
    }
}

7.3.3 Redisson可重入鎖原理

根據重入次數來記錄可重入鎖

核心: 利用redis的Hash結構,記錄獲取鎖的執行緒以及記錄的次數

package com.hmdp;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;


@Slf4j
@SpringBootTest
public class RedissonTest {

    @Resource
    private RedissonClient redissonClient;

    private RLock lock;

    @BeforeEach
    void setUp() {
        lock = redissonClient.getLock("order");
    }

    @Test
    void method1() {
        // 嘗試獲取鎖
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("獲取鎖失敗 ....1");
            return;
        }
        try {
            log.info("獲取鎖成功 ....1");
            // 執行業務程式碼
            method2();
            log.info("執行業務程式碼 ....1");
        } finally {
            log.warn("準備釋放鎖 ....1");
            // 釋放鎖
            lock.unlock();
        }
    }

    @Test
    void method2() {
        // 嘗試獲取鎖
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("獲取鎖失敗 ....2");
            return;
        }
        try {
            log.info("獲取鎖成功 ....2");
            // 執行業務程式碼
            log.info("執行業務程式碼 ....2");
        } finally {
            log.warn("準備釋放鎖 ....2");
            // 釋放鎖
            lock.unlock();
        }
    }
}

7.3.3.1 Redisson獲取鎖核心原始碼
7.3.3.2 Redisson釋放鎖核心原始碼

7.3.4 Redisson的鎖重試和WatchDog機制

7.3.4.1總結
1. 可重入: 利用hash結構記錄執行緒id和重入次數
2. 可重試: 利用訊號量和PubSub功能實現等待、喚醒,獲取鎖失敗的重試機制
3. 超時續約: 利用watchDog,每隔一段時間(releaseTime / 3),重置超時時間

7.3.5 Redisson解決分散式鎖主從一致性問題

使用multiLock 聯鎖解決分散式鎖主從一致性問題

案例:我這裡啟動了三個redis節點

配置:

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: tl
 * @Description:
 * @Date: 2022/5/8
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        // 配置類
        Config config = new Config();
        // 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123345");
        // 建立RedissonClient物件
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {
        // 配置類
        Config config = new Config();
        // 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
        config.useSingleServer().setAddress("redis://127.0.0.1:6380").setPassword("123345");
        // 建立RedissonClient物件
        return Redisson.create(config);
    }


    @Bean
    public RedissonClient redissonClient3() {
        // 配置類
        Config config = new Config();
        // 新增redis地址,這裡添加了單點的地址,也可以使用 config.useClusterServers() 方法新增叢集模式的地址
        config.useSingleServer().setAddress("redis://127.0.0.1:6381").setPassword("123345");
        // 建立RedissonClient物件
        return Redisson.create(config);
    }
}
7.3.5.1 總結
1. 不可重入Redis分散式鎖:
 	1.1: 原理: 利用 set nx 的互斥性;利用ex避免死鎖;釋放鎖時判斷執行緒標識
 	1.2: 缺陷: 不可重入、無法重試、鎖超時失效
2. 可重入的Redis分散式鎖
	1.1: 原理: 利用hash結構,記錄執行緒標識和重入次數;利用watchDog延續鎖時間;利用訊號量控制鎖重試等待
	1.2: 缺陷: redis宕機引起鎖失效問題
3. Redisson的multiLock
	1.1: 原理: 多個獨立的Redis節點,必須在所有節點都獲取重入鎖,才算獲取鎖成功
	1.2: 缺陷: 運維成本高、實現複雜

8. Redis優化秒殺

8.1 改進秒殺業務

提高併發效能:

需求:

  1. 新增秒殺優惠券的同時,將優惠券資訊儲存到Redis中
  2. 基於Lua指令碼,判斷秒殺庫存、一人一單,決定使用者是否搶購成功
  3. 如果搶購成功,將優惠券id和使用者id封裝後存入阻塞佇列
  4. 開啟執行緒任務,不斷從阻塞佇列中獲取資訊,實現非同步下單功能

8.1.2 Lua指令碼完成秒殺資格判斷

resource資料夾下建立 Seckill.lua

-- 1. 引數列表
-- 1.1 優惠券id
local voucherId = ARGV[1]
-- 1.2 使用者id
local userId = ARGV[2]

-- 2. 資料key
-- 2.1 庫存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order:' .. voucherId

-- 3. 指令碼業務
-- 3.1 判斷庫存是否足夠  get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0)then
    -- 3.2 庫存不足,返回1
    return 1
end
-- 3.2 判斷使用者是否已經下單 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
    -- 3.3 重複下單,返回2
    return 2
end
-- 3.4 扣除庫存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下單(儲存使用者) sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

業務程式碼:

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Collections;


@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 載入Lua指令碼
     */
    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        // 指令碼檔案位置
        SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
        // 指令碼返回值
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    /**
     * 搶購優惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        // 1. 執行Lua指令碼
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString()
        );
        int r = result.intValue();
        // 2. 判斷結果是否為0
        // 2.1 不為0,代表沒有購買資格
        if (r != 0) {
            return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
        }
        // 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
        long orderId = redisIdWorker.nextId("order");
        // TODO: 2022/5/9 這裡需要儲存到阻塞佇列
        // 3. 返回訂單id
        return Result.ok(orderId);
    }
}

8.1.3 搶購成功,將優惠券id和使用者id存入阻塞佇列

 private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

/**
 * 搶購優惠券
 *
 * @param voucherId
 * @return
 */
@Override
public Result seckillVoucher(Long voucherId) {
    Long userId = UserHolder.getUser().getId();
    // 1. 執行Lua指令碼
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),
        userId.toString()
    );
    int r = result.intValue();
    // 2. 判斷結果是否為0
    // 2.1 不為0,代表沒有購買資格
    if (r != 0) {
        return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
    }
    // 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
    VoucherOrder voucherOrder = new VoucherOrder();
    // 2.3 訂單id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 2.4 使用者id
    voucherOrder.setUserId(userId);
    // 2.5 優惠券id
    voucherOrder.setVoucherId(voucherId);
    // 2.6 放入阻塞佇列
    orderTasks.add(voucherOrder);

    // 3. 返回訂單id
    return Result.ok(orderId);
}

8.1.4 開啟執行緒任務,不斷從阻塞佇列中獲取資訊,實現非同步下單功能

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;


@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 載入Lua指令碼
     */
    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        // 指令碼檔案位置
        SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
        // 指令碼返回值
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    // 1. 獲取佇列中的訂單資訊
                    VoucherOrder voucherOrder = orderTasks.take();
                    // 2. 建立訂單
                    createVoucherOrder(voucherOrder);
                } catch (InterruptedException e) {
                    log.error("處理訂單異常", e);
                }
            }
        }
    }

    /**
     * 建立訂單
     * @param voucherOrder
     * @return
     */
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 5. 一人只能搶一份
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();

        // 使用Redisson建立鎖物件
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 使用Redisson嘗試獲取鎖
        boolean isLock = redisLock.tryLock();
        // 判斷是否獲取鎖
        if (!isLock) {
            // 獲取鎖失敗,直接返回失敗/重試
            log.error("不允許重複搶購!!");
            return;
        }

        try {
            // 5.1 查詢訂單
            long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                                    .eq(VoucherOrder::getUserId, userId)
                                    .eq(VoucherOrder::getVoucherId, voucherId));
            // 5.2 判斷是否已經搶過
            if (count > 0) {
                // 使用者已經購買過
                log.error("使用者已經購買過!!");
                return;
            }
            // 6. 扣減庫存
            boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                                                           .eq(SeckillVoucher::getVoucherId, voucherId)
                                                           .gt(SeckillVoucher::getStock, 0)
                                                           .setSql("stock = stock - 1"));
            if (!success) {
                // 扣減庫存失敗
                log.error("扣減庫存失敗!!");
                return ;
            }
            // 存庫
            this.save(voucherOrder);
        } finally {
            // 使用Redisson釋放鎖
            redisLock.unlock();
        }
    }

    /**
     * 搶購優惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        // 1. 執行Lua指令碼
        Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(),
            userId.toString()
        );
        int r = result.intValue();
        // 2. 判斷結果是否為0
        // 2.1 不為0,代表沒有購買資格
        if (r != 0) {
            return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
        }
        // 2.2 為0,有購買資格,把下單資訊儲存到阻塞佇列中
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3 訂單id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4 使用者id
        voucherOrder.setUserId(userId);
        // 2.5 優惠券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6 放入阻塞佇列
        orderTasks.add(voucherOrder);

        // 3. 返回訂單id
        return Result.ok(orderId);
    }
}

8.1.5 總結

優化思路

  • 先利用Redis完成庫存餘量、一人一單判斷,完成搶單業務
  • 再講下單業務放入阻塞佇列,利用獨立執行緒非同步下單

基於阻塞佇列的非同步秒殺存在哪些問題?

  • 記憶體限制問題
  • 資料安全問題

9. Redis訊息佇列實現非同步秒殺

9.1 什麼是訊息佇列

訊息佇列(Message Queue),字面意思就是存放訊息的佇列。最簡單的訊息佇列模型包括3個角色

- 訊息佇列: 儲存和管理訊息,也被稱為訊息代理 (Message Broker)
- 生產者: 傳送訊息到訊息佇列
- 消費者: 從訊息佇列獲取訊息並處理訊息

9.2 基於List結構模擬訊息佇列

訊息佇列(Message Queue),字面意思就是存放訊息的佇列。而Redis的List資料結構是一個雙向連結串列,很容以模擬出佇列效果。

佇列:是入口和出口不在一遍,因此可以利用: LPUSH結合RPOP、或者RPUSH結合LPOP來實現

要注意的是,當佇列中沒有訊息時 RPOP或LPOP操作會返回null, 並不像JVM的阻塞佇列那樣會阻塞並等待訊息。

因此這裡應該使用 BRPOP 或者 BLPOP 來實現阻塞效果。

9.2.1基於list的訊息佇列的優缺點

9.2.1.1 優點
- 利用redis儲存,不受限與JVM記憶體的上限
- 基於Redis的持久化機制,資料安全性有保證
- 可以滿足訊息有序性
9.2.1.2 缺點
- 無法避免訊息丟失
- 只支援單消費者

9.4 基於PubSub的訊息佇列

PubSub(釋出訂閱)是Redis2.0版本引入的訊息傳遞模型。顧名思義,消費者可以訂閱一個或多個Channel,生產者向對應的Channel傳送訊息後,所有訂閱者都能收到相關訊息。

- SubScribe channel [channel] : 訂閱一個或多個頻道
- Publish channel msg : 向一個頻道傳送訊息
- PsubScribe pattern[pattern] : 訂閱與pattern格式匹配的所有頻道

9.4.1 基於PubSub的優缺點

9.4.1.1 優點
- 採用釋出訂閱模型,支援多生產、多消費
9.4.1.2 缺點
- 不支援資料持久化
- 無法避免訊息丟失
- 訊息堆積有上限,超出時資料丟失

9.6 基於Stream的訊息佇列=>(單消費模式)

Stream是Redis5.0引入的一種新資料型別,可以實現一個功能非常完善的訊息佇列

傳送訊息的命令 xadd:

讀取訊息的方式之一 xread:

例如: 使用xread讀取第一個訊息

9.6.1 Xread阻塞方式,讀取訊息

在業務開發中,我們可以迴圈的呼叫Xread阻塞方式來查詢最新訊息,從而實現持續監聽佇列的效果,虛擬碼如下:

9.6.2 Stream型別訊息佇列的Xread命令特點

- 訊息可回溯
- 一條訊息可以被多個消費者讀取
- 可以阻塞讀取
- 有訊息漏讀的風險

9.7 基於Stream的訊息佇列=>(消費組模式)

消費組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個佇列。具備下列特點:

建立消費者組:

其他常見命令

從消費者組讀取訊息

9.7.1 消費者監聽訊息的基本思路

9.7.2 XREADGROUP命令特點

- 訊息可回溯
- 可以多消費者爭搶訊息,加快消費速度
- 可以阻塞讀取
- 沒有訊息漏讀的風險
- 有訊息確認機制,保證訊息至少被消費一次

9.8 Redis訊息佇列

9.9 基於Redis的Stream結構作為訊息佇列,實現非同步秒殺下單

需求:

  1. 建立一個 Stream型別的訊息佇列命名為stream.orders
  2. 修改之前的秒殺下單Lua指令碼,在認定有搶購資格後,直接向stream.orders中新增訊息,內容包含voucherIduserIdorderId
  3. 專案啟動時,開啟一個執行緒任務,嘗試獲取stream.orders中的訊息,完成下單

9.9.1 建立stream.orders

9.9.2 修改lua指令碼和搶購優惠券的邏輯

-- 1. 引數列表
-- 1.1 優惠券id
local voucherId = ARGV[1]
-- 1.2 使用者id
local userId = ARGV[2]
-- 1.3 訂單id
local orderId = ARGV[3]

-- 2. 資料key
-- 2.1 庫存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 訂單key
local orderKey = 'seckill:order:' .. voucherId

-- 3. 指令碼業務
-- 3.1 判斷庫存是否足夠  get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0)then
    -- 3.2 庫存不足,返回1
    return 1
end
-- 3.2 判斷使用者是否已經下單 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1)then
    -- 3.3 重複下單,返回2
    return 2
end
-- 3.4 扣除庫存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5 下單(儲存使用者) sadd orderKey userId
redis.call('sadd', orderKey, userId)
-- 3.6 傳送訊息到佇列中, XADD stream.orders * k1 v1 k2 v2 ...
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
/**
 * 搶購優惠券
 *
 * @param voucherId
 * @return
 */
@Override
public Result seckillVoucher(Long voucherId) {
    Long userId = UserHolder.getUser().getId();
    long orderId = redisIdWorker.nextId("order");
    // 1. 執行Lua指令碼
    Long result = stringRedisTemplate.execute(
        SECKILL_SCRIPT,
        Collections.emptyList(),
        voucherId.toString(),
        userId.toString(),
        String.valueOf(orderId)
    );
    int r = result.intValue();
    // 2. 判斷結果是否為0
    // 2.1 不為0,代表沒有購買資格
    if (r != 0) {
        return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
    }
    // 3. 返回訂單id
    return Result.ok(orderId);
}

9.9.3 獲取stream.orders中的訊息,完成下單

package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;


@Slf4j
@Service
@Transactional(rollbackFor = Exception.class)
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 載入Lua指令碼
     */
    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        // 指令碼檔案位置
        SECKILL_SCRIPT.setLocation(new ClassPathResource("Seckill.lua"));
        // 指令碼返回值
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            String queueName = "stream.orders";
            while (true) {
                try {
                    // 1. 獲取訊息佇列中的訂單資訊 XREADGROUP GROUP G1 C1 COUNT 1 BLOCK 2000 STREAMS S1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    // 2. 判斷訂單資訊是否為空
                    if (list == null || list.isEmpty()) {
                        // 2.1 如果為null,說明沒有訊息,繼續下一次迴圈
                        continue;
                    }
                    // 2.2 解析訊息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3. 建立訂單
                    createVoucherOrder(voucherOrder);
                    // 4. 確認訊息 XACK s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理訂單異常", e);
                    hanldependdingList();
                }
            }
        }

        private void hanldependdingList() {
            String queueName = "stream.orders";
            while (true) {
                try {
                    // 1. 獲取pending-list中的訂單資訊 XREADGROUP GROUP G1 C1 COUNT 1 BLOCK 2000 STREAMS S1 >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    // 2. 判斷訂單資訊是否為空
                    if (list == null || list.isEmpty()) {
                        // 2.1 如果為null,說明pendding-list沒有異常訊息,結束迴圈
                        break;
                    }
                    // 2.2 解析訊息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    // 3. 建立訂單
                    createVoucherOrder(voucherOrder);
                    // 4. 確認訊息 XACK s1 g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("處理pendding訂單異常", e);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * 建立訂單
     * @param voucherOrder
     * @return
     */
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 5. 一人只能搶一份
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();

        // 使用Redisson建立鎖物件
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 使用Redisson嘗試獲取鎖
        boolean isLock = redisLock.tryLock();
        // 判斷是否獲取鎖
        if (!isLock) {
            // 獲取鎖失敗,直接返回失敗/重試
            log.error("不允許重複搶購!!");
            return;
        }

        try {
            // 5.1 查詢訂單
            long count = this.count(new LambdaQueryWrapper<VoucherOrder>()
                    .eq(VoucherOrder::getUserId, userId)
                    .eq(VoucherOrder::getVoucherId, voucherId));
            // 5.2 判斷是否已經搶過
            if (count > 0) {
                // 使用者已經購買過
                log.error("使用者已經購買過!!");
                return;
            }
            // 6. 扣減庫存
            boolean success = seckillVoucherService.update(new LambdaUpdateWrapper<SeckillVoucher>()
                    .eq(SeckillVoucher::getVoucherId, voucherId)
                    .gt(SeckillVoucher::getStock, 0)
                    .setSql("stock = stock - 1"));
            if (!success) {
                // 扣減庫存失敗
                log.error("扣減庫存失敗!!");
                return ;
            }
            // 存庫
            this.save(voucherOrder);
        } finally {
            // 使用Redisson釋放鎖
            redisLock.unlock();
        }
    }

    /**
     * 搶購優惠券
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        // 1. 執行Lua指令碼
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString(),
                String.valueOf(orderId)
        );
        int r = result.intValue();
        // 2. 判斷結果是否為0
        // 2.1 不為0,代表沒有購買資格
        if (r != 0) {
            return Result.fail(r == 1 ? "庫存不足" : "不能重複下單");
        }
        // 3. 返回訂單id
        return Result.ok(orderId);
    }


}