1. 程式人生 > 實用技巧 >《Redis深度歷險》六(事務+釋出訂閱+Stream)

《Redis深度歷險》六(事務+釋出訂閱+Stream)

事物

127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> exec
1) (integer) 2
2) (integer) 3
3) (integer) 4

multi指示事物的開始,exec指示事物的執行,discard指示事物的丟棄

特殊原子性

127.0.0.1:6379> multi
OK
127.0.0.1:6379> set notes iamastring
QUEUED
127.0.0.1:6379> incr notes
QUEUED
127.0.0.1:6379> set poorman iamdesperate
QUEUED
127.0.0.1:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
127.0.0.1:6379> get notes
"iamastring"
127.0.0.1:6379> get poorman
"iamdesperate"

如上示例,雖然在執行中報錯了,但是後面的指令還繼續執行,redis的事物不能算原子性,僅僅滿足了事物的隔離性,隔離型中的序列化——當前執行的事物有著不被其他事物打斷的權利。

discard

127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> discard
OK
127.0.0.1:6379> incr books
(integer) 1
127.0.0.1:6379> get books
"1"

可以丟棄之前所有指令。

watch

redis的watch是一種樂觀鎖,使用方式 虛擬碼 如下:

while True:
	do_watch()
	commands()
	multi()
	send_commands()
	try:
		exec()
		break
	except WatchError:
		continue

watch會在事物開始前,盯住1個活多個關鍵變數,當事物執行時,也就是伺服器收到了exec指令要順序執行快取的事物佇列,redis會檢查關鍵變數自watch之後,是否被修改了,如果被人動過了,exec就會返回null告訴客戶端執行失敗。

127.0.0.1:6379> watch books
OK
127.0.0.1:6379> incr books
(integer) 2
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr books
QUEUED
127.0.0.1:6379> exec  // 從watch到exec這期間,books被修改,所以exec失敗
(nil)
127.0.0.1:6379> get books
"2"
  • 注意:redis禁止在multi和exec之間執行watch命令,否則會報錯。

釋出訂閱

訂閱

> subscribe codehole.image codehole.text codehole.blog
1) "subscribe"
2) "codehole.image" 3) (integer) 1
1) "subscribe"
2) "codehole.text" 3) (integer) 2
1) "subscribe"
2) "codehole.blog" 3) (integer) 3

釋出

publish codehole.blog '{"content": "hello, everyone", "title": "welcome"}'

訊息結構

{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1L}
  • data:資訊的內容,一個字串
  • channel:當前訂閱的主題
  • type:訊息的型別,普通的訊息就是message,如果是控制訊息,比如訂閱指令的反饋就是subscribe,如果是模式訂閱反饋,就是psubscribe
  • patter:表示當前訊息是那種模式訂閱得到的,如果是subscribe指令訂閱的,這個欄位就為空。

記憶體

記憶體回收機制

Redis 並不總是可以將空閒記憶體立即歸還給作業系統。

如果當前 Redis 記憶體有 10G,當你刪除了 1GB 的 key 後,再去觀察記憶體,你會發現 記憶體變化不會太大。原因是作業系統回收記憶體是以頁為單位,如果這個頁上只要有一個 key 還在使用,那麼它就不能被回收。Redis 雖然刪除了 1GB 的 key,但是這些 key 分散到了 很多頁面中,每個頁面都還有其它 key 存在,這就導致了記憶體不會立即被回收。

不過,如果你執行 flushdb,然後再觀察記憶體會發現記憶體確實被回收了。原因是所有的 key 都幹掉了,大部分之前使用的頁面都完全乾淨了,會立即被作業系統回收。

Redis 雖然無法保證立即回收已經刪除的 key 的記憶體,但是它會重用那些尚未回收的空 閒記憶體。這就好比電影院裡雖然人走了,但是座位還在,下一波觀眾來了,直接坐就行。而 作業系統回收記憶體就好比把座位都給搬走了。

主從同步

CAP(Consistent一致性,Avalability可用性,Partition Tolerance分割槽一致性)

網路分割槽時,一致性和可用性不能同時完好。

增量同步

