1. 程式人生 > 其它 >EMQ X:WebHook

EMQ X:WebHook

WebHook

WebHook 是由 emqx_web_hook外掛提供的 將 EMQ X 中的鉤子事件通知到某個 Web 服務 的功能。

WebHook 的內部實現是基於鉤子,但它更靠近頂層一些。它通過在鉤子上的掛載回撥函式,獲取到 EMQ X 中的各種事件,並轉發至 emqx_web_hook 中配置的 Web 伺服器。

以 客戶端成功接入(client.connected) 事件為例,其事件的傳遞流程如下:

    Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+

WebHook 對於事件的處理是單向的,它僅支援將 EMQ X 中的事件推送給 Web 服務,並不關心 Web 服務的返回。 藉助 Webhook 可以完成裝置線上、上下線記錄,訂閱與訊息儲存、訊息送達確認等諸多業務。

簡單來講,該機制目的在於增強軟體系統的擴充套件性、方便與其他三方系統的整合、或者改變其系統原有 的預設行為。如下圖:

當系統中不存在 鉤子 (Hooks) 機制時,整個事件處理流程 從 事件 (Event) 的輸入,到 處理 (Handler), 再到完成後的返回結果 (Result) 對於系統外部而講,都是不可見、且無法修改的。

而在這個過程中加入一個可掛載函式的點 (HookPoint),允許外部外掛掛載多個回撥函式,形成一個呼叫鏈。達到對內部事件處理過程的擴充套件和修改。系統中常用到的認證外掛則是按照該邏輯進行實現的。

因此,在 EMQ X 中,鉤子 (Hooks) 這種機制極大地方便了系統的擴充套件。我們不需要修改 emqx 核心代 碼,僅需要在特定的位置埋下掛載點 (HookPoint) ,便能允許外部外掛擴充套件 EMQ X 的各種行為。

對於實現者來說僅需要關注:

  1. 掛載點 (HookPoint) 的位置:包括其作用、執行的時機、和如何掛載和取消掛載。
  2. 回撥函式 的實現:包括回撥函式的入參個數、作用、資料結構等,及返回值代表的含義。
  3. 瞭解回撥函式在 鏈 上執行的機制:包括回撥函式執行的順序,及如何提前終止鏈的執行。

配置項說明

配置檔案:/etc/emqx/plugins/emqx_web_hook.conf

web.hook.url:Webhook 請求轉發的目的 Web 伺服器地址。

web.hook.encoding_of_payload_field:PUBLISH 報文中 Payload 欄位的編碼格式。

觸發規則

配置的格式如下:

## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>

## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

event:目前支援以下事件

名稱 說明 執行時機
client.connect 處理連線報文 服務端收到客戶端的連線報文時
client.connack 下發連線應答 服務端準備下發連線應答報文時
client.connected 成功接入 客戶端認證完成併成功接入系統後
client.disconnected 連線斷開 客戶端連線層在準備關閉時
client.subscribe 訂閱主題 收到訂閱報文後,執行 client.check_acl 鑑權前
client.unsubscribe 取消訂閱 收到取消訂閱報文後
session.subscribed 會話訂閱主題 完成訂閱操作後
session.unsubscribed 會話取消訂閱 完成取消訂閱操作後
message.publish 訊息釋出 服務端在釋出(路由)訊息前
message.delivered 訊息投遞 訊息準備投遞到客戶端前
message.acked 訊息回執 服務端在收到客戶端發回的訊息 ACK 後
message.dropped 訊息丟棄 釋出出的訊息被丟棄後

number:同一個事件可以配置多個觸發規則,配置相同的事件應當依次遞增。

rule:觸發規則

其值為一個 JSON 字串,其中可用的 Key 有:

  • action:字串,取固定值
  • topic:字串,表示一個主題過濾器,操作的主題只有與該主題匹配才能觸發事件的轉發

例如,我們只將與 a/b/cfoo/# 主題匹配的訊息轉發到 Web 伺服器上,其配置應該為:

web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}

這樣 Webhook 僅會轉發與 a/b/cfoo/# 主題匹配的訊息,例如 foo/bar 等,而不是轉發 a/b/dfo/bar

WebHook事件引數

事件觸發時 Webhook 會按照配置將每個事件組成一個 HTTP 請求傳送到 url 所配置的 Web 伺服器上。其請求格式為:

URL: <url>      # 來自於配置中的 `url` 欄位
Method: POST        # 固定為 POST 方法
Body: <JSON>        # Body 為 JSON 格式字串

對於不同的事件,請求 Body 體內容有所不同,下表列舉了各個事件中 Body 的引數列表:

client.connect

Key 型別 說明
action string 事件名稱 固定為:"client_connect"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
ipaddress string 客戶端源 IP 地址
keepalive integer 客戶端申請的心跳保活時間
proto_ver integer 協議版本號

client.connack

Key 型別 說明
action string 事件名稱 固定為:"client_connack"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
ipaddress string 客戶端源 IP 地址
keepalive integer 客戶端申請的心跳保活時間
proto_ver integer 協議版本號
conn_ack string "success" 表示成功,其它表示失敗的原因

client.connected

Key 型別 說明
action string 事件名稱 固定為:"client_connected"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
ipaddress string 客戶端源 IP 地址
keepalive integer 客戶端申請的心跳保活時間
proto_ver integer 協議版本號
connected_at integer 時間戳(秒)

client.disconnected

Key 型別 說明
action string 事件名稱 固定為:"client_disconnected"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
reason string 錯誤原因

client.subscribe

Key 型別 說明
action string 事件名稱 固定為:"client_subscribe"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
topic string 將訂閱的主題
opts json 訂閱引數

opts 包含

Key 型別 說明
qos enum QoS 等級,可取 0 1 2

client.unsubscribe

Key 型別 說明
action string 事件名稱 固定為:"client_unsubscribe"
clientid string 客戶端 ClientId
username string 客戶端 Username,不存在時該值為 "undefined"
topic string 取消訂閱的主題

session.subscribed:同 client.subscribe,action 為 session_subscribed

session.unsubscribed:同 client.unsubscribe,action 為 session_unsubscribe

session.terminated: 同 client.disconnected,action 為 session_terminated

message.publish

Key 型別 說明
action string 事件名稱 固定為:"message_publish"
from_client_id string 釋出端 ClientId
from_username string 釋出端 Username,不存在時該值為 "undefined"
topic string 取消訂閱的主題
qos enum QoS 等級,可取 0 1 2
retain bool 是否為 Retain 訊息
payload string 訊息 Payload
ts integer 訊息的時間戳(毫秒)

message.delivered

Key 型別 說明
action string 事件名稱 固定為:"message_delivered"
clientid string 接收端 ClientId
username string 接收端 Username,不存在時該值為 "undefined"
from_client_id string 釋出端 ClientId
from_username string 釋出端 Username,不存在時該值為 "undefined"
topic string 取消訂閱的主題
qos enum QoS 等級,可取 0 1 2
retain bool 是否為 Retain 訊息
payload string 訊息 Payload
ts integer 訊息時間戳(毫秒)

message.acked

Key 型別 說明
action string 事件名稱 固定為:"message_acked"
clientid string 接收端 ClientId
from_client_id string 釋出端 ClientId
from_username string 釋出端 Username,不存在時該值為 "undefined"
topic string 取消訂閱的主題
qos enum QoS 等級,可取 0 1 2
retain bool 是否為 Retain 訊息
payload string 訊息 Payload
ts integer 訊息時間戳(毫秒)

WebHook案例編寫

修改配置檔案:

web.hook.url = http://127.0.0.1:8991/mqtt/webhook

web.hook.rule.client.connect.1       = {"action": "on_client_connect"}
web.hook.rule.client.connack.1       = {"action": "on_client_connack"}
web.hook.rule.client.connected.1     = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1  = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1     = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1   = {"action": "on_client_unsubscribe"}
web.hook.rule.session.subscribed.1   = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1   = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1      = {"action": "on_message_publish"}
web.hook.rule.message.delivered.1    = {"action": "on_message_delivered"}
web.hook.rule.message.acked.1        = {"action": "on_message_acked"}

啟動webhook外掛:

重啟emqx:

emqx restart

暴露方法:

@RestController
@RequestMapping("/mqtt")
public class WebHookController {

    private static final Logger logger = LoggerFactory.getLogger(WebHookController.class);

    private final Map<String,Boolean> clientStatusMap = new HashMap<>();

    @PostMapping("/webhook")
    public void webhook(@RequestBody Map<String, Object> params){
        logger.info("emqx 觸發 webhook,請求體資料={}",params);
        String action = (String) params.get("action");
        String clientId = (String) params.get("clientid");
        if(action.equals("client_connected")){
            //客戶端成功接入
            clientStatusMap.put(clientId,true);
        }
        if(action.equals("client_disconnected")){
            //客戶端斷開連線
            clientStatusMap.put(clientId,false);
        }
    }

    @GetMapping("/getall")
    public Map<String,Boolean> getAllStatus(){
        return clientStatusMap;
    }
}

打包上傳到伺服器中,並執行。

測試WebHook

使用客戶端工具,連線,訂閱主題和傳送訊息,觀察控制檯輸出。