1. 程式人生 > 其它 >【閘道器開發】4.Openresty 事件通知機制的使用---events外掛

【閘道器開發】4.Openresty 事件通知機制的使用---events外掛

背景

在某些業務場景下,比如資料更新,程式執行,只需要某個程序執行一次,但是其他程序需要知道本次執行的結果,所以就需要一個通知機制,主工作程序執行程式,執行之後的結果通知給其他程序,當然本質上也是通過共享記憶體進行處理。

場景邏輯

原始碼準備

本次外掛來源依然是來自kong的外掛
外掛地址:https://github.com/Kong/lua-resty-worker-events
使用就是一個檔案lib/resty/worker 下的events.lua 不需要編譯與安裝
將其放在自己的resty 目錄下
本次程式碼地址:https://github.com/zhaoshoucheng/openresty/blob/main/pkg/lua_script/upstream/init.lua

主程序準備

我們需要一個worker進行任務的執行,以及推送event,為了與openresty的master做區別,我們可以將這個進行命名為main worker程序
如何確定main worker呢?因為所有worker在啟動時都是平級的,所以我們可以任意選擇worker作為main worker,所以我們利用第一個入共享記憶體資料的程序作為main worker。相當於一個鎖。
每次啟動時,因為master先執行,可以在這個階段清空共享記憶體,啟動其他程序時在重新選擇main worker

local module_name = (...):match("(.-)[^%.]+$")
local cjson = require "cjson.safe"
local upstream_conf = require(module_name .. "config")
local upstream_shm = ngx.shared[upstream_conf.events_shm_name]
-- 這裡是引入的配置檔案
-- on_init 需要在master執行,也就是init_by_lua_file 中執行
local function on_init()
    -- 刪除master 
    upstream_shm:delete("upstream_master")
end

-- 判斷main worker的函式
local function is_master()
    local master = upstream_shm:get("upstream_master")
    if master then
        return false
    end
    upstream_shm:set("upstream_master","true")
    return true
end

程式中利用is_master()返回值判斷是普通worker程序還是main worker

events程式

 -- worker init  事件配置等初始化。shm 是使用共享記憶體的名字 upstream_conf.events_shm_name = events
    local ev = require "resty.worker.events"
    local events_ok, err =
    ev.configure(
    {
        shm = upstream_conf.events_shm_name,
        timeout = 2, -- life time of unique event data in shm
        interval = 0.1, -- poll interval (seconds)
        wait_interval = 0.010, -- wait before retry fetching event data
        wait_max = 0.5, -- max wait time before discarding event
        shm_retries = 100 -- number of retries when the shm returns "no memory" on posting an event
    })
    if not events_ok then
        ngx.log(ngx.ERR, "failed to init events, err: "..tostring(err))
    end

  -- 定義事件列表
  local events = ev.event_list(
        upstream_conf.watch_path, -- available as _M.events._source
        "full_sync",                -- available as _M.events.full_sync
        "sync_keys"                  -- available as _M.events.sync_keys
  )
  -- 這裡包含兩個事件full_sync和sync_keys , _source 應該用任意字串就可以
  
 --所有workers 註冊事件處理函式
   local my_callback = function(data, event, source, pid)
        if event == events.full_sync then
            -- do sth
        elseif event == events.sync_keys then
            -- do sth
        end
        ngx.log(ngx.INFO,"get data event: "..cjson.encode(event).."data :"..data.."pid :"..tostring(pid).." now pid: "..tostring(ngx.worker.pid()))
    end
    ev.register(my_callback, events._source, events.full_sync)
-- ev.register(my_callback, events._source, events.sync_keys)

  -- master 傳送事件
  if is_master() then
      local raise_event = function(p, event, data)
          ngx.log(ngx.INFO,"master post event ")
          return ev.post(events._source, event, data)
      end
      -- raise_event(nil, events.full_sync, "test_event")
      ngx.timer.at(0, raise_event,events.full_sync, "test_event")
  end 

測試

nginx 程序列表

思考與總結

直接執行 raise_event 和 ngx.timer.at(0) 有什麼區別的?如果是ngx.timer.at(1) 有什麼不一樣的現象嗎?
例如我們的master 啟動4個worker程序,他們分別會如何列印 “get data event:......”的日誌呢,可以觀察pid實驗一下。
我們上面的程式正常情況應該是4條(3條普通worker +1 條main worker)。

  1. 直接呼叫raise_event 會列印5條日誌,其中1條是main worker ,另外4條的程序全部是is shutting down的程序,也就是正在銷燬的4個程序還是會收到事件。這時另外3個程序還沒有register成功,post event 自然不會收到資料。
  2. 使用ngx.timer.at(0) 會列印 5 ~8 條不等,因為是ngx.timer是啟動lua協程啟動,但是這並不能保證其他程序的register會成功,所以除了上面的5條外,還有部分成功register程序會收到資料
    3.將timer調長,由於我們的程序並沒有太多處理工作,所以reload時is shutting down的程序很快就會退出,register也很快就會成功,所以一般會按照預想的列印4條

這個在資料同步等設計時可能需要考慮一下,否則可能會出現意想不到資料不同步的問題。
整體在外掛使用時比較簡單,只是在融合其他外掛進行程式設計時需要多考慮一些。
後續會集合etcd、lmdb、cache、events 進行整體融合實現註冊中心的功能。