1. 程式人生 > 其它 >Redis 實現庫存扣減操作

Redis 實現庫存扣減操作

原文連結:https://mp.weixin.qq.com/s/uyX9eRnd2xPOEr6lwax8Yw

在日常開發中有很多地方都有類似扣減庫存的操作,比如電商系統中的商品庫存,抽獎系統中的獎品庫存等。

解決方案

  • 使用mysql資料庫,使用一個欄位來儲存庫存,每次扣減庫存去更新這個欄位。
  • 還是使用資料庫,但是將庫存分層多份存到多條記錄裡面,扣減庫存的時候路由一下,這樣子增大了併發量,但是還是避免不了大量的去訪問資料庫來更新庫存。
  • 將庫存放到redis使用redis的incrby特性來扣減庫存。

分析

在上面的第一種和第二種方式都是基於資料來扣減庫存。

基於資料庫單庫存

第一種方式在所有請求都會在這裡等待鎖,獲取鎖有去扣減庫存。

在併發量不高的情況下可以使用,但是一旦併發量大了就會有大量請求阻塞在這裡,導致請求超時,進而整個系統雪崩;而且會頻繁的去訪問資料庫,大量佔用資料庫資源,所以在併發高的情況下這種方式不適用。

基於資料庫多庫存

第二種方式其實是第一種方式的優化版本,在一定程度上提高了併發量,但是在還是會大量的對資料庫做更新操作大量佔用資料庫資源。

基於資料庫來實現扣減庫存還存在的一些問題:

用資料庫扣減庫存的方式,扣減庫存的操作必須在一條語句中執行,不能先selec在update,這樣在併發下會出現超扣的情況。如:

update number set x=x-1 where x > 0

理論上即使是這樣由於MySQL事務的特性,這種方法只能降低超賣的數量,但是不可能完全避免超扣。

因為資料庫預設隔離級別是repeatable read,假如庫存是5,有A、B兩個請求分別建立了事務並且都沒有提交,當A事務提交了,改了庫存為4,但是因為是事務隔離級別是可重複讀的,所有B看不到A事務改的庫存。到時B看到的庫存還是5,所以B修改庫存為4,這樣就出現了超扣問題。

所以我們扣庫存的時候需要將事務隔離級別設定成read commit才可以。(我自己測試沒有出現這種情況)

  • MySQL自身對於高併發的處理效能就會出現問題,一般來說,MySQL的處理效能會隨著併發thread上升而上升,但是到了一定的併發度之後會出現明顯的拐點,之後一路下降,最終甚至會比單thread的效能還要差。

  • 當減庫存和高併發碰到一起的時候,由於操作的庫存數目在同一行,就會出現爭搶InnoDB行鎖的問題,導致出現互相等待甚至死鎖,從而大大降低MySQL的處理效能,最終導致前端頁面出現超時異常。

基於redis

針對上述問題的問題我們就有了第三種方案,將庫存放到快取,利用redis的incrby特性來扣減庫存,解決了超扣和效能問題。但是一旦快取丟失需要考慮恢復方案。

比如抽獎系統扣獎品庫存的時候,初始庫存=總的庫存數-已經發放的獎勵數,但是如果是非同步發獎,需要等到MQ訊息消費完了才能重啟redis初始化庫存,否則也存在庫存不一致的問題。

Redis Incrby 命令

Redis Incrby 命令將 key 中儲存的數字加上指定的增量值。

  • 如果 key 不存在,那麼 key 的值會先被初始化為 0 ,然後再執行 INCRBY 命令。
  • 如果值包含錯誤的型別,或字串型別的值不能表示為數字,那麼返回一個錯誤。

本操作的值限制在 64 位(bit)有符號數字表示之內。

語法

redis Incrby 命令基本語法如下:

redis 127.0.0.1:6379> INCRBY KEY_NAME INCR_AMOUNT
可用版本

>= 1.0.0

返回值

加上指定的增量值之後, key 的值。

基於redis實現扣減庫存的具體實現

  • 我們使用redis的lua指令碼來實現扣減庫存
  • 由於是分散式環境下所以還需要一個分散式鎖來控制只能有一個服務去初始化庫存
  • 需要提供一個回撥函式,在初始化庫存的時候去呼叫這個函式獲取初始化庫存
  • 庫存扣減完之後可以進行一個非同步的更改資料庫資料,保證一致性

具體關於lua指令碼的內容使用請移步至 redis命令參考–Script指令碼 :

http://doc.redisfans.com/script/index.html

lua指令碼注意點:

Lua指令碼,是一種輕量級的指令碼語言。設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴充套件和定製功能。Lua指令碼的應用也很多,比如Nginx+Lua實現的OpenResty,Redis+Lua配合使用(Redisson中大量使用了Lua指令碼)。

Lua指令碼具有以下好處:

1、減少網路開銷:Lua指令碼在執行的時候,是先發送到Redis伺服器的,然後在伺服器上執行指令碼。多個命令和業務邏輯都封裝到腳本里,一次性提交到伺服器。

2、原子性操作:我們都知道redis在執行命令時是單執行緒的,但是每個命令之間就存在併發的情況,就存在先查詢再操作時,兩個命令沒辦法保證執行緒安全。但使用Lua指令碼時,redis把這個指令碼操作當成是一個命令,那麼這個指令碼中的多條操作也就保證了原子性。(注意:只保證原子性,不是事務)

雖然Lua指令碼有這麼多優點,但是也不能亂用,使用的時候要注意:

1、Lua指令碼可以在redis單機模式、主從模式、Sentinel叢集模式下正常使用,但是無法在分片叢集模式下使用。(指令碼操作的key可能不在同一個分片)。(其實叢集模式不支援問題也是可以解決的,在使用spring的RedisTemplate執行lua指令碼時,報錯EvalSha is not supported in cluster environment,不支援cluster。但是redis是支援lua指令碼的,只要拿到原redis的connection物件,通過connection去執行即可,在後面會說下這個問題)

2、Lua指令碼中儘量避免使用迴圈操作(可能引發死迴圈問題),儘量避免長時間執行。

3、redis在執行lua指令碼時,預設最長執行時間時5秒,當指令碼執行時間超過這一限制後,Redis將開始接受其他命令但不會執行(以確保指令碼的原子性,因為此時指令碼並沒有被終止),而是會返回“BUSY”錯誤。

初始化庫存回撥函式(IStockCallback )
/**
 * 獲取庫存回撥
 * @author yuhao.wang
 */
public interface IStockCallback {
 
    /**
     * 獲取庫存
     * @return
     */
    int getStock();
}

扣減庫存服務(StockService)

ackage com.xiaolyuh.service;
 
import com.xiaolyuh.lock.RedisLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
 
