1. 程式人生 > >Lua Web快速開發指南(10)

Lua Web快速開發指南(10)

Lua Web快速開發指南(10) - 利用MQ實現非同步任務、訂閱/釋出、訊息佇列

本章節我們將學習如何使用MQ庫.

MQ庫簡介

MQ庫實現了各類訊息代理中介軟體(Message Broker)的連線協議, 目前支援:redismqttstomp協議.

MQ庫基於上述協議實現了: 生產者 -> 消費者訂閱 -> 釋出模型, 可以在不依賴其它服務的情況下獨立完成任務.

API介紹

cf框架提供了多種MQ的封裝, 當我們需要使用的時候需要根據實際的協議進行選擇:

-- local MQ = require "MQ.mqtt"
-- local MQ = require "MQ.redis"
-- local MQ = require "MQ.stomp"

MQ:new(opt)

此方法將會建立一個的MQ物件例項.

opt是一個table型別的引數, 可以傳遞如下值:

  • host - 字串型別, 訊息佇列的域名或者IP地址.

  • port - int型別, 訊息佇列監聽的埠.

  • auth/db - 字串型別, 僅在redis協議下用作登入認證或者db選擇(沒有可以不填寫).

  • username/password - 字串型別, 僅在stomp/mqtt協議下用作登入認證(沒有可以不填寫).

  • vhost - 字串型別, 僅在使用某些特定訊息佇列server的時候填寫(例如:rabbit).

  • keepalive - int型別, 僅在使用mqtt的時候用來出發客戶端主動發出心跳包的時間.

以redis broker為示例:

local MQ = require "MQ.redis"
local mq = MQ:new {
  host = "localhost",
  port = 6379,
  -- db = 0,
  -- auth = "123456789",
}

MQ:on(pattern, function)

此方法用來訂閱一個指定pattern. 當broker將訊息傳遞到cf後, function將會被呼叫.

MQ庫會為function注入一個table型別的引數msg, 此引數將在斷開連線的時候為nil

.

msg根據採用的協議的不同msg的內容也將有所不同. 具體內容以logging庫的列印為準.

標準使用示例:

local Log = require("logging"):new()
mq:on("/notice", function(msg)
  if not msg then
    return Log:ERROR("['/notice'] SUBSCRIBE ERROR: 連線已斷開.")
  end
  Log:DEBUG(msg)
end)

開發者可以同時訂閱多個parttern.

MQ:emit(pattern, msg)

此方法用來向指定pattern傳送訊息. msg為字串型別的訊息.

使用示例:

mq:emit('/notice', '{"code":200,"data":[1,2,3,4,5,6,7,8,9,10]}')

單個MQ可以一直複用emit, 內部會建立一個寫入佇列去完成訊息的順序傳送. (在多個例項中無法保證訊息先後)

MQ:start()

此方法在作為獨立執行服務端時候呼叫.

使用示例:

mq:start()

MQ:clsoe()

此方法可以關閉不再使用的MQ; 在任何情況下MQ使用完畢後都需要呼叫此方法來釋放資源.

使用示例:

mq:close()

開始實踐

為了演示更加直觀, 這裡僅使用redis作為broker中專訊息.

1. 模擬生產者與消費者

我們模擬100個生產者向redis的/queue投遞訊息, 同時定義了一個消費者訂閱/queue持續進行消費

程式碼如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

cf.fork(function ()
  local consumer = MQ:new {
    host = "localhost",
    port = 6379
  }

  local count = 0
  consumer:on("/queue", function (msg)
    if not msg then
      Log:ERROR("[/queue]連線失敗", "已經消費了"..count.."個訊息")
      return
    end
    count = count + 1
    Log:DEBUG("開始消費:", msg, "已經消費了"..count.."個訊息")
  end)

  consumer:start() -- Websoket內部無需使用這個方法
end)

for i = 1, 100 do
  cf.fork(function()

    local producer = MQ:new {
      host = "localhost",
      port = 6379
    }

    producer:emit("/queue", json.encode({
      code = 200,
      data = {
        id = math.random(1, 1 << 32)
      },
    }))

    producer:close()
  end)
