魅族春季新品即將到來,釋出會時間明日公佈
阿新 • • 發佈:2022-04-13
之前寫過一篇部落格,裡面吭哧吭哧半天,使用Redis實現了一個分散式鎖。
今天閒來沒事看原始碼,突然發現redis set命令的用法可以直接指定nx和ex,文件中沒有明說這是個原子方法,但是後面給出了一個例子使用set nx ex的方法實現了redis鎖。
感覺應該是原子性的,挺好。
相比這篇文章裡的方法,有兩個優點:
1)簡單,之前的那篇文章裡使用getSet方法,折騰了一頓,就是怕setnx之後expire成功不了,這裡直接原子性的話,省事多了。
2)解決了超時誤刪鎖引入的競態問題,之前我們在value中保留了時間,這裡我們可以保留一個uuid,判斷是自己的鎖再刪除,避免誤刪。
直接粘在下面,以後實現了例項再回來。api出處: http://doc.redisfans.com/string/set.html
命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現鎖的簡單方法。
客戶端執行以上的命令:
- 如果伺服器返回 OK ,那麼這個客戶端獲得鎖。
- 如果伺服器返回 NIL ,那麼客戶端獲取鎖失敗,可以在稍後再重試。
設定的過期時間到達之後,鎖將自動釋放。
可以通過以下修改,讓這個鎖實現更健壯:
- 不使用固定的字串作為鍵的值,而是設定一個不可猜測(non-guessable)的長隨機字串,作為口令串(token)。
- 不使用 DEL 命令來釋放鎖,而是傳送一個 Lua 指令碼,這個指令碼只在客戶端傳入的值和鍵的口令串相匹配時,才對鍵進行刪除。
這兩個改動可以防止持有過期鎖的客戶端誤刪現有鎖的情況出現。
以下是一個簡單的解鎖指令碼示例:
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
這個指令碼可以通過 EVAL ...script... 1 resource-name token-value 命令來呼叫。
提供一個分散式鎖的實現
# -*- coding:utf-8 -*- from __future__ import print_function import redis import time import multiprocessing from contextlib import contextmanager as _contextmanager # 簡單建立redis的客戶端 r = redis.Redis(host='localhost', port=6379, db=0) # 分散式鎖實現 # finally中驗證本執行緒是否獲得鎖, 是為了防止誤刪別的執行緒獲取的鎖 @_contextmanager def dist_lock(client, key): dist_lock_key = 'lock:%s' % key is_acquire_lock = False try: is_acquire_lock = _acquire_lock(client, dist_lock_key) yield finally: if is_acquire_lock: _release_lock(client, dist_lock_key) # 嘗試獲取鎖 # 成功: 返回True, 失敗: 丟擲異常 # 使用set nx ex原語, 使得setnx和expire操作成為原子操作 def _acquire_lock(client, key): is_lock = r.set(key, 1, nx=True, ex=10) if not is_lock: raise Exception("already locked!") return is_lock # 釋放鎖 # 簡單刪除key # 如果刪除失敗, 鎖也會通過expire時間超時 def _release_lock(client, key): client.delete(key) # 測試函式 # 獲取鎖成功, 列印成功, 並持有鎖3s # 獲取鎖失敗, 直接列印 def func(): while 1: try: with dist_lock(r, 'key'): print("*", end='') time.sleep(3) except Exception, ex: print('!', end='') # 多程序啟動 # 這種模式下, 執行緒鎖無效, 可以驗證分散式鎖 process_list = list() for i in range(2): process_list.append(multiprocessing.Process(target=func)) for process in process_list: process.start() for process in process_list: process.join()