python中的Redis鍵空間通知(過期回撥)
介紹
Redis是一個記憶體資料結構儲存庫,用於快取,高速資料攝取,處理訊息佇列,分散式鎖定等等。
使用Redis優於其他記憶體儲存的優點是Redis提供永續性和資料結構,如列表,集合,有序集和雜湊。
在本文中,我想簡要介紹一下Redis鍵空間通知。我將解釋鍵空間通知是什麼,並演示如何配置Redis以接收它們。然後我將向您展示如何在python中訂閱Redis通知。
啟用鍵空間通知
預設情況下,禁用鍵空間事件通知。我們可以在redis.conf或redis-cli中啟用它們,如下所示:
$ redis-cli config set notify-keyspace-events KEA OK
該KEA
字串意味著每一個可能的事件被啟用。要檢視每個字元的含義,請檢視文件。該CLI可以在特殊模式下,它允許您訂閱的頻道,以接收郵件的工作。
現在讓我們檢查事件是否正常:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
psubscribe '*'
意味著我們想要使用模式訂閱所有事件*
。在新的終端輸入redis-cli和SET key1
to value1
。
127.0.0.1:6379> set key1 value1 OK
在第一個終端,您將看到:
$ redis-cli --csv psubscribe '*'
Reading messages... (press Ctrl-C to quit)
"psubscribe","*",1
"pmessage","*","[email protected]__:key1","set"
"pmessage","*","[email protected]__:set","key1
通知正在執行:)
Redis鍵空間通知
Redis金鑰空間通知自2.8.0版開始提供。對於每個更改任何Redis金鑰的操作,我們可以配置Redis將訊息釋出到Pub / Sub。
上面你收到了三個事件:
"psubscribe","*",1
"pmessage","*","[email protected]__:key1","set"
"pmessage","*","[email protected]__:set","key1
第一個事件意味著我們已成功訂閱作為回覆中第二個元素的通道。1表示我們當前訂閱的頻道數。第二個事件是金鑰空間通知。在金鑰空間通道中,我們收到了事件的名稱set
作為訊息。第三個事件是關鍵事件通知。在keyevent頻道中,我們收到了金鑰的名稱key1
作為訊息。
Redis Pub / Sub
使用Redis的Pub / Sub圖層傳遞事件。為了訂閱頻道channel1
和channel2
,客戶端發出一個訂閱與頻道的名稱命令:
SUBSCRIBE channel1 channel2
其他客戶(釋出者)傳送到這些頻道的訊息將由Redis推送到所有訂閱的客戶端(訂閱者)。Redis Pub / Sub實現支援模式匹配。客戶端可以訂閱glob樣式模式,以便使用PSUBSCRIBE接收發送到與給定模式匹配的通道名稱的所有訊息。例如:
PSUBSCRIBE channel*
將接收所有發來的簡訊channel1
,channel.b
等等。
如果您的釋出/訂閱客戶端斷開連線並稍後重新連線,則在客戶端斷開連線期間傳遞的所有事件都將丟失。
Redis為每個客戶端維護一個客戶端輸出緩衝區。Pub / Sub的客戶端輸出緩衝區的預設限制設定為:
client-output-buffer-limit pubsub 32mb 8mb 60
Redis將強制客戶端在兩種情況下斷開連線:如果輸出緩衝區增長超過32MB,或者輸出緩衝區持續保持8MB資料60秒。這些跡象表明客戶消費資料的速度比釋出時慢。
將來有計劃允許更可靠的事件傳遞,但可能會在更一般的層面上解決,要麼為Pub / Sub本身帶來可靠性,要麼允許Lua指令碼攔截Pub / Sub訊息以執行推送等操作把事件放到一個清單中。
訂閱python中的通知
首先我們需要Redis redis-py的python客戶端,所以讓我們安裝它:
$ pip install redis
事件迴圈
看看下面的程式碼。它訂閱所有鍵空間通知並列印任何收到的。
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()
pubsub.psubscribe('[email protected]__:*')
print('Starting message loop')
while True:
message = pubsub.get_message()
if message:
print(message)
else:
time.sleep(0.01)
這就是我們建立Redis連線的方式:
redis = StrictRedis(host='localhost', port=6379)
預設情況下,所有響應都以位元組形式返回。使用者負責解碼它們。如果應解碼來自客戶端的所有字串響應,則使用者可以將SID_responses = True指定為StrictRedis。在這種情況下,任何返回字串型別的Redis命令都將使用指定的編碼進行解碼。
接下來,我們建立一個pubsub物件,該物件訂閱一個頻道並偵聽新訊息:
pubsub = redis.pubsub()
pubsub.psubscribe('[email protected]__:*')
然後我們通過無限迴圈等待事件:
while True:
message = pubsub.get_message()
...
如果有資料,get_message()將讀取並返回它。如果沒有資料,則該方法將返回None。
從pubsub例項讀取的每條訊息都是一個包含以下鍵的字典:
- 鍵入:下列之一:
subscribe
,unsubscribe
,psubscribe
,punsubscribe
,message
,pmessage
- channel:訂閱的頻道或釋出訊息的頻道
- pattern:匹配已釋出訊息的通道的模式(除型別外在所有情況下均為None
pmessage
) - data:訊息資料
現在啟動python指令碼,在另一個終端輸入帶有值的redis-cli和SET鍵mykey
myvalue
127.0.0.1:6379> set mykey myvalue
OK
您將看到指令碼的以下輸出:
$ python subscribe.py
Starting message loop
{'type': 'psubscribe', 'data': 1, 'channel': b'[email protected]__:*', 'pattern': None}
{'type': 'pmessage', 'data': b'set', 'channel': b'[email protected]__:mykey', 'pattern': b'[email protected]__:*'}
回撥
也可以註冊回撥函式來處理已釋出的訊息。訊息處理程式只接受一個引數即訊息。要使用訊息處理程式訂閱通道或模式,請將通道或模式名稱作為關鍵字引數傳遞,其值為回撥函式。當使用訊息處理程式在通道或模式上讀取訊息時,將建立訊息字典並將其傳遞給訊息處理程式。在這種情況下,從get_message()返回None值,因為訊息已經處理完畢。
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
pubsub = redis.pubsub()
def event_handler(msg):
print('Handler', msg)
pubsub.psubscribe(**{'[email protected]__:*': event_handler})
print('Starting message loop')
while True:
message = pubsub.get_message()
if message:
print(message)
else:
time.sleep(0.01)
127.0.0.1:6379> set mykey myvalue
OK
如您所見,set事件mykey
由event_handler回撥處理。
$ python subscribe2.py
Starting message loop
{'pattern': None, 'channel': b'[email protected]__:*', 'data': 1, 'type': 'psubscribe'}
Handler {'pattern': b'[email protected]__:*', 'channel': b'[email protected]__:mykey', 'data': b'set', 'type': 'pmessage'}
單獨執行緒中的事件迴圈
另一種選擇是在單獨的執行緒中執行事件迴圈:
import time
from redis import StrictRedis
redis = StrictRedis(host='localhost', port=6379)
def event_handler(msg):
print(msg)
thread.stop()
pubsub = redis.pubsub()
pubsub.psubscribe(**{'[email protected]__:expired': event_handler})
thread = pubsub.run_in_thread(sleep_time=0.01)
上面的程式碼建立了一個新執行緒並啟動了事件迴圈。處理完第一個過期事件後,我們使用該thread.stop()
方法關閉事件迴圈和執行緒。
在幕後,這只是一個圍繞get_message()的包裝器,它在一個單獨的執行緒中執行。run_in_thread()
採用可選sleep_time
引數。如果指定,則事件迴圈將使用迴圈的每次迭代中的值呼叫time.sleep()。
127.0.0.1:6379> set mykey myvalue ex 1
OK
預期產量:
$ python subscribe3.py
{'type': 'pmessage', 'channel': b'[email protected]__:expired', 'pattern': b'[email protected]__:expired', 'data': b'mykey'}
概要
Redis的一個常見用例是,當應用程式需要能夠響應儲存在特定金鑰或金鑰中的值可能發生的更改時。感謝金鑰空間通知和Pub / Sub,我們可以響應Redis資料中的更改。通知非常容易使用,而事件處理器可以在地理上分佈。
最大的缺點是Pub / Sub實現要求釋出者和訂閱者一直處於啟動狀態。訂閱伺服器在停止或連線丟失時會丟失資料。