end

輸出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3912595079}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了1個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2938696189}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了2個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3499397173}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了3個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1711272453}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了4個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3968420025}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了5個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1887895479}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了6個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3687986737}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了7個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2823099353}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了8個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2528190121}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了9個訊息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":4107999865}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了10個訊息
.
..
...
....
.....
[2019-06-25 16:05:36,247] [@script/main.lua:19] [DEBUG] : 開始消費:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3608578767}}", ["source"]="/queue", ["type"]="pmessage"}, 已經消費了100個訊息

為了方便閱讀. 我們這裡取出前10條與最後第100條並且將msg的資料結構打印出來方便閱讀.

消費者的處理方式採用同步非阻塞處理的(當前業務未處理完成是不會繼續處理下個訊息的), 如果不想阻塞當前訊息佇列事件迴圈可以考慮自行fork一個協程來處理.

2. 推送訊息給某個使用者

使用者通過認證後接入到Server後訂閱自己專屬的頻道, 當有使用者專屬訊息的時候任何服務都可以利用此方法進行業務訊息推送.

我們

程式碼實現如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for uid = 1, 10 do
  cf.fork(function ()
    local client = MQ:new {
      host = "localhost",
      port = 6379
    }

    client:on("/user/"..uid.."/*", function (msg)
      if not msg then
        Log:ERROR("[/user/9257]連線失敗")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到推送訊息", msg)
    end)

    client:start() -- Websoket內部無需使用這個方法
  end)
end

local server = MQ:new {
  host = "localhost",
  port = 6379
}

cf.at(1, function (...)
  server:emit("/user/"..math.random(1, 10).."/ad", json.encode({
    code = 200,
    data = {}
  }))
end)

server:start()

執行後終端輸出如下所示:

^C[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:20:23,506] [@script/main.lua:18] [DEBUG] : UID:[9]接收到推送訊息, {["source"]="/user/9/ad", ["pattern"]="/user/9/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:24,504] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送訊息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:25,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送訊息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:26,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送訊息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:27,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送訊息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:28,506] [@script/main.lua:18] [DEBUG] : UID:[2]接收到推送訊息, {["source"]="/user/2/ad", ["pattern"]="/user/2/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:29,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送訊息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:30,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送訊息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:31,505] [@script/main.lua:18] [DEBUG] : UID:[3]接收到推送訊息, {["source"]="/user/3/ad", ["pattern"]="/user/3/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:32,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送訊息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:33,506] [@script/main.lua:18] [DEBUG] : UID:[5]接收到推送訊息, {["source"]="/user/5/ad", ["pattern"]="/user/5/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:34,503] [@script/main.lua:18] [DEBUG] : UID:[7]接收到推送訊息, {["source"]="/user/7/ad", ["pattern"]="/user/7/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:35,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送訊息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:36,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送訊息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:37,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送訊息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
^C[candy@MacBookPro:~/Documents/core_framework] $

這裡我們可以看到, 由訊息釋出到/user/9527/*下的topic的時候, 我們可以通過一次萬用字元訂閱就可以接收到所有下屬路由訊息.

3. 訊息廣播

在各種領域內, 訊息推送已經成為了一種最常見的業務. 我們現在來嘗試利用MQ實現訊息推送業務.

首先, 我們將script/main.lua的檔案寫入如下程式碼:

-- main.lua
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for i = 1, 3 do
  cf.fork(function ()
    local uid = math.random(1, 1 << 32)
    local client_mq = MQ:new {
      host = "localhost", -- 主機名
      port = 6379,        -- 埠號
      -- db = nil,        -- 預設資料庫
      -- auth = nil,      -- 密碼
    }
    client_mq:on("/system/notice", function (msg)
      if not msg then
        Log:ERROR("['/system/notice'] SUBSCRIBE ERROR: 連線已斷開.")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到訊息: ", msg)
    end)

    client_mq:start()
  end)
end

local server_mq = MQ:new {
  host = "localhost", -- 主機名
  port = 6379,        -- 埠號
  -- db = nil,        -- 預設資料庫
  -- auth = nil,      -- 密碼
}

cf.at(3, function (args)
  server_mq:emit("/system/notice", json.encode({
    code = 200,
    msg = "系統即將關閉"
  }))
end)

server_mq:start()

這裡我們用啟動了3個協程來模擬使用者訂閱訊息, 並且每個協程都使用不同的UID來列印. 然後再啟動一個定時器模擬每三秒的訊息推送業務.

開啟終端執行./cfadmin後, 輸出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到訊息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系統即將關閉","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[candy@MacBookPro:~/Documents/core_framework] $

從終端的輸出內容中可以看到, 我們確實每隔3秒就收到了一次訊息推送.

4. 對基於Websocket協議的客戶端實現業務推送

首先, 我們需要建立一套基於httpd庫的Websocket路由. 讓我們開啟script/main.lua檔案並將下面的程式碼寫入進去.

local httpd require "httpd"

local app = httpd:new("Web")

app:ws('/ws', require "ws")

app:listen("0.0.0.0", 8080)

app:run()

Websocket必須在建立與客戶端的連線完成的同時利用MQ庫訂閱/chat. 每當客戶端傳送訊息過來觸發on_message的時候, 都將會訊息直接釋出到/chat內部通過中轉後實現推送聊天.

然後我們利用前面章節所學的Websocket指南, 編寫一段簡單的Websocket路由處理程式碼. 由於示例程式碼沒有UID生成機制. 為了方便除錯, 我們隨機生成32位整數作為唯一ID識別符號.

script/ws.lua具體程式碼如下所示:

local MQ = require "MQ.redis"
local class = require "class"

local websocket = class("websocket")

function websocket:ctor (opt)
  self.ws = opt.ws
  self.id = math.random(1, 1 << 32)
end

function websocket:on_open ()
  self.mq = MQ:new { host = 'localhost', port = 6379 }
  self.mq:on("/chat", function (msg)
    if not msg then
      return
    end
    self.ws:send(msg.payload)
  end)
end

function websocket:on_message (data, typ)
  if self.mq then
    self.mq:emit("/chat", data)
  end
  print("客戶端["..self.id.."]傳送了訊息:["..data.."]")
end

function websocket:on_error (error)

end

function websocket:on_close ()
  if self.mq then
    self.mq:close()
    self.mq = nil
  end
end

return websocket

注意: 我們需要記住當客戶端連線斷開的時候記得關閉訂閱回收資源. 啟動./cfadmin, 檢視是否正常執行.

讓我們下載客戶端工具, 並且安裝到我們的Chrome瀏覽器上. 提取碼:cgwr

現在, 我們執行客戶端工具在位址列輸入localhost:8080/ws連線我們剛剛啟動的Websocket Server, 然後開始向伺服器傳送訊息.

如果從終端中和客戶端看到類似的輸出內容, 說明我們的示例編寫完成.

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019/06/25 20:11:59] [INFO] httpd正在監聽: 0.0.0.0:8080
[2019/06/25 20:11:59] [INFO] httpd正在執行Web Server服務...
[2019/06/25 20:12:01] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000095/Sec
[2019/06/25 20:12:17] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000080/Sec
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
客戶端[1693861773]傳送了訊息:[hello! 我是2]
[2019/06/25 20:12:23] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000052/Sec
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[3363385555]傳送了訊息:[hello! 我是1]
客戶端[1693861773]傳送了訊息:[hello! 我是2]

最後

上述程式碼僅用redis協議進行模擬, 其它協議請參考Wiki.

學習完成

至此Lua Web開發指南已經編寫完畢. cf框架都內建庫非常的多, 維護框架都同時還要編寫使用教程. 作者不可能一個一個介紹完全.

軟體開發領域內不僅僅需要師傅領進門, 個人修行也是一種能力的體現. cf框架已經有了專屬的QQ討論社群: 727531854, 點選加群.

目前內部就作者一個人在裡面. 如果您也對它比較感興趣, 歡迎您到群裡