1. 程式人生 > >EMQ X 規則引擎系列(五)儲存訊息到 Cassandra 資料庫

EMQ X 規則引擎系列(五)儲存訊息到 Cassandra 資料庫

Cassandra 介紹與安裝

Cassandra 是來自 Apache 的開源分散式資料庫系統,它能在支援線性擴充套件高可用的特性下,不損失原有的讀寫效能。目前廣泛運用於各個大企業的後端服務中,例如 Netflix、Apple 等已部署上千個節點。

Cassandra 的安裝參考:http://cassandra.apache.org/doc/latest/getting_started/installing.html

原理概覽

通過配置規則引擎,EMQ X 可將指定主題下滿足某條件的訊息儲存到 Cassandra 資料庫。其訊息流向簡圖如下:

其中:

  • PUB/SUB:為 EMQ X 中的釋出訂閱處理邏輯。
  • Rule:IoT 訊息規則,提取、篩選、轉換訊息報文中的資料。
  • Action: 為具體執行的動作。例如存資料庫、寫 Kafka 等。

場景介紹

為說明規則引擎在 Cassandra 資料庫下的使用方式,我們以 將發動機轉速超過 8000 的車輛狀態存入 Cassandra 中 為例。

假設車輛上報狀態資訊如下:

  • 上報主題:cmd/state/:id,主題中 id 代表車輛客戶端識別碼

  • 訊息體:

    {
      "id": "NXP-058659730253-963945118132721-22", // 客戶端識別碼
      "speed": 32.12, // 車輛速度
      "direction": 198.33212, // 行駛方向
      "tachometer": 3211, // 發動機轉速,數值大於 8000 時才需儲存
      "dynamical": 8.93, // 瞬時油耗
      "location": { // GPS 經緯度資料
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202 // 上報時間
    }
    

準備工作

建立資料庫

建立 emqx_rule_engine_output 表空間以儲存訊息資料:

CREATE KEYSPACE emqx_rule_engine_output WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

建立資料表

根據場景需求,建立資料表 use_statistics 結構及欄位註釋如下:

USE emqx_rule_engine_output;

CREATE TABLE use_statistics (
  msgid text,
  client_id text,
  speed double,
  tachometer int,
  ts int,
  PRIMARY KEY (msgid)
);

建立成功後確認資料表是否存在:

root@cqlsh:emqx_rule_engine_output> use emqx_rule_engine_output ;
root@cqlsh:emqx_rule_engine_output> desc use_statistics ;

CREATE TABLE emqx_rule_engine_output.use_statistics (
    msgid text PRIMARY KEY,
    client_id text,
    speed double,
    tachometer int,
    ts int
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

配置規則引擎

建立資源

開啟 EMQ X Dashboard,進入左側選單的 資源 頁面,點選 新建 按鈕,選擇 Cassandra 資源型別進行建立:

EMQ X 叢集中節點所在網路環境可能互不相同,資源建立成功後點擊列表中 狀態按鈕,檢視各個節點資源連線狀況,如果節點上資源不可用,請檢查配置是否正確、網路連通性,並點選 重連 按鈕手動重連。

建立規則

進入左側選單的 規則 頁面,點選 新建 按鈕,進行規則建立。這裡選擇觸發事件 訊息釋出,在訊息釋出時觸發該規則進行資料處理。

選定觸發事件後,我們可在介面上看到可選欄位及示例 SQL:

篩選所需欄位

規則引擎使用 SQL 語句處理規則條件,該業務中我們需要將 payload 中所有欄位單獨選擇出來,使用 payload.<fieldName> 格式進行選擇,還需要訊息上下文的 topicqosid 資訊,當前 SQL 如下:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts, id
FROM
  "message.publish"
WHERE
  topic =~ 't/#'

確立篩選條件

使用 SQL 語句 WHERE 字句進行條件篩選,該業務中我們需要定義兩個條件:

  • 僅處理 cmd/state/:id 主題,使用主題萬用字元 =~topic 進行篩選:topic =~ 'cmd/state/+'
  • 僅處理 tachometer > 8000 的訊息,使用比較符對 tachometer 進行篩選:payload.tachometer > 8000

組合上一步驟得到 SQL 如下:

SELECT
  payload.id as client_id, payload.speed as speed, 
  payload.tachometer as tachometer,
  payload.ts as ts,
  id
FROM
  "message.publish"
WHERE
  topic =~ 'cmd/state/+'
  AND payload.tachometer > 8000

使用 SQL 測試功能進行輸出測試

藉助 SQL 測試功能,我們可以實時檢視當前 SQL 處理後的資料輸出,該功能需要我們指定 payload 等模擬原始資料。

payload 資料如下,注意更改 tachometer 數值大小,以滿足 SQL 條件:

{
  "id": "NXP-058659730253-963945118132721-22",
  "speed": 32.12,
  "direction": 198.33212,
  "tachometer": 9001,
  "dynamical": 8.93,
  "location": {
    "lng": 116.296011,
    "lat": 40.005091
  },
  "ts": 1563268202
}

點選 SQL 測試 切換按鈕,更改 topicpayload 為場景中的資訊,點選 測試 按鈕檢視資料輸出:

測試輸出資料為:

{
  "client_id": "NXP-058659730253-963945118132721-22",
  "id": "589A429E9572FB44B0000057C0001",
  "speed": 32.12,
  "tachometer": 9001,
  "ts": 1563268202
}

測試輸出與預期相符,我們可以進行後續步驟。

新增響應動作,儲存訊息到 Cassandra

SQL 條件輸入輸出無誤後,我們繼續新增相應動作,配置寫入 SQL 語句,將篩選結果儲存到 Cassandra。

點選響應動作中的 新增 按鈕,選擇 儲存資料到 Cassandra 動作,選取剛剛選定的資源,我們使用 ${fieldName} 語法填充 SQL 語句,將資料插入到資料庫,最後點選 新建 按鈕完成規則建立。

動作的 SQL 配置如下:

INSERT INTO use_statistics (msgid, client_id, speed, tachometer, ts) VALUES (${id}, ${client_id}, ${speed}, ${tachometer}, ${ts});

測試

預期結果

我們成功建立了一條規則,包含一個處理動作,動作期望效果如下:

  1. 裝置向 cmd/state/:id 主題上報訊息,當訊息中的 tachometer 數值超過 8000 時將命中 SQL,規則列表中 已命中 數字增加 1;
  2. Cassandra emqx_rule_engine_output 資料庫的 use_statistics 表中將增加一條資料,數值與當前訊息一致。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 => Websocket 頁面,使用任意資訊客戶端連線到 EMQ X,連線成功後在 訊息 卡片傳送如下資訊:

  • 主題:cmd/state/NXP-058659730253-963945118132721-22

  • 訊息體:

    {
      "id": "NXP-058659730253-963945118132721-22",
      "speed": 32.12,
      "direction": 198.33212,
      "tachometer": 8081,
      "dynamical": 8.93,
      "location": {
        "lng": 116.296011,
        "lat": 40.005091
      },
      "ts": 1563268202
    }
    

點選 傳送 按鈕,此時訊息體中的 tachometer 數值,滿足上面設定的 tachometer > 8000 的條件,當前規則已命中統計值為加 1。

Cassandra 命令列中檢視資料表記錄得到資料如下:

至此,我們通過規則引擎實現了使用規則引擎儲存訊息到 Cassandra 資料庫的業務開發。


更多資訊請訪問我們的官網 emqx.io,或關注我們的開源專案 github.com/emqx/emqx ,詳細文件請訪問

相關推薦

EMQ X 規則引擎系列儲存訊息Cassandra 資料庫

Cassandra 介紹與安裝 Cassandra 是來自 Apache 的開源分散式資料庫系統,它能在支援線性擴充套件 、 高可

EMQ X 規則引擎系列儲存訊息到 MySQL 資料庫

場景介紹 該場景需要將 EMQ X 指定主題下且滿足條件的訊息儲存到 MySQL 資料庫。為了便於後續分析檢索,訊息內容需要進

# EMQ X 持久化外掛系列- 訊息儲存到 OpenTSDB 資料庫

OpenTSDB 是可擴充套件的分散式時序資料庫,底層依賴 HBase 並充分發揮了HBase的分散式列儲存特性,支援數百萬每秒的

OllyDBG 入門系列訊息斷點及 RUN 跟蹤

OllyDBG 入門系列(五)-訊息斷點及 RUN 跟蹤作者:CCDebuger找了幾十個不同語言編寫的 crackme,發現只用訊息斷點的話有很多並不能真正到達我們要找的關鍵位置,想想還是把訊息斷點和 RUN 跟蹤結合在一起講,更有效一點。關於訊息斷點的更多內容大家可以參

微信開發系列_訊息,事件的處理

來自微信端的事件可以有多種 1:文字訊息 2:圖片訊息 3:語音訊息 4:點選按鈕事件 5:掃碼事件等等 使用者傳送的訊息或者事件  都是以xml的形式傳送給我們開發者的(也就是伺服器端) 我們也是以xml的格式返回去的 所有首先 我們得把使用者的訊息給獲取並解析了(我

python系列centos6.x中部署多個python版本

python pyenv centos6.x virtualenv 博主QQ:819594300博客地址:http://zpf666.blog.51cto.com/有什麽疑問的朋友可以聯系博主,博主會幫你們解答,謝謝支持!使用pyenv+virtualenv方式部署python多版本pyenv

Vert.x系列--ContextImpl原始碼分析

開發十年,就只剩下這套架構體系了! >>>   

源碼分析系列x264_ratecontrol_dataflow

技術 stc 碼率控制 碼率 targe article nbsp target 緩沖區 http://www.cnblogs.com/xkfz007/articles/2616159.html 碼率控制部分關鍵函數 5.1 x264_ratecontrol_star

Windows Server 2012單林、多樹、多站點AD 部署系列創建樹域

windows server 域 樹域 站點 多域環境 本章博文開始在BJ、SH、GZ站點為林bicionline.org 創建樹域控及調配的相關DNS等功能。網絡配置:1、為BJ站點ds04、SH站點pdc02和GZ站點ad02配置網絡, IP配置分別如下:(註:在創建域樹環境時,確

PHP系列PHP字符串處理

php字符串處理 php字符串處理1、字符串的處理方式(分割匹配找查替換)//聲明一個關聯數組,數組名為$lamp, 成員有4個$lamp = array( ‘os‘=>‘Linux‘,‘webserver‘ =>‘Apache‘, ‘db‘=>‘MySQL‘, ‘language‘=>

Python操作rabbitmq系列:根據主題分配消息

method type 同時 elephant com .info err 現在 bin 接著上一章,使用exchange_type=‘direct‘進行消息傳遞。這樣消息會完全匹配後發送到對應的接收端。現在我們想幹這樣一件事: C1獲取消息中包含:orange內容的消息,

【轉】Spring MVC系列之自定義數據綁定---HandlerMethodArgumentResolver

開閉 src pat 獲取參數 mvc .net 定義 開閉原則 淺析 介紹 前面幾節我們介紹了Spring MVC的幾種常見的數據綁定的方法,可以靈活地獲取用戶請求中的參數,例如@PathVariable,@ModelAttribute,@RequestPar

Vue入門系列Vue實例詳解與生命周期

auto res context mode parent all from bool silent 【入門系列】 【本文轉自】   http://www.cnblogs.com/fly_dragon Vue的實例是Vue框架的入口,其實也就是前端的ViewM

Scala入門系列:面向對象之類

important ica back ember const 就會 out 不用 spa // 定義類,包含field以及method class HelloWorld { private var name = "Leo" def sayHello() { prin

C# 多線程系列

技術 多線程 post 生死 div 求和 設置 wid 按順序 死鎖 為了線程安全,我們在需要的是會使用”獨占鎖“,但過多的鎖定也會有麻煩。多個線程因為競爭資源相互等待而造成的僵局,我們稱為死鎖。若無外力作用,這些進程將都無法推進。在死鎖中,至少有兩個線程被掛起,並

Java 設計模式系列單例模式

重要 理解 iat 版本 ide 默認 ces 內部實現 成功 Java 設計模式系列(五)單例模式 單例模式確保某個類只有一個實例,而且自行實例化並向整個系統提供這個實例。 一、懶漢式單例 /** * 懶漢式單例類.在第一次調用的時候實例化自己 * 1. 構造器私

Java Thread系列synchronized

執行 java 釋放 lock java t 操作 bject 線程 出現 Java Thread系列(五)synchronized synchronized鎖重入 關鍵字 synchronized 擁有鎖重入的功能,也就是在使用 synchronized 時,當線程等到

.NET面試題系列數據結構(Array、List、Queue、Stack)及線程安全問題

種類型 增刪 叠代器 鎖機制 時間 AS aop 不同 obj 集合 1. Array(數組): 分配在連續內存中,不能隨意擴展,數組中數值類型必須是一致的。數組的聲明有兩種形式:直接定義長度,然後賦值;直接賦值。   缺點:插入數據慢。   優點:性

FuelPHP 系列 ------ Security 防禦

func 所有 提交 odi scrip tac html option rip 項目中難免會有 form 提交,對用戶輸入的所有信息進行過濾,可以避免 XSS 攻擊,防止 SQL 註入。 一、設置配置信息 首先在 config.php 文件中,對 security 相關信

SQL系列—— 排序order by

消息 使用 選擇列 table asc 錯誤 實現 重用 應該 對查詢結果進行排序是日常應用開發中最為常見的需求,在SQL中通過order by實現。order by是select語句中一部分,即子句。 1.order by 1.1 單列排序 其實,檢索出的數據並不是隨機顯