Redis 同步的是指令流,主節點會將那些對自己的狀態產生修改性影響的指令記錄在本 地的記憶體 buffer 中,然後非同步將 buffer 中的指令同步到從節點,從節點一邊執行同步的指 令流來達到和主節點一樣的狀態,一遍向主節點反饋自己同步到哪裡了 (偏移量)。

因為記憶體的 buffer 是有限的,所以 Redis 主庫不能將所有的指令都記錄在記憶體 buffer 中。Redis 的複製記憶體 buffer 是一個定長的環形陣列,如果陣列內容滿了,就會從頭開始覆 蓋前面的內容。

如果因為網路狀況不好,從節點在短時間內無法和主節點進行同步,那麼當網路狀況恢 復時,Redis 的主節點中那些沒有同步的指令在 buffer 中有可能已經被後續的指令覆蓋掉 了,從節點將無法直接通過指令流來進行同步,這個時候就需要用到更加複雜的同步機制 — — 快照同步。

快照同步

是一個非常耗資源的操作,首先需要在主庫上進行一次記憶體資料的全量快照到磁碟,再將快照檔案的全部內容傳送到從節點。從節點快照檔案接受完畢後,進行一次全量載入,載入之前要將當前記憶體的資料清空,再進行增量同步。

在整個快照同步進行的過程中,主節點的複製 buffer 還在不停的往前移動,如果快照同 步的時間過長或者複製 buffer 太小,都會導致同步期間的增量指令在複製 buffer 中被覆 蓋,這樣就會導致快照同步完成後無法進行增量複製,然後會再次發起快照同步,如此極有 可能會陷入快照同步的死迴圈。

所以務必配置一個合適的複製 buffer 大小引數,避免快照複製的死迴圈。

增加從節點

增加從節點,必須要進行一次快照同步,完成後繼續進行增量同步。

無盤複製

所謂無盤複製是指主伺服器直接通過套接字 將快照內容傳送到從節點,生成快照是一個遍歷的過程,主節點會一邊遍歷記憶體,一遍將序 列化的內容傳送到從節點,從節點還是跟之前一樣,先將接收到的內容儲存到磁碟檔案中, 再進行一次性載入。

wait指令

redis的複製是非同步進行的,wait可以讓非同步程式設計同步,確保系統的強一致性。

> set key value 
OK
> wait 1 0 
(integer) 1

wait 提供兩個引數,第一個引數是從庫的數量 N,第二個引數是時間 t,以毫秒為單 位。它表示等待 wait 指令之前的所有寫操作同步到 N 個從庫 (也就是確保 N 個從庫的同 步沒有滯後),最多等待時間 t。如果時間 t=0,表示無限等待直到 N 個從庫同步完成達成 一致。

假設此時出現了網路分割槽,wait 指令第二個引數時間 t=0,主從同步無法繼續進行, wait 指令會永遠阻塞,Redis 伺服器將喪失可用性。

Stream

stream有一個訊息連結串列,將所有加入的訊息都串起來,每個訊息都有一個唯一的id對應,訊息是持久化的,redis重啟後,內容還在。

每個Stream有唯一的名稱,他就是redis的key,在xadd指令追加訊息時自動建立,每個Stream都可以掛多個消費組,每個消費組會有last_delivered_id在Stream陣列之上往前移動,並表示當前消費組消費到哪條訊息了。每個消費組都有一個Stream內唯一的名稱,不會自動建立,需要單獨的xgroup create建立,制定Stream的某個訊息ID開始消費,這個ID用來初始化last_delivered_id變數。

每個消費組 (Consumer Group) 的狀態都是獨立的,相互不受影響。也就是說同一份 Stream 內部的訊息會被每個消費組都消費到。

同一個消費組 (Consumer Group) 可以掛接多個消費者 (Consumer),這些消費者之間是 競爭關係,任意一個消費者讀取了訊息都會使遊標 last_delivered_id 往前移動。每個消費者有 一個組內唯一名稱。

