EMQX之WebHook及其案例實現
1 WebHook
WebHook 是由 emqx_web_hook (opens new window)外掛提供的 將 EMQX 中的鉤子事件通知到某個 Web 服務 的功能。
WebHook 的內部實現是基於 鉤子,但它更靠近頂層一些。
它通過在鉤子上的掛載回撥函式,獲取到 EMQX 中的各種事件,並轉發至 emqx_web_hook 中配置的 Web 伺服器。
以 客戶端成功接入(client.connected) 事件為例,其事件的傳遞流程如下:
Client | EMQX | emqx_web_hook | HTTP +------------+ =============>| - - - - - - -> - - - - - - - ->===========> | Web Server | | Broker | | Request +------------+
提示:WebHook 對於事件的處理是單向的,它僅支援將 EMQX 中的事件推送給 Web 服務,並不關心 Web 服務的返回。
藉助 Webhook 可以完成裝置線上、上下線記錄,訂閱與訊息儲存、訊息送達確認等諸多業務。
2 配置項
Webhook 的配置檔案位於 etc/plugins/emqx_web_hook.conf
,配置項的詳細說明可以檢視 配置項。
3 觸發規則
在 etc/plugins/emqx_web_hook.conf
可配置觸發規則,其配置的格式如下:
## 格式示例 web.hook.rule.<Event>.<Number> = <Rule> ## 示例值 web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"} web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
4 Event 觸發事件
目前支援以下事件:
名稱 | 說明 | 執行時機 |
---|---|---|
client.connect | 處理連線報文 | 服務端收到客戶端的連線報文時 |
client.connack | 下發連線應答 | 服務端準備下發連線應答報文時 |
client.connected | 成功接入 | 客戶端認證完成併成功接入系統後 |
client.disconnected | 連線斷開 | 客戶端連線層在準備關閉時 |
client.subscribe | 訂閱主題 | 收到訂閱報文後,執行 client.check_acl 鑑權前 |
client.unsubscribe | 取消訂閱 | 收到取消訂閱報文後 |
session.subscribed | 會話訂閱主題 | 完成訂閱操作後 |
session.unsubscribed | 會話取消訂閱 | 完成取消訂閱操作後 |
message.publish | 訊息釋出 | 服務端在釋出(路由)訊息前 |
message.delivered | 訊息投遞 | 訊息準備投遞到客戶端前 |
message.acked | 訊息回執 | 服務端在收到客戶端發回的訊息 ACK 後 |
message.dropped | 訊息丟棄 | 釋出出的訊息被丟棄後 |
5 Number
同一個事件可以配置多個觸發規則,配置相同的事件應當依次遞增。
6 Rule
觸發規則,其值為一個 JSON 字串,其中可用的 Key 有:
- action:字串,取固定值
- topic:字串,表示一個主題過濾器,操作的主題只有與該主題匹配才能觸發事件的轉發
例如,我們只將與 a/b/c
和 foo/#
主題匹配的訊息轉發到 Web 伺服器上,其配置應該為:
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
這樣 Webhook 僅會轉發與 a/b/c
和 foo/#
主題匹配的訊息,例如 foo/bar
等,而不是轉發 a/b/d
或 fo/bar
。
7 Webhook 事件引數
事件觸發時 Webhook 會按照配置將每個事件組成一個 HTTP 請求傳送到 url
所配置的 Web 伺服器上。其請求格式為:
URL: <url> # 來自於配置中的 `url` 欄位
Method: POST # 固定為 POST 方法
Body: <JSON> # Body 為 JSON 格式字串
對於不同的事件,請求 Body 體內容有所不同,下表列舉了各個事件中 Body 的引數列表:
7.1 client.connect
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connect" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
7.2 client.connack
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connack" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
conn_ack | string | "success" 表示成功,其它表示失敗的原因 |
7.3 client.connected
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connected" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
connected_at | integer | 時間戳(秒) |
7.4 client.disconnected
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_disconnected" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
reason | string | 錯誤原因 |
7.5 client.subscribe
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_subscribe" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
topic | string | 將訂閱的主題 |
opts | json | 訂閱引數 |
7.6 opts 包含
Key | 型別 | 說明 |
---|---|---|
qos | enum | QoS 等級,可取 0 1 2
|
7.7 client.unsubscribe
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_unsubscribe" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
session.subscribed:同 client.subscribe
,action 為 session_subscribed
session.unsubscribed:同 client.unsubscribe
,action 為 session_unsubscribe
session.terminated: 同 client.disconnected
,action 為 session_terminated
7.8 message.publish
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_publish" |
from_client_id | string | 釋出端 ClientId |
from_username | string | 釋出端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2
|
retain | bool | 是否為 Retain 訊息 |
payload | string | 訊息 Payload |
ts | integer | 訊息的時間戳(毫秒) |
7.8 message.delivered
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_delivered" |
clientid | string | 接收端 ClientId |
username | string | 接收端 Username,不存在時該值為 "undefined" |
from_client_id | string | 釋出端 ClientId |
from_username | string | 釋出端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2
|
retain | bool | 是否為 Retain 訊息 |
payload | string | 訊息 Payload |
ts | integer | 訊息時間戳(毫秒) |
7.9 message.acked
Key | 型別 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_acked" |
clientid | string | 接收端 ClientId |
from_client_id | string | 釋出端 ClientId |
from_username | string | 釋出端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2
|
retain | bool | 是否為 Retain 訊息 |
payload | string | 訊息 Payload |
ts | integer | 訊息時間戳(毫秒) |
8 案例
8.1 裝置上線和下線時,能夠在第三方系統中查詢
- 修改 etc/plugins/emqx_web_hook.conf 檔案,設定事件轉發的url和地址和觸發規則
# 事件需要轉發的目的伺服器地址
web.hook.api.url = http://127.0.0.1:8991/mqtt/webhook
# 觸發規則
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
# 以下可以不開啟,測試用用
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish","topic":"img/#"}
- 在EMQ的控制檯開啟emqx_web_hook 外掛
- 編寫springboot應用
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RequestMapping("/mqtt")
@RestController
public class Controller_5 {
private final static Logger logger = LoggerFactory.getLogger(Controller_5.class);
private Map<String,Boolean> ztList=new HashMap<>();
@PostMapping("/webhook")
public void webhook(@RequestBody() Map<String,Object> params)
{
logger.info("引數列表 {}",params);
/**
* 注意 action,clientid,事件名 的名稱不能修改,否則匹配不上
*/
String action = (String)params.get("action");
String clientid = (String)params.get("clientid");
if(action.equals("client_connected"))
{
logger.info("client:{} 上線",clientid);
ztList.put(clientid,true);
}
if(action.equals("client_disconnected"))
{
logger.info("client:{} 下線",clientid);
ztList.put(clientid,false);
}
}
@RequestMapping("/ztList")
public Map<String,Boolean> getZtList()
{
return ztList;
}
}
8.2 Webhook實現客戶端斷連監控
8.2.1 斷連監控需求
系統需要知道所有客戶端當前的連線狀態,方便在後臺管理系統中進行直觀展示
8.2.2 程式碼實現
通過EMQX 的webhook將客戶端的連線斷開等事件通知到我們自建的服務上,通過事件型別獲取客戶端的連線狀態,然後將客戶端的連線狀態進行儲存,並且提供HTTP API供後臺系統查詢所有客戶端的狀態。
package com.itheima.controller.mqtt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.Map;
/**
*
*/
@RestController
@RequestMapping("/mqtt")
public class WebHookController {
private static final Logger log = LoggerFactory.getLogger(WebHookController.class);
private Map<String,Boolean> clientStatus = new HashMap<>();
@PostMapping("/webhook")
public void hook(@RequestBody Map<String,Object> params){
log.info("emqx 觸發 webhook,請求體資料={}",params);
String action = (String) params.get("action");
String clientId = (String) params.get("clientid");
if(action.equals("client_connected")){
log.info("客戶端{}接入本系統",clientId);
clientStatus.put(clientId,true);
}
if(action.equals("client_disconnected")){
log.info("客戶端{}下線",clientId);
clientStatus.put(clientId,false);
}
}
@GetMapping("/allStatus")
public Map getStatus(){
return this.clientStatus;
}
}
hook方法用來接收EMQ X傳入過來的請求,將客戶端Id的連線狀態記錄到map中,getAllStatus方法用來返回所有客戶端狀態。
然後通過客戶端連線/斷開EMQ X之後,通過訪問 all 介面就能得到這些客戶端得狀態了。
當然了,在實際的專案中肯定就不會這麼簡單,我們會將這些客戶端的狀態存入類似redis這樣的分散式快取中,方便整個系統進行存取隨時獲取客戶端狀態。