1. 程式人生 > 實用技巧 >redis分散式鎖解決超賣問題

redis分散式鎖解決超賣問題

redis事務

  redis事務介紹:

    1. redis事務可以一次執行多個命令,本質是一組命令的集合。

    2.一個事務中的所有命令都會序列化,按順序序列化的執行而不會被其他命令插入

    作用:一個佇列中,一次性、順序性、排他性的執行一系列命令

  multi指令的使用

      1. 下面指令演示了一個完整的事物過程,所有指令在exec前不執行,而是快取在伺服器的一個事物佇列中

      2. 伺服器一旦收到exec指令才開始執行事物佇列,執行完畢後一次性返回所有結果

      3. 因為redis是單執行緒的,所以不必擔心自己在執行佇列是被打斷,可以保證這樣的“原子性”

      注:redis事物在遇到指令失敗後,後面的指令會繼續執行

# Multi 命令用於標記一個事務塊的開始事務塊內的多條命令會按照先後順序被放進一個隊列當中,最後由 EXEC 命令原子性( atomic )地執行
> multi(開始一個redis事物)
incr books
incr books
> exec (執行事物)
> discard (丟棄事物)
[root@redis ~]# redis-cli
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set test 123
QUEUED
127.0.0.1:6379> exec
1) OK 127.0.0.1:6379> get test "123" 127.0.0.1:6379> multi OK 127.0.0.1:6379> set test 456 QUEUED 127.0.0.1:6379> discard OK 127.0.0.1:6379> get test "123" 127.0.0.1:6379>
redis客戶端測試事務指令
def multi_test():
    r = redis.Redis(host='127.0.0.1')
    pipe = r.pipeline()
    pipe.multi()             
#開啟事務 pipe.set('key2', 400) #儲存子命令 pipe.execute() #執行事務 print("第一次事務提交後的結果"+r.get('key2').decode("utf-8")) pipe.multi() # 開啟事務 pipe.set('key2', 100) # 儲存子命令 print("第二次未提交事務的結果"+r.get("key2").decode("utf-8")) #第一次事務提交後的結果400 #第二次未提交事務的結果400
python測試事務指令

注:mysql的rollback與redis的discard的區別

   1.mysql回滾為sql全部成功才執行,一條sql失敗則全部失敗,執行rollback後所有語句造成的影響消失

   2.redis的discard只是結束本次事務,正確命令造成的影響仍然還在.

     1)redis如果在一個事務中的命令出現錯誤,那麼所有的命令都不會執行
     2)redis如果在一個事務中出現執行錯誤,那麼正確的命令會被執行

  watch 指令作用

      實質:WATCH 只會在資料被其他客戶端搶先修改了的情況下通知執行命令的這個客戶端(通過 WatchError 異常)但不會阻止其他客戶端對資料的修改

      1.watch其實就是redis提供的一種樂觀鎖,可以解決併發修改問題

      2. watch會在事物開始前盯住一個或多個關鍵變數,當伺服器收到exec指令要順序執行快取中的事物佇列時,redis會檢查關鍵變數自watch後是否被修改

      3.WATCH 只會在資料被其他客戶端搶先修改了的情況下通知執行命令的這個客戶端(通過 WatchError 異常)但不會阻止其他客戶端對資料的修改

watch+multi實現樂觀鎖

setnx指令(redis的分散式鎖)

  1、分散式鎖

    1. 分散式鎖本質是佔一個坑,當別的程序也要來佔坑時發現已經被佔,就會放棄或者稍後重試

    2. 佔坑一般使用 setnx(set if not exists)指令,只允許一個客戶端佔坑

    3. 先來先佔,用完了在呼叫del指令釋放坑

> setnx lock:codehole true
.... do something critical ....
> del lock:codehole

    4. 但是這樣有一個問題,如果邏輯執行到中間出現異常,可能導致del指令沒有被呼叫,這樣就會陷入死鎖,鎖永遠無法釋放

    5. 為了解決死鎖問題,我們拿到鎖時可以加上一個expire過期時間,這樣即使出現異常,當到達過期時間也會自動釋放鎖

> setnx lock:codehole true
> expire lock:codehole 5
.... do something critical ....
> del lock:codehole

    6. 這樣又有一個問題,setnx和expire是兩條指令而不是原子指令,如果兩條指令之間程序掛掉依然會出現死鎖

    7. 為了治理上面亂象,在redis 2.8中加入了set指令的擴充套件引數,使setnx和expire指令可以一起執行

> set lock:codehole true ex 5 nx
''' do something '''
> del lock:codehole

redis解決超賣問題

  1、使用reids的 watch + multi 指令實現

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis
def sale(rs):
    while True:
        with rs.pipeline() as p:
            try:
                p.watch('apple')                   # 監聽key值為apple的資料數量改變
                count = int(rs.get('apple'))
                print('拿取到了蘋果的數量: %d' % count)
                p.multi()                          # 事務開始
                if count> 0 :                      # 如果此時還有庫存
                    p.set('apple', count - 1)
                    p.execute()                    # 執行事務
                p.unwatch()
                break                              # 當庫存成功減一或沒有庫存時跳出執行迴圈
            except Exception as e:                 # 當出現watch監聽值出現修改時,WatchError異常丟擲
                print('[Error]: %s' % e)
                continue                           # 繼續嘗試執行

rs = redis.Redis(host='127.0.0.1', port=6379)      # 連線redis
rs.set('apple',1000)                               # # 首先在redis中設定某商品apple 對應數量value值為1000
sale(rs)
Views.py

  1)原理

     1.當用戶購買時,通過 WATCH 監聽使用者庫存,如果庫存在watch監聽後發生改變,就會捕獲異常而放棄對庫存減一操作

     2.如果庫存沒有監聽到變化並且數量大於1,則庫存數量減一,並執行任務

  2)弊端

     1.Redis 在嘗試完成一個事務的時候,可能會因為事務的失敗而重複嘗試重新執行

     2.保證商品的庫存量正確是一件很重要的事情,但是單純的使用 WATCH 這樣的機制對伺服器壓力過大

  2、使用reids的 watch + multi +setnx指令實現

    1)為什麼要自己構建鎖

        1.雖然有類似的 SETNX 命令可以實現 Redis 中的鎖的功能,但他鎖提供的機制並不完整

        2. 並且setnx也不具備分散式鎖的一些高階特性,還是得通過我們手動構建

    2)建立一個redis鎖

        1.在 Redis 中,可以通過使用 SETNX 命令來構建鎖:rs.setnx(lock_name, uuid值)

        2.而鎖要做的事情就是將一個隨機生成的 128 位 UUID 設定位鍵的值,防止該鎖被其他程序獲取

    3)釋放鎖

        1.鎖的刪除操作很簡單,只需要將對應鎖的 key 值獲取到的 uuid 結果進行判斷驗證

        2.符合條件(判斷uuid值)通過 delete 在 redis 中刪除即可,rs.delete(lockname)

        3.此外當其他使用者持有同名鎖時,由於 uuid 的不同,經過驗證後不會錯誤釋放掉別人的鎖

    4)解決鎖無法釋放問題

        1.在之前的鎖中,還出現這樣的問題,比如某個程序持有鎖之後突然程式崩潰,那麼會導致鎖無法釋放

        2.而其他程序無法持有鎖繼續工作,為了解決這樣的問題,可以在獲取鎖的時候加上鎖的超時功能

import redis
import uuid
import time

# 1.初始化連線函式
def get_conn(host,port=6379):
    rs = redis.Redis(host=host, port=port)
    return rs

# 2. 構建redis鎖
def acquire_lock(rs, lock_name, expire_time=10):
    '''
    rs: 連線物件
    lock_name: 鎖標識
    acquire_time: 過期超時時間
    return -> False 獲鎖失敗 or True 獲鎖成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 當獲取鎖的行為超過有效時間,則退出迴圈,本次取鎖失敗,返回False
        if rs.setnx(lock_name, identifier): # 嘗試取得鎖
            return identifier
        # time.sleep(.001)
        return False

# 3. 釋放鎖
def release_lock(rs, lockname, identifier):
    '''
    rs: 連線物件
    lockname: 鎖標識
    identifier: 鎖的value值,用來校驗
    '''


    if rs.get(lockname).decode() == identifier:  # 防止其他程序同名鎖被誤刪
        rs.delete(lockname)
        return True            # 刪除鎖
    else:
        return False           # 刪除失敗

#有過期時間的鎖
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
    '''
    rs: 連線物件
    lock_name: 鎖標識
    acquire_time: 過期超時時間
    locked_time: 鎖的有效時間
    return -> False 獲鎖失敗 or True 獲鎖成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 當獲取鎖的行為超過有效時間,則退出迴圈,本次取鎖失敗,返回False
        if rs.setnx(lock_name, identifier): # 嘗試取得鎖
            # print('鎖已設定: %s' % identifier)
            rs.expire(lock_name, locked_time)
            return identifier
        time.sleep(.001)
    return False


'''在業務函式中使用上面的鎖'''
def sale(rs):
    start = time.time()            # 程式啟動時間
    with rs.pipeline() as p:
        '''
        通過管道方式進行連線
        多條命令執行結束,一次性獲取結果
        '''

        while 1:
            lock = acquire_lock(rs, 'lock')
            if not lock: # 持鎖失敗
                continue

            #開始監測"lock"
            p.watch("lock")
            try:
                #開啟事務
                p.multi()
                count = int(rs.get('apple')) # 取量
                p.set('apple', count-1)      # 減量
                # time.sleep(5)
                #提交事務
                p.execute()
                print('當前庫存量: %s' % count)
                #成功則跳出迴圈
                break
            except:
                #事務失敗對應處理
                print("修改資料失敗")

            #無論成功與否最終都需要釋放鎖
            finally:

                res = release_lock(rs, 'lock', lock)
                #釋放鎖成功,
                if res:
                    print("刪除鎖成功")
                #釋放鎖失敗,強制刪除
                else:
                    print("刪除鎖失敗,強制刪除鎖")
                    res = rs.delete('lock')
                    print(res)

        print('[time]: %.2f' % (time.time() - start))

rs = redis.Redis(host='127.0.0.1', port=6379)      # 連線redis
# rs.set('apple',1000)                               # # 首先在redis中設定某商品apple 對應數量value值為1000
sale(rs)
views.py

優化鎖無法釋放的問題,為鎖新增過期時間

def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
    '''
    rs: 連線物件
    lock_name: 鎖標識
    acquire_time: 過期超時時間
    locked_time: 鎖的有效時間
    return -> False 獲鎖失敗 or True 獲鎖成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 當獲取鎖的行為超過有效時間,則退出迴圈,本次取鎖失敗,返回False
        if rs.setnx(lock_name, identifier): # 嘗試取得鎖
            # print('鎖已設定: %s' % identifier)
            rs.expire(lock_name, locked_time)
            return identifier
        time.sleep(.001)
    return False