【閘道器開發】4.Openresty 事件通知機制的使用---events外掛
背景
在某些業務場景下,比如資料更新,程式執行,只需要某個程序執行一次,但是其他程序需要知道本次執行的結果,所以就需要一個通知機制,主工作程序執行程式,執行之後的結果通知給其他程序,當然本質上也是通過共享記憶體進行處理。
場景邏輯
原始碼準備
本次外掛來源依然是來自kong的外掛
外掛地址:https://github.com/Kong/lua-resty-worker-events
使用就是一個檔案lib/resty/worker 下的events.lua 不需要編譯與安裝
將其放在自己的resty 目錄下
本次程式碼地址:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/init.lua
主程序準備
我們需要一個worker進行任務的執行,以及推送event,為了與openresty的master做區別,我們可以將這個進行命名為main worker程序
如何確定main worker呢?因為所有worker在啟動時都是平級的,所以我們可以任意選擇worker作為main worker,所以我們利用第一個入共享記憶體資料的程序作為main worker。相當於一個鎖。
每次啟動時,因為master先執行,可以在這個階段清空共享記憶體,啟動其他程序時在重新選擇main worker
local module_name = (...):match("(.-)[^%.]+$") local cjson = require "cjson.safe" local upstream_conf = require(module_name .. "config") local upstream_shm = ngx.shared[upstream_conf.events_shm_name] -- 這裡是引入的配置檔案 -- on_init 需要在master執行,也就是init_by_lua_file 中執行 local function on_init() -- 刪除master upstream_shm:delete("upstream_master") end -- 判斷main worker的函式 local function is_master() local master = upstream_shm:get("upstream_master") if master then return false end upstream_shm:set("upstream_master","true") return true end
程式中利用is_master()返回值判斷是普通worker程序還是main worker
events程式
-- worker init 事件配置等初始化。shm 是使用共享記憶體的名字 upstream_conf.events_shm_name = events local ev = require "resty.worker.events" local events_ok, err = ev.configure( { shm = upstream_conf.events_shm_name, timeout = 2, -- life time of unique event data in shm interval = 0.1, -- poll interval (seconds) wait_interval = 0.010, -- wait before retry fetching event data wait_max = 0.5, -- max wait time before discarding event shm_retries = 100 -- number of retries when the shm returns "no memory" on posting an event }) if not events_ok then ngx.log(ngx.ERR, "failed to init events, err: "..tostring(err)) end -- 定義事件列表 local events = ev.event_list( upstream_conf.watch_path, -- available as _M.events._source "full_sync", -- available as _M.events.full_sync "sync_keys" -- available as _M.events.sync_keys ) -- 這裡包含兩個事件full_sync和sync_keys , _source 應該用任意字串就可以 --所有workers 註冊事件處理函式 local my_callback = function(data, event, source, pid) if event == events.full_sync then -- do sth elseif event == events.sync_keys then -- do sth end ngx.log(ngx.INFO,"get data event: "..cjson.encode(event).."data :"..data.."pid :"..tostring(pid).." now pid: "..tostring(ngx.worker.pid())) end ev.register(my_callback, events._source, events.full_sync) -- ev.register(my_callback, events._source, events.sync_keys) -- master 傳送事件 if is_master() then local raise_event = function(p, event, data) ngx.log(ngx.INFO,"master post event ") return ev.post(events._source, event, data) end -- raise_event(nil, events.full_sync, "test_event") ngx.timer.at(0, raise_event,events.full_sync, "test_event") end
測試
nginx 程序列表
思考與總結
直接執行 raise_event 和 ngx.timer.at(0) 有什麼區別的?如果是ngx.timer.at(1) 有什麼不一樣的現象嗎?
例如我們的master 啟動4個worker程序,他們分別會如何列印 “get data event:......”的日誌呢,可以觀察pid實驗一下。
我們上面的程式正常情況應該是4條(3條普通worker +1 條main worker)。
- 直接呼叫raise_event 會列印5條日誌,其中1條是main worker ,另外4條的程序全部是is shutting down的程序,也就是正在銷燬的4個程序還是會收到事件。這時另外3個程序還沒有register成功,post event 自然不會收到資料。
- 使用ngx.timer.at(0) 會列印 5 ~8 條不等,因為是ngx.timer是啟動lua協程啟動,但是這並不能保證其他程序的register會成功,所以除了上面的5條外,還有部分成功register程序會收到資料
3.將timer調長,由於我們的程序並沒有太多處理工作,所以reload時is shutting down的程序很快就會退出,register也很快就會成功,所以一般會按照預想的列印4條
這個在資料同步等設計時可能需要考慮一下,否則可能會出現意想不到資料不同步的問題。
整體在外掛使用時比較簡單,只是在融合其他外掛進行程式設計時需要多考慮一些。
後續會集合etcd、lmdb、cache、events 進行整體融合實現註冊中心的功能。