/**
 * 扣庫存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
 
    /**
     * 庫存還未初始化
     */
    public static final long UNINITIALIZED_STOCK = -3L;
 
    /**
     * Redis 客戶端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 執行扣庫存的指令碼
     */
    public static final String STOCK_LUA;
 
    static {
        /**
         *
         * @desc 扣減庫存Lua指令碼
         * 庫存(stock)-1:表示不限庫存
         * 庫存(stock)0:表示沒有庫存
         * 庫存(stock)大於0:表示剩餘庫存
         *
         * @params 庫存key
         * @return
         *      -3:庫存未初始化
         *   -2:庫存不足
         *   -1:不限庫存
         *   大於等於0:剩餘庫存(扣減之後剩餘的庫存),直接返回-1
         */
        StringBuilder sb = new StringBuilder();
        // exists 判斷是否存在KEY,如果存在返回1,不存在返回0
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        // get 獲取KEY的快取值,tonumber 將redis資料轉成 lua 的整形
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        // 如果拿到的快取數等於 -1,代表改商品庫存是無限的,直接返回1
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        // incrby 特性進行庫存的扣減
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0-num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
 
    /**
     * @param key           庫存key
     * @param expire        庫存有效時間,單位秒
     * @param num           扣減數量
     * @param stockCallback 初始化庫存回撥函式
     * @return -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存之後的剩餘庫存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化庫存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 獲取鎖
                if (redisLock.tryLock()) {
                    // 雙重驗證,避免併發時重複回源到資料庫
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 獲取初始化庫存
                        final int initStock = stockCallback.getStock();
                        // 將庫存設定到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 調一次扣庫存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
 
        }
        return stock;
    }
 
    /**
     * 扣庫存
     *
     * @param key 庫存key
     * @param num 扣減庫存數量
     * @return 扣減之後剩餘的庫存【-3:庫存未初始化; -2:庫存不足; -1:不限庫存; 大於等於0:扣減庫存之後的剩餘庫存】
     */
    private Long stock(String key, int num) {
        // 腳本里的KEYS引數
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 腳本里的ARGV引數
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
 
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 叢集模式和單機模式雖然執行指令碼的方法一樣,但是沒有共同的介面,所以只能分開執行
                // 叢集模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
 
                // 單機模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
 
 
 
/**
     * 加庫存(還原庫存)
     *
     * @param key    庫存key
     * @param num    庫存數量
     * @return
     */
    public long addStock(String key, int num) {
 
        return addStock(key, null, num);
    }
 
    /**
     * 加庫存
     *
     * @param key    庫存key
     * @param expire 過期時間(秒)
     * @param num    庫存數量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判斷key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
 
        Assert.notNull(expire,"初始化庫存失敗,庫存過期時間不能為null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 獲取到鎖後再次判斷一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化庫存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
 
        return num;
    }
 
    /**
     * 獲取庫存
     *
     * @param key 庫存key
     * @return -1:不限庫存; 大於等於0:剩餘庫存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
 
}
呼叫
@RestController
public class StockController {
 
    @Autowired
    private StockService stockService;
 
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 庫存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
 
    /**
     * 獲取初始的庫存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 這裡做一些初始化庫存的操作
        return 1000;
    }
 
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 庫存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.getStock(redisKey);
    }
 
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 庫存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.addStock(redisKey, 2);
    }
}

思路理解

庫存新增思路

庫存新增的操作一般不存在高併發的情況,因為不可能某一種商品一直在新增庫存,這屬於管理員後臺管理的一種操作。

這裡新增庫存採用了redis的

1.庫存發生新增操作,呼叫層一般傳過來商品的id標識和新增量,呼叫庫存新增服務

2.庫存新增服務

/**
 * 加庫存(還原庫存)
 * @param key    庫存key
 * @param num    庫存數量
 * @return 
 */
public long addStock(String key, int num) {

    return addStock(key, null, num);
}

庫存新增服務主要是使用了redis的increment自增操作。

3.辛苦新增服務

boolean hasKey = redisTemplate.hasKey(key);
// 判斷key是否存在,存在就直接更新
if (hasKey) {
    return redisTemplate.opsForValue().increment(key, num);
}

第一種情況是先判斷redis中是否有這個商品庫存的快取,如果存在該商品庫存,就直接進行增加操作;

Assert.notNull(expire,"初始化庫存失敗,庫存過期時間不能為null");
RedisLock redisLock = new RedisLock(redisTemplate, key);
try {
    if (redisLock.tryLock()) {
        // 獲取到鎖後再次判斷一下是否有key
        hasKey = redisTemplate.hasKey(key);
        if (!hasKey) {
            // 初始化庫存
            redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
        }
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
} finally {
    redisLock.unlock();
}

return num;

然後是第二種情況了,就是redis中沒有庫存快取了。所以就需要去初始化庫存。因為初始化庫存有一些非原子的操作,在分散式環境下不安全,所以這裡先通過這個商品id獲取分散式鎖,拿到鎖之後,再去判斷一下redis中是否有這個快取,確認沒有,則可以進行初始化操作,然會返回數量,初始化操作可以從資料庫查出真實庫存的值,然後更新到快取。

我這裡的案列是直接把第一次傳進來的庫存數量進行初始化。

可能設計的問題

在對某key進行increment()方法時,可能會報錯:

redis ERR value is not an integer or out of range 

這裡庫存新增我們使用的是RedisTemplateincrement的自增方法。

Spring對Redis序列化的策略有兩種,分別是StringRedisTemplateRedisTemplate,其中StringRedisTemplate用於操作字串,RedisTemplate使用的是JDK預設的二進位制序列化。

大家都知道redis序列化是將key,value值先轉換為流的形式,再儲存到redis中。

RedisTemplate是使用的JdkSerializationRedisSerializer序列化,序列化後的值包含了物件資訊,版本號,類資訊等,是一串字串,所以無法進行數值自增操作。

StringRedisTemplate序列化策略是字串的值直接轉為位元組陣列,所以儲存到redis中是數值,所以可以進行自增操作。

所以問題出在這裡,我們需要自定義序列化策略,在application啟動類中新增如下:

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
   StringRedisTemplate template = new StringRedisTemplate(factory);
   //定義key序列化方式
   //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long型別會出現異常資訊;需要我們上面的自定義key生成策略,一般沒必要
   //定義value的序列化方式
   Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
   ObjectMapper om = new ObjectMapper();
   om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
   om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
   jackson2JsonRedisSerializer.setObjectMapper(om);
 
   // template.setKeySerializer(redisSerializer);
   template.setValueSerializer(jackson2JsonRedisSerializer);
   template.setHashValueSerializer(jackson2JsonRedisSerializer);
   template.afterPropertiesSet();
   return template;

庫存扣減思路

利用redis的incrby特性來扣減庫存,解決了超扣和效能問題。但是一旦快取丟失需要考慮恢復方案。

庫存發生扣減操作,呼叫層一般傳過來商品的id標識和扣減量,呼叫庫存扣減服務

long stock = stock(key, num);

第一步是進行扣減操作,在正常情況下,如果快取中存在庫存資料,則會進行正常的扣減操作,並且返回結果。

// 初始化庫存
if (stock == UNINITIALIZED_STOCK) {
    RedisLock redisLock = new RedisLock(redisTemplate, key);
    try {
        // 獲取鎖
        if (redisLock.tryLock()) {
            // 雙重驗證,避免併發時重複回源到資料庫
            stock = stock(key, num);
            if (stock == UNINITIALIZED_STOCK) {
                // 獲取初始化庫存
                final int initStock = stockCallback.getStock();
                // 將庫存設定到redis
                redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                // 調一次扣庫存的操作
                stock = stock(key, num);
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    } finally {
        redisLock.unlock();
    }

}

第二種是快取中還沒有資料的情況,則需要進行初始化操作。初始化庫存存在非原子操作,所以需要使用分散式鎖來實現。拿到鎖之後,在進行一次庫存扣減操作,看返回的結果還是不是沒有快取,這是進行一次雙重驗證,避免併發時重複回源到資料庫。第二次驗證的結果還是沒有快取的話,則需要進行一次初始化快取操作。初始化操作可以從資料庫查出真實庫存的值,然後更新到快取。然後再進行一次扣減操作。

可能存在的問題:

RedisTemplate執行lua指令碼,叢集模式下報錯解決

在使用spring的RedisTemplate執行lua指令碼時,報錯EvalSha is not supported in cluster environment,不支援cluster。

但是redis是支援lua指令碼的,只要拿到原redis的connection物件,通過connection去執行即可:

//spring自帶的執行指令碼方法中,叢集模式直接丟擲不支援執行指令碼異常,此處拿到原redis的connection執行指令碼
String result = (String)redisTemplate.execute(new RedisCallback<String>() {
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection();
        // 叢集模式和單點模式雖然執行指令碼的方法一樣,但是沒有共同的介面,所以只能分開執行
        // 叢集
        if (nativeConnection instanceof JedisCluster) {
            return (String) ((JedisCluster) nativeConnection).eval(LUA, keys, args);
        }

        // 單點
        else if (nativeConnection instanceof Jedis) {
            return (String) ((Jedis) nativeConnection).eval(LUA, keys, args);
        }
        return null;
    }
});