1. 程式人生 > 其它 >使用Redis分散式鎖處理併發,解決超賣問題

使用Redis分散式鎖處理併發,解決超賣問題

一、使用Apache ab模擬併發壓測

1、壓測工具介紹

$ ab -n 100 -c 100 http://www.baidu.com/

-n表示發出100個請求,-c模擬100個併發,相當是100個人同時訪問。

還可以這樣寫:

$ ab -t 60 -c 100 http://www.baidu.com/

-t表示60秒,-c是100個併發,會在連續60秒內不停的發出請求。

使用ab工具模擬多執行緒併發請求,對發出負載的機器要求比較低,既不會佔用很多cpu,也不會佔用很多的記憶體,因此也是很多DDoS攻擊的必備良藥,不過要慎用,別耗光自己機器的資源。通常來說1000個請求,100個併發算是比較正常的模擬。

至於工具的使用,具體見:Apache ab 測試工具使用(一)

下載後,進入support資料夾,執行命令。

2、併發測試

我建立了兩張表,一個商品表,一個訂單記錄表;
然後寫了兩個介面,一個是查詢商品資訊,一個是下單秒殺。

查詢訂單:

秒殺下單:

當我併發測試時:

$ ab -n 500 -c 100 http://localhost:8080/seckill/1/

這TM肯定不行啊,這就超賣了,明明沒這麼多商品,結果還賣出去了。。。

二、synchronized處理併發

首先,synchronized的確是一個解決辦法,而且也很簡單,在方法前面加一個synchronized關鍵字。

但是通過壓測,發現請求變的很慢,因為:
synchronized就用一個鎖把這個方法鎖住了,每次訪問這個方法,只會有一個執行緒,所以這就是它導致慢的原因。通過這種方式,保證這個方法中的程式碼都是單執行緒來處理,不會出什麼問題。

同時,使用synchronized還是存在一些問題的,首先,它無法做到細粒度的控制,比如同一時間有秒殺A商品和B商品的請求,都進入到了這個方法,雖然秒殺A商品的人很多,但是秒殺B商品的人很少,但是即使是買B商品,進入到了這個方法,也會一樣的慢。

最重要的是,它只適合單點的情況。如果以後程式水平擴充套件了,弄了個叢集,很顯然,負載均衡之後,不同的使用者看到的結果一定是五花八門的。

所以,還是使用更好的辦法,使用redis分散式鎖。

三、redis分散式鎖

1、兩個redis的命令

setnx key value簡單來說,setnx

就是,如果沒有這個key,那麼就set一個key-value, 但是如果這個key已經存在,那麼將不會再次設定,get出來的value還是最開始set進去的那個value.
網站中還專門講到可以使用!SETNX加鎖,如果獲得鎖,返回1,如果返回0,那麼該鍵已經被其他的客戶端鎖定。
並且也提到了如何處理死鎖。

getset key value這個就更簡單了,先通過key獲取value,然後再將新的value set進去。

2、redis分散式鎖的實現

我們希望的,無非就是這一段程式碼,能夠單執行緒的去訪問,因此在這段程式碼之前給他加鎖,相應的,這段程式碼後面要給它解鎖:

2.1 引入redis依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.2 配置redis

spring:
  redis:
    host: localhost
    port: 6379

2.3 編寫加鎖和解鎖的方法

package com.vito.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;


@Component
public class RedisLock {

    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 加鎖
     * @param key   商品id
     * @param value 當前時間+超時時間
     * @return
     */
    public boolean lock(String key, String value) {
        if (redisTemplate.opsForValue().setIfAbsent(key, value)) {     //這個其實就是setnx命令,只不過在java這邊稍有變化,返回的是boolea
            return true;
        }

        //避免死鎖,且只讓一個執行緒拿到鎖
        String currentValue = redisTemplate.opsForValue().get(key);
        //如果鎖過期了
        if (!StringUtils.isEmpty(currentValue) && Long.parseLong(currentValue) < System.currentTimeMillis()) {
            //獲取上一個鎖的時間
            String oldValues = redisTemplate.opsForValue().getAndSet(key, value);

            /*
               只會讓一個執行緒拿到鎖
               如果舊的value和currentValue相等,只會有一個執行緒達成條件,因為第二個執行緒拿到的oldValue已經和currentValue不一樣了
             */
            if (!StringUtils.isEmpty(oldValues) && oldValues.equals(currentValue)) {
                return true;
            }
        }
        return false;
    }


    /**
     * 解鎖
     * @param key
     * @param value
     */
    public void unlock(String key, String value) {
        try {
            String currentValue = redisTemplate.opsForValue().get(key);
            if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                redisTemplate.opsForValue().getOperations().delete(key);
            }
        } catch (Exception e) {
            logger.error("『redis分散式鎖』解鎖異常,{}", e);
        }
    }
}

為什麼要有避免死鎖的一步呢?
假設沒有『避免死鎖』這一步,結果在執行到下單程式碼的時候出了問題,畢竟操作資料庫、網路、io的時候拋了個異常,這個異常是偶然丟擲來的,就那麼偶爾一次,那麼會導致解鎖步驟不去執行,這時候就沒有解鎖,後面的請求進來自然也或得不到鎖,這就被稱之為死鎖。
而這裡的『避免死鎖』,就是給鎖加了一個過期時間,如果鎖超時了,就返回true,解開之前的那個死鎖。

2.4 下單程式碼中引入加鎖和解鎖,確保只有一個執行緒操作

@Autowired
private RedisLock redisLock;

@Override
@Transactional
public String seckill(Integer id)throws RuntimeException {
    //加鎖
    long time = System.currentTimeMillis() + 1000*10;  //超時時間:10秒,最好設為常量

    boolean isLock = redisLock.lock(String.valueOf(id), String.valueOf(time));
    if(!isLock){
        throw new RuntimeException("人太多了,換個姿勢再試試~");
    }

    //查庫存
    Product product = productMapper.findById(id);
    if(product.getStock()==0) throw new RuntimeException("已經賣光");
    //寫入訂單表
    Order order=new Order();
    order.setProductId(product.getId());
    order.setProductName(product.getName());
    orderMapper.add(order);
    //減庫存
    product.setPrice(null);
    product.setName(null);
    product.setStock(product.getStock()-1);
    productMapper.update(product);

    //解鎖
    redisLock.unlock(String.valueOf(id),String.valueOf(time));

    return findProductInfo(id);
}

這樣再來跑幾次壓測,就不會超賣了: