redis訂閱模式pub/sub
Pub/Sub:
"釋出/訂閱"在redis中,被設計的非常輕量級和簡潔,它做到了訊息的“釋出”和“訂閱”的
基本能力;但是尚未提供關於訊息的持久化等各種企業級的特性。
一個Redis client釋出訊息,其他多個redis client訂閱訊息,釋出的訊息“即發即失”,redis
不會持久儲存釋出的訊息;訊息訂閱者也將只能得到訂閱之後的訊息,通道中此前的訊息將無
從獲得。
訊息釋出者,即publish客戶端,無需獨佔連結,你可以在publish訊息的同時,使用同一個redis-client連結進行其他操作(例如:INCR等)
訊息訂閱者,即subscribe客戶端,需要獨佔連結,即進行subscribe期間,redis-client無法穿插其他操作,
此時client以阻塞的方式等待“publish端”的訊息;因此這裡subscribe端需要使用單獨的連結,甚至需要在額外的執行緒中使用。
Tcp預設連線時間固定,如果在這時間內sub端沒有接收到pub端訊息,或pub端沒有訊息產生,sub端的連線都會被強制回收,
這裡就需要使用特殊手段解決,用定時器來模擬pub和sub之間的保活機制,定時器時間不能超過TCP最大連線時間,具體根據機器環境來定;
一旦subscribe端斷開連結,將會失去部分訊息,即連結失效期間的訊息將會丟失,所以這裡就需要考慮到藉助redis的list來持久化;
如果你非常關注每個訊息,那麼你應該基於Redis做一些額外的補充工作,如果你期望訂閱是持久的,那麼如下的設計思路可以借鑑:
1) subscribe端:
首先向一個Set集合中增加“訂閱者ID”, 此Set集合儲存了“活躍訂閱”者,
訂閱者ID標記每個唯一的訂閱者,此Set為 "活躍訂閱者集合"
2) subcribe端開啟訂閱操作,並基於Redis建立一個以 "訂閱者ID" 為KEY的LIST資料結構,
此LIST中儲存了所有的尚未消費的訊息,此List稱為 "訂閱者訊息佇列"
3) publish端:
每釋出一條訊息之後,publish端都需要遍歷 "活躍訂閱者集合",並依次
向每個 "訂閱者訊息佇列" 尾部追加此次釋出的訊息.
4) 到此為止,我們可以基本保證,釋出的每一條訊息,都會持久儲存在每個 "訂閱者訊息佇列" 中.
5) subscribe端,每收到一個訂閱訊息,在消費之後,必須刪除自己的 "訂閱者訊息佇列" 頭部的一條記錄.
6) subscribe端啟動時,如果發現自己的 "訂閱者訊息佇列" 有殘存記錄, 那麼將會首先消費這些記錄,然後再去訂閱.
以上方法可以保證成功到達的訊息必消費不丟失;
但還是會存在ngx業務機方自丟失資料問題,也就是ngx業務機自身問題或網路問題導致ngx業務機發布的訊息沒有送達redis機器;
更完善的確認機制才能徹底解決上述存在問題;
注意,在實際ngx_lua_redis應用中,redis單個客戶端訂閱模式下僅能使用有限的幾個命令,不能使用其它結構命令,如lpop,rpush等;
因為 publish是普通的request/response模式, 但subscribe不是,否則會報錯:
ERR only (P)SUBSCRIBE \/ (P)UNSUBSCRIBE \/ PING \/ QUIT allowed in this cont
關於這點以下是官網一般解釋:
You are required to use two connections for pub and sub. A subscriber connection cannot issue any commands
other than subscribe, psubscribe, unsubscribe, punsubscribe (although @Antirez has hinted of a subscriber-safe
ping in the future). If you try to do anything else, redis tells you:
-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
(note that you can't test this with redis-cli, since that understands the protocol well enough to prevent you
from issuing commands once you have subscribed - but any other basic socket tool should work fine)
This is because subscriber connections work very differently - rather than working on a request/response basis,
incoming messages can now come in at any time, unsolicited.
publish is a regular request/response command, so must be sent on a regular connection, not a subscriber connection.
於是該特性不適用單例模式,要解決上面侷限,需要多客戶端輔助操作同一結果,下列程式碼中會有展示;
--[[
cosocket即coroutine+socket
順序執行,但它是非阻塞執行方式
因為nginx core是非阻塞執行;
redis中subscribe是阻塞方式,
因此在nginx_lua平臺中使用redis
中sub特性無法保持阻塞連線狀態;
流程模型:http://www.cnblogs.com/foundwant/p/6382083.html
]]
local args = ngx.req.get_uri_args()
local ttype = args.type -- pub/sub
local function newRedis(timeout, ip, port, section)
local red = redis.new()
red:set_timeout(timeout)
local ok, err = red:connect(ip, port)
if not ok then
nlog.dinfo("connect:" .. err)
end
red:select(section)
return red
end
local red = newRedis(10000, "127.0.0.1", "6379", 0)
local bak = newRedis(10000, "127.0.0.1", "6379", 0)
local function subscribe(channel)
local res, err = red:subscribe(channel)
if not res then
nlog.dinfo("subscribe error.")
return nil, err
end
--這裡以函式返回,不然sub會在這裡斷連失去可操作性
--這就是提到的特殊之一
local function read_func(do_read)
if nil == do_read or true == do_read then
res, err = red:read_reply()
if not res then
return nil, err
end
return res
end
red:unsubscribe(channel)
red:set_keepalive(60000, 100)
--連接回收
bak:close()
bak:set_keepalive(60000, 100)
--斷連後重啟等待
red = newRedis(10000, "127.0.0.1", "6379", 0)
red:subscribe(channel)
bak = newRedis(10000, "127.0.0.1", "6379", 0)
return
end
return read_func
end
local subset = "subset" --set
local channel = "test" --list
consume = function(length)
--若訂閱者訊息佇列有殘餘,先消費,再訂閱
for i=1, llength do
local recv, err = red:lpop(channel) --頭部開始消費
nlog.dinfo("recv:" .. cjson.encode(recv))
end
redis_util.coroutine_count = 1
coroutine.yield()
end
--訂閱者
if "sub" == ttype then
--向set集合增加"訂閱者id"
red:sadd(subset, channel)
--為每個"訂閱者id"建立list
local llength = red:llen(channel)
if 0 == llength then
red:rpush(channel, "hello")
else
--若訂閱者訊息佇列有殘餘,先消費,再訂閱
for i=1, llength do
local recv, err = red:lpop(channel) --頭部開始消費
nlog.dinfo("recv:" .. cjson.encode(recv))
end
end
nlog.dinfo("run coroutine after...")
--開始訂閱
local func, err = subscribe(channel)
while true do
local res, err = func() --res:["message","test","world"]
if err then
func(false)
end
--在redis的訂閱模式中,
--單例模式下只能使用固定幾個命令[ (P)SUBSCRIBE,(P)UNSUBSCRIBE,QUIT,PING,... ],
--無法使用其它命令,比如lpop, rpush等命令,
--所以這裡無法使用red:lpop()來執行出隊刪除操作,
--只能另起一個客戶端物件來進行刪除操作;
local oo, ooerr = bak:lpop(channel)
nlog.dinfo("bak lpop:" .. cjson.encode(oo))
nlog.dinfo("res:" .. cjson.encode(res))
ngx.sleep(1)
end
end
--釋出者,測試用,實際呼叫是在業務層
if "pub" == ttype then
--先發布,再追加佇列
--local subchannel, err = red:spop(subset)
--nlog.dinfo("subchannel:" .. type(subchannel))
--if "userdata" ~= type(subchannel) then
for i=1, 1000 do
local str = "world_" .. i
red:publish(channel, str)
red:rpush(channel, str) --尾部追加
ngx.sleep(0.1)
end
--end
end
--監聽器,crontab定時執行
if "spy" == ttype then
while true do
red:publish(channel, "0")
ngx.sleep(60)
end
end
ok, err = red:set_keepalive(60000, 100)
if not ok then
ngx.say("set_keepalive:", err)
end
ngx.print("rpush done.")
ngx.exit(200)
Lua_Redis