1. 程式人生 > 其它 >EMQ X:規則引擎

EMQ X:規則引擎

簡介

EMQ X Rule Engine (以下簡稱規則引擎) 用於配置 EMQ X 訊息流與裝置事件的處理、響應規則。規則引擎不僅提供了清晰、靈活的 "配置式" 的業務整合方案,簡化了業務開發流程,提升使用者易用性,降低業務系統與 EMQ X 的耦合度;也為 EMQ X 的私有功能定製提供了一個更優秀的基礎架構。

EMQ X 在 訊息釋出或事件觸發 時將觸發規則引擎,滿足觸發條件的規則將執行各自的 SQL 語句篩選並處理訊息和事件的上下文資訊。

訊息釋出

規則引擎藉助響應動作可將特定主題的訊息處理結果儲存到資料庫,傳送到 HTTP Server,轉發到訊息佇列 Kafka 或 RabbitMQ,重新發布到新的主題甚至是另一個 Broker 叢集中,每個規則可以配置多個響應動作。

選擇釋出到 t/# 主題的訊息,並篩選出全部欄位:

SELECT * FROM "t/#"

選擇釋出到 t/a 主題的訊息,並從 JSON 格式的訊息內容中篩選出 "x" 欄位:

SELECT payload.x as x FROM "t/a"

事件觸發

規則引擎使用 $events/ 開頭的虛擬主題(事件主題)處理 EMQ X 內建事件,內建事件提供更精細的訊息控制和客戶端動作處理能力,可用在 QoS1 QoS2 的訊息抵達記錄、裝置上下線記錄等業務中。

選擇客戶端連線事件,篩選 Username 為 'emqx' 的裝置並獲取連線資訊:

SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'emqx'

規則引擎典型應用場景舉例

  • 動作監聽:智慧家庭智慧門鎖開發中,門鎖會因為網路、電源故障、人為破壞等原因離線導致功能異常,使用規則引擎配置監聽離線事件嚮應用服務推送該故障資訊,可以在接入層實現第一時間的故障檢測的能力;
  • 資料篩選:車輛網的卡車車隊管理,車輛感測器採集並上報了大量執行資料,應用平臺僅關注車速大於 40 km/h 時的資料,此場景下可以使用規則引擎對訊息進行條件過濾,向業務訊息佇列寫入滿足條件的資料;
  • 訊息路由:智慧計費應用中,終端裝置通過不同主題區分業務型別,可通過配置規則引擎將計費業務的訊息接入計費訊息佇列並在訊息抵達裝置端後傳送確認通知到業務系統,非計費資訊接入其他訊息佇列,實現業務訊息路由配置;
  • 訊息編解碼:其他公共協議 / 私有 TCP 協議接入、工控行業等應用場景下,可以通過規則引擎的本地處理函式(可在 EMQ X 上定製開發)做二進位制 / 特殊格式訊息體的編解碼工作;亦可通過規則引擎的訊息路由將相關訊息流向外部計算資源如函式計算進行處理(可由使用者自行開發處理邏輯),將訊息轉為業務易於處理的 JSON 格式,簡化專案整合難度、提升應用快速開發交付能力。

規則引擎組成

使用 EMQ X 的規則引擎可以靈活地處理訊息和事件。使用規則引擎可以方便地實現諸如將訊息轉換成指定格式,然後存入資料庫表,或者傳送到訊息佇列等。

與 EMQ X 規則引擎相關的概念包括: 規則(rule)、動作(action)、資源(resource) 和 資源型別(resource-type)。

規則、動作、資源的關係:

規則: {
    SQL 語句,
    動作列表: [
        {
            動作1,
            動作引數,
            繫結資源: {
                資源配置
            }
        },
        {
            動作2,
            動作引數,
            繫結資源: {
                資源配置
            }
         }
    ]
}
  • 規則(Rule): 規則由 SQL 語句和動作列表組成。動作列表包含一個或多個動作及其引數。
  • SQL 語句用於篩選或轉換訊息中的資料。
  • 動作(Action) 是 SQL 語句匹配通過之後,所執行的任務。動作定義了一個針對資料的操作。 動作可以繫結資源,也可以不繫結。例如,“inspect” 動作不需要繫結資源,它只是簡單列印資料內容和動作引數。而 “data_to_webserver” 動作需要繫結一個 web_hook 型別的資源,此資源中配置了 URL。
  • 資源(Resource): 資源是通過資源型別為模板例項化出來的物件,儲存了與資源相關的配置(比如資料庫連線地址和埠、使用者名稱和密碼等) 和系統資源(如檔案控制代碼,連線套接字等)。
  • 資源型別 (Resource Type): 資源型別是資源的靜態定義,描述了此型別資源需要的配置項。

動作和資源型別是由 emqx 或外掛的程式碼提供的,不能通過 API 和 CLI 動態建立。

SQL 語句

SQL 語法

FROM、SELECT 和 WHERE 子句:

規則引擎的 SQL 語句基本格式為:

SELECT <欄位名> FROM <主題> [WHERE <條件>]
  • FROM 子句將規則掛載到某個主題上
  • SELECT 子句用於對資料進行變換,並選擇出感興趣的欄位
  • WHERE 子句用於對 SELECT 選擇出來的某個欄位施加條件過濾

FOREACH、DO 和 INCASE 子句:

如果對於一個數組資料,想針對陣列中的每個元素分別執行一些操作並執行 Actions,需要使用 FOREACH-DO-INCASE 語法。其基本格式為:

FOREACH <欄位名> [DO <條件>] [INCASE <條件>] FROM <主題> [WHERE <條件>]
  • FOREACH 子句用於選擇需要做 foreach 操作的欄位,注意選擇出的欄位必須為陣列型別
  • DO 子句用於對 FOREACH 選擇出來的陣列中的每個元素進行變換,並選擇出感興趣的欄位
  • INCASE 子句用於對 DO 選擇出來的某個欄位施加條件過濾

其中 DO 和 INCASE 子句都是可選的。DO 相當於針對當前迴圈中物件的 SELECT 子句,而 INCASE 相當於針對當前迴圈中物件的 WHERE 語句。

SQL 語句示例:

基本語法舉例

  • 從 topic 為 "t/a" 的訊息中提取所有欄位:
SELECT * FROM "t/a"
  • 從 topic 為 "t/a" 或 "t/b" 的訊息中提取所有欄位:
SELECT * FROM "t/a","t/b"
  • 從 topic 能夠匹配到 't/#' 的訊息中提取所有欄位。
SELECT * FROM "t/#"
  • 從 topic 能夠匹配到 't/#' 的訊息中提取 qos, username 和 clientid 欄位:
SELECT qos, username, clientid FROM "t/#"
  • 從任意 topic 的訊息中提取 username 欄位,並且篩選條件為 username = 'Steven':
SELECT username FROM "#" WHERE username='Steven'
  • 從任意 topic 的 JSON 訊息體(payload) 中提取 x 欄位,並建立別名 x 以便在 WHERE 子句中使用。WHERE 子句限定條件為 x = 1。下面這個 SQL 語句可以匹配到訊息體 {"x": 1}, 但不能匹配到訊息體 {"x": 2}:
SELECT payload as p FROM "#" WHERE p.x = 1
  • 類似於上面的 SQL 語句,但巢狀地提取訊息體中的資料,下面的 SQL 語句可以匹配到 JSON 訊息體 {"x": {"y": 1}}:
SELECT payload as a FROM "#" WHERE a.x.y = 1
  • 在 clientid = 'c1' 嘗試連線時,提取其來源 IP 地址和埠號:
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
  • 篩選所有訂閱 't/#' 主題且訂閱級別為 QoS1 的 clientid:
SELECT clientid FROM "$events/session_subscribed" WHERE topic = 't/#' and qos = 1
  • 篩選所有訂閱主題能匹配到 't/#' 且訂閱級別為 QoS1 的 clientid。注意與上例不同的是,這裡用的是主題匹配操作符 '=~',所以會匹配訂閱 't' 或 't/+/a' 的訂閱事件:
SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
  • FROM 子句後面的主題需要用雙引號 "" 引起來。
  • WHERE 子句後面接篩選條件,如果使用到字串需要用單引號 '' 引起來。
  • FROM 子句裡如有多個主題,需要用逗號 "," 分隔。例如 SELECT * FROM "t/1", "t/2" 。
  • 可以使用使用 "." 符號對 payload 進行巢狀選擇。

遍歷語法(FOREACH-DO-INCASE) 舉例

假設有 ClientID 為 c_steve、主題為 t/1 的訊息,訊息體為 JSON 格式,其中 sensors 欄位為包含多個 Object 的陣列:

{
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx":0},
        {"name": "b", "idx":1},
        {"name": "c", "idx":2}
    ]
}

示例1: 要求將 sensors 裡的各個物件,分別作為資料輸入重新發布訊息到 sensors/${idx} 主題,內容為 ${name}。即最終規則引擎將會發出 3 條訊息:

  1. 主題:sensors/0 內容:a
  2. 主題:sensors/1 內容:b
  3. 主題:sensors/2 內容:c

要完成這個規則,我們需要配置如下動作:

  • 動作型別:訊息重新發布 (republish)
  • 目的主題:sensors/${idx}
  • 目的 QoS:0
  • 訊息內容模板:${name}

以及如下 SQL 語句:

FOREACH
    payload.sensors
FROM "t/#"

示例解析:

這個 SQL 中,FOREACH 子句指定需要進行遍歷的陣列 sensors,則選取結果為:

[
  {
    "name": "a",
    "idx": 0
  },
  {
    "name": "b",
    "idx": 1
  },
  {
    "name": "c",
    "idx": 2
  }
]

FOREACH 語句將會對於結果數組裡的每個物件分別執行 "訊息重新發布" 動作,所以將會執行重新發布動作 3 次。

示例2: 要求將 sensors 裡的 idx 值大於或等於 1 的物件,分別作為資料輸入重新發布訊息到 sensors/${idx} 主題,內容為 clientid=${clientid},name=${name},date=${date}。即最終規則引擎將會發出 2 條訊息:

  1. 主題:sensors/1 內容:clientid=c_steve,name=b,date=2020-04-24
  2. 主題:sensors/2 內容:clientid=c_steve,name=c,date=2020-04-24

要完成這個規則,我們需要配置如下動作:

  • 動作型別:訊息重新發布 (republish)
  • 目的主題:sensors/${idx}
  • 目的 QoS:0
  • 訊息內容模板:clientid=${clientid},name=${name},date=${date}

以及如下 SQL 語句:

FOREACH
    payload.sensors
DO
    clientid,
    item.name as name,
    item.idx as idx
INCASE
    item.idx >= 1
FROM "t/#"

示例解析:

這個 SQL 中,FOREACH 子句指定需要進行遍歷的陣列 sensors; DO 子句選取每次操作需要的欄位,這裡我們選了外層的 clientid 欄位,以及當前 sensor 物件的 nameidx 兩個欄位,注意 item 代表 sensors 陣列中本次迴圈的物件。INCASE 子句是針對 DO 語句中欄位的篩選條件,僅僅當 idx >= 1 滿足條件。所以 SQL 的選取結果為:

[
  {
    "name": "b",
    "idx": 1,
    "clientid": "c_emqx"
  },
  {
    "name": "c",
    "idx": 2,
    "clientid": "c_emqx"
  }
]

FOREACH 語句將會對於結果數組裡的每個物件分別執行 "訊息重新發布" 動作,所以將會執行重新發布動作 2 次。

在 DO 和 INCASE 語句裡,可以使用 item 訪問當前迴圈的物件,也可以通過在 FOREACH 使用 as 語法自定義一個變數名。所以本例中的 SQL 語句又可以寫為:

FOREACH
    payload.sensors as s
DO
    clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

示例3: 在示例2 的基礎上,去掉 clientid 欄位 c_steve 中的 c_ 字首

在 FOREACH 和 DO 語句中可以呼叫各類 SQL 函式,若要將 c_steve 變為 steve,則可以把例2 中的 SQL 改為:

FOREACH
    payload.sensors as s
DO
    nth(2, tokens(clientid,'_')) as clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

另外,FOREACH 子句中也可以放多個表示式,只要最後一個表示式是指定要遍歷的陣列即可。比如我們將訊息體改一下,sensors 外面多套一層 Object:

{
    "date": "2020-04-24",
    "data": {
        "sensors": [
            {"name": "a", "idx":0},
            {"name": "b", "idx":1},
            {"name": "c", "idx":2}
        ]
    }
}

則 FOREACH 中可以在決定要遍歷的陣列之前把 data 選取出來:

FOREACH
    payload.data as data
    data.sensors as s
...

CASE-WHEN 語法示例

示例1: 將訊息中 x 欄位的值範圍限定在 0~7 之間。

SELECT
  CASE WHEN payload.x < 0 THEN 0
       WHEN payload.x > 7 THEN 7
       ELSE payload.x
  END as x
FROM "t/#"

假設訊息為:

{"x": 8}

則上面的 SQL 輸出為:

{"x": 7}

陣列操作語法舉例

示例1: 建立一個數組,賦值給變數 a:

SELECT
  [1,2,3] as a
FROM
  "t/#"

下標從 1 開始,上面的 SQL 輸出為:

{
  "a": [1, 2, 3]
}

示例2: 從陣列中取出第 N 個元素。下標為負數時,表示從陣列的右邊取:

SELECT
  [1,2,3] as a,
  a[2] as b,
  a[-2] as c
FROM
  "t/#"

上面的 SQL 輸出為:

{
  "b": 2,
  "c": 2,
  "a": [1, 2, 3]
}

示例3: 從 JSON 格式的 payload 中巢狀的獲取值:

SELECT
  payload.data[1].id as id
FROM
  "t/#"

假設訊息為:

{"data": [
    {"id": 1, "name": "steve"},
    {"id": 2, "name": "bill"}
]}

則上面的 SQL 輸出為:

{"id": 1}

示例4: 陣列範圍(range)操作:

SELECT
  [1..5] as a,
  a[2..4] as b
FROM
  "t/#"

上面的 SQL 輸出為:

{
  "b": [2, 3, 4],
  "a": [1, 2, 3, 4, 5]
}

示例5: 使用下標語法修改陣列中的某個元素:

SELECT
  payload,
  'STEVE' as payload.data[1].name
FROM
  "t/#"

假設訊息為:

{"data": [
    {"id": 1, "name": "steve"},
    {"id": 2, "name": "bill"}
]}

則上面的 SQL 輸出為:

{
  "payload": {
    "data": [
      {"name": "STEVE", "id": 1},
      {"name": "bill", "id": 2}
    ]
  }
}

FROM 子句可用的事件主題

事件主題名 釋義
$events/message_delivered 訊息投遞
$events/message_acked 訊息確認
$events/message_dropped 訊息丟棄
$events/client_connected 連線完成
$events/client_disconnected 連線斷開
$events/session_subscribed 訂閱
$events/session_unsubscribed 取消訂閱

SELECT 和 WHERE 子句可用的欄位

SELECT 和 WHERE 子句可用的欄位與事件的型別相關。其中 clientid, usernameevent 是通用欄位,每種事件型別都有

普通主題 (訊息釋出)

event 事件型別,固定為 "message.publish"
id MQTT 訊息 ID
clientid Client ID
username 使用者名稱
payload MQTT 訊息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
flags MQTT 訊息的 Flags
headers MQTT 訊息內部與流程處理相關的額外資料
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 訊息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_delivered (訊息投遞)

event 事件型別,固定為 "message.delivered"
id MQTT 訊息 ID
from_clientid 訊息來源 Client ID
from_username 訊息來源使用者名稱
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
payload MQTT 訊息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
flags MQTT 訊息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 訊息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_acked (訊息確認)

event 事件型別,固定為 "message.acked"
id MQTT 訊息 ID
from_clientid 訊息來源 Client ID
from_username 訊息來源使用者名稱
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
payload MQTT 訊息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
flags MQTT 訊息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 訊息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_dropped (訊息丟棄)

event 事件型別,固定為 "message.dropped"
id MQTT 訊息 ID
reason 訊息丟棄原因
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
payload MQTT 訊息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
flags MQTT 訊息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 訊息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/client_connected (終端連線成功)

event 事件型別,固定為 "client.connected"
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
mountpoint 主題掛載點(主題字首)
peername 終端的 IPAddress 和 Port
sockname emqx 監聽的 IPAddress 和 Port
proto_name 協議名字
proto_ver 協議版本
keepalive MQTT 保活間隔
clean_start MQTT clean_start
expiry_interval MQTT Session 過期時間
is_bridge 是否為 MQTT bridge 連線
connected_at 終端連線完成時間 (s)
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/client_disconnected (終端連線斷開)

event 事件型別,固定為 "client.disconnected"
reason 終端連線斷開原因: normal:客戶端主動斷開 kicked:服務端踢出,通過 REST API keepalive_timeout: keepalive 超時 not_authorized: 認證失敗,或者 acl_nomatch = disconnect 時沒有許可權的 Pub/Sub 會主動斷開客戶端 tcp_closed: 協議錯誤 internal_error: 畸形報文解析出錯
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
peername 終端的 IPAddress 和 Port
sockname emqx 監聽的 IPAddress 和 Port
disconnected_at 終端連線斷開時間 (s)
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/session_subscribed (終端訂閱成功)

event 事件型別,固定為 "session.subscribed"
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/session_unsubscribed (取消終端訂閱成功)

event 事件型別,固定為 "session.unsubscribed"
clientid 訊息目的 Client ID
username 訊息目的使用者名稱
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 訊息的 QoS
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

SQL 關鍵字和符號

SELECT - FROM - WHERE 語句

SELECT 語句用於決定最終的輸出結果裡的欄位。比如:

下面 SQL 的輸出結果中將只有兩個欄位 "a" 和 "b":

SELECT a, b FROM "t/#"

WHERE 語句用於對本事件中可用欄位,或 SELECT 語句中定義的欄位進行條件過濾。比如:

# 選取 username 為 'abc' 的終端發來的訊息,輸出結果為所有可用欄位:

SELECT * FROM "#" WHERE username = 'abc'

## 選取 clientid 為 'abc' 的終端發來的訊息,輸出結果將只有 cid 一個欄位。
## 注意 cid 變數是在 SELECT 語句中定義的,故可在 WHERE 語句中使用:

SELECT clientid as cid FROM "#" WHERE cid = 'abc'

## 選取 username 為 'abc' 的終端發來的訊息,輸出結果將只有 cid 一個欄位。
## 注意雖然 SELECT 語句中只選取了 cid 一個欄位,所有訊息釋出事件中的可用欄位 (比如 clientid, username 等) 仍然可以在 WHERE 語句中使用:

SELECT clientid as cid FROM "#" WHERE username = 'abc'

## 但下面這個 SQL 語句就不能工作了,因為變數 xyz 既不是訊息釋出事件中的可用欄位,又沒有在 SELECT 語句中定義:

SELECT clientid as cid FROM "#" WHERE xyz = 'abc'

FROM 語句用於選擇事件來源。如果是訊息釋出則填寫訊息的主題,如果是事件則填寫對應的事件主題。