消費者 (Consumer) 內部會有個狀態變數 pending_ids,它記錄了當前已經被客戶端讀取 的訊息,但是還沒有 ack。如果客戶端沒有 ack,這個變數裡面的訊息 ID 會越來越多,一 旦某個訊息被 ack,它就開始減少。這個 pending_ids 變數在 Redis 官方被稱之為 PEL,也 就是 Pending Entries List,這是一個很核心的資料結構,它用來確保客戶端至少消費了訊息一 次,而不會在網路傳輸的中途丟失了沒處理。

訊息ID

訊息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示當前的消 息在毫米時間戳 1527846880572 時產生,並且是該毫秒內產生的第 5 條訊息。訊息 ID 可以 由伺服器自動生成,也可以由客戶端自己指定,但是形式必須是整數-整數,而且必須是後面 加入的訊息的 ID 要大於前面的訊息 ID。

CURD

127.0.0.1:6379> xadd person * name mike age 20
# * 號表示伺服器自動生成 ID,後面順序跟著一堆 key/value
"1605269145942-0"
127.0.0.1:6379> xadd person * name jack age 24
"1605269166153-0"
# -表示最小值 , + 表示最大值 按id來取
127.0.0.1:6379> xrange person - +
1) 1) "1605269145942-0"
   2) 1) "name"
      2) "mike"
      3) "age"
      4) "20"
2) 1) "1605269166153-0"
   2) 1) "name"
      2) "jack"
      3) "age"
      4) "24"
127.0.0.1:6379> xdel person 1605269166153-0
(integer) 1
127.0.0.1:6379> xlen person
(integer) 1
127.0.0.1:6379> xrange person - +
1) 1) "1605269145942-0"
   2) 1) "name"
      2) "mike"
      3) "age"
      4) "20"
127.0.0.1:6379> del person
(integer) 1
127.0.0.1:6379> xrange person - +
(empty array)

獨立消費

可以在不定義消費組的情況下進行獨立消費,當stream沒有新訊息時,可以阻塞等待。redis設定了一個單獨的消費指令xread,可以將Stream當成普通訊息佇列使用。

xread count 2 streams codehole 0-0 //從頭部讀取2條
xread count 1 streams codehole $ // 從尾部讀取一條,不會返回任何訊息
xread block 0 count 1 streams codehole $ 從尾部阻塞等待新訊息到來,下面的指令會堵住,直到新訊息到來

客戶端如果想要使用 xread 進行順序消費,那麼一定要記住當前消費到哪裡了 , 也就是返回的訊息 lD 下次繼續呼叫 xread 時,將上次返回的最後一個訊息 ID 作為 引數傳遞進去 , 就可以繼續消費後續的訊息。

block 0 表示永遠阻塞,直到訊息到來: block1000表示阻塞1s, 如果1s內沒有 任何訊息到來,就返回 nil。

建立消費組

Stream 通過xgroup create建立消費組,需要提供起始訊息ID引數來初始化last_delivered_id變數。

xgroup create mike cg1 0-0 表示從投開始消費
xgroup create mike cg2 $ 表示從尾部開始,只接受新訊息,當前訊息忽略
xinfo stream mike 獲取當前stream訊息
xinfo groups mkke 獲取stream的消費組資訊

消費

xreadgroup可以進行消費組的組內消費,需要提供消費組名稱,消費者名稱和起始訊息id,也可以阻塞等待新訊息。讀取新訊息後,對應的訊息id會進入消費者PEL結構裡,客戶端處理完畢使用xack通知伺服器,該訊息就會從PEL移除。

xreadgroup GROUP cg1 c1 count 1 streams mike > 
> 表示從當前消費組的last_delivered_id 後面開始讀,每消費1條就會前進

stream訊息太多怎麼辦

redis提供了一個定長Stream功能,在xadd指令中提供了一個定長長度maxlen,可以將老的訊息幹掉,確保連結串列不超過指定長度。

xlen mike 
5
xadd mike maxlen 3 * name he age 1
xlen mike
3

PEL如何避免訊息丟失

PEL裡儲存了發出去的訊息ID,待客戶端重新連線上後,可以在此收到PEL中的訊息ID列表,此時 xreadgroup 的起始訊息 D 必須是任意有效的消 息 ID,一般將引數設為 0-0, 表示讀取所有的 PEL 訊息以及自 last delivered id 之後 的新訊息。