1. 程式人生 > 其它 >乾貨丨如何用DolphinDB流計算引擎實現感測器資料異常檢測

乾貨丨如何用DolphinDB流計算引擎實現感測器資料異常檢測

技術標籤:工業物聯網DolphinDB時序資料庫物聯網異常檢測流計算

DolphinDBdatabase提供了流資料表(stream table)和流計算引擎用於實時資料處理,包括物聯網中感測器資料的異常檢測。內建的異常檢測引擎(Anomaly Detection Engine)能滿足大部分異常檢測場景的需求。如果異常檢測邏輯複雜且較為特殊,標準化的異常檢測引擎不能滿足要求,使用者可以用自定義訊息處理函式來實現。


1. 應用需求

一個監控系統,一秒鐘採集一次資料。現有以下2個異常檢測需求:

  • 每3分鐘內,若感測器溫度出現2次40攝氏度以上並且3次30攝氏度以上,系統報警。
  • 若感測器網路斷開,5分鐘內無資料,系統報警。

上述的報警是指若偵測到異常,向一個流資料表中寫一條記錄。


2. 設計思路

分散式時序資料庫DolphinDB的流計算框架目前已支援時序聚合引擎橫截面聚合引擎異常檢測引擎和自定義流計算引擎:

  • 時序聚合引擎(Time-Series Aggregator):能對裝置狀態進行縱向聚合計算(按時間序列聚合),或者將多個裝置狀態橫向聚合後再按時間聚合。時序聚合支援滑動視窗的流式計算。DolphinDB對內建的視窗聚合函式均進行了效能優化,單核CPU每秒可完成近百萬狀態的時序聚合。
  • 橫截面聚合引擎(Cross Sectional Aggregator):是快照引擎的擴充套件,能對裝置狀態進行橫向聚合計算,比如計算一批裝置的溫度均值。
  • 異常檢測引擎(Anomaly Detection Engine):能實時檢測資料是否符合使用者自定義的警報指標,如發現異常資料,將它們輸出到表中,滿足物聯網實時監控和預警的需求。
  • 自定義流計算引擎:當以上三種引擎都不能滿足需求時,使用者也可以使用DolphinDB指令碼或API語言自定義訊息處理函式。

對於第一個需求即3分鐘內感測器溫度出現異常系統即報警,異常檢測引擎恰好適用。只需要簡單的用DolphinDB指令碼寫一個表示式描述一下異常邏輯即可。但第2個需求不適用。異常檢測引擎是按裝置分組進行處理的。每次有新資料流入才觸發計算,或每隔一段時間,在固定長度的移動視窗中才進行聚合計算。一個感測器若沒有產生新資料,無法觸發計算。解決辦法是自定義一個訊息處理函式(message handler)去計算和檢測。具體實現思路是:用一個鍵值記憶體表記錄每個感測器的最新採集時間。訊息以一定時間間隔(比如1秒)進入訊息處理函式。訊息處理函式首先更新鍵值記憶體表,然後檢查這個表中每個裝置記錄的最新採集時間是否超過5分鐘,若有即報警。

3.詳細實現步驟


3.1 定義輸入輸出流資料表

首先定義一個流資料表用於接收實時採集的感測器資料,並用enableTableShareAndPersistence函式把流資料表共享和持久化到硬碟上。cacheSize引數限制記憶體中保留的最大資料量是100萬行。雖然感測器裝置有很多指標,因為本例只涉及溫度指標,所以本例對錶結構進行了簡化,表結構僅包含三列,即感測器編號deviceID,時間ts和溫度temperature。程式碼如下:

st=streamTable(1000000:0,`deviceID`ts`temperature,[INT,DATETIME,FLOAT])
enableTableShareAndPersistence(table=st,tableName=`sensor,asynWrite=false,compress=true, cacheSize=1000000)

其次定義報警輸出流資料表用於異常檢測引擎的輸出。按照DolphinDB使用者手冊中對建立異常檢測引擎函式createAnomalyDetectionEngine各引數的說明,異常引擎對輸出表的格式有嚴格要求,即它的第一列必須是時間型別,用於存放檢測到異常的時間戳,並且該列的資料型別需與輸入表的時間列一致。如果keyColumn(分組列)引數不為空,那麼第二列為keyColumn,在本例中,分組列為感測器編號deviceID。之後的兩列分別為int型別和string/symbol型別,用於記錄異常的型別(在metrics中的下標)和異常的內容。建表程式碼如下:

share streamTable(1000:0, `time`deviceID`anomalyType`anomalyString, [DATETIME,INT,INT, SYMBOL]) as warningTable

3.2 建立異常檢測引擎,實現感測器溫度異常報警的功能

異常檢測引擎中,設定異常指標為sum(temperature > 40) > 2 && sum(temperature > 30) > 3 ,分組列(keyColumn)為感測器編號deviceID,資料視窗windowSize為180秒,計算的時間間隔step為30秒。這些引數如何設定可參考異常檢測引擎。程式碼如下:

engine = createAnomalyDetectionEngine(name="engine1", metrics=<[sum(temperature > 40) > 2 && sum(temperature > 30) > 3  ]>,dummyTable=sensor, outputTable=warningTable, timeColumn=`ts, keyColumn=`deviceID, windowSize = 180, step = 30)
subscribeTable(tableName="sensor", actionName="sensorAnomalyDetection", offset=0, handler= append!{engine}, msgAsTable=true)

3.3 建立自定義訊息處理函式,實現感測器離線報警的功能

第二個需求,需要儲存每個感測器的最新資料採集時間,用於判斷是否已有5分鐘未採集資料。本例採用鍵值記憶體表儲存每個裝置的最新狀態,並以感測器編號deviceID作為主鍵。鍵值表中,基於鍵值的查詢和更新具有非常高的效率。收到感測器資料時,用append!函式更新鍵值表中的記錄。如果新記錄中的主鍵值不存在於表中,那麼往表中新增新的記錄;如果新記錄的主鍵值與已有記錄的主鍵值重複時,會更新表中該主鍵值對應的記錄。

在輸出異常資訊到報警輸出流資料表時,異常的型別anomalyType因為上節異常檢測引擎已用0,所以這裡設為1。異常的內容設為空。

配置函式subscribeTable的引數throttle和batchSize,可以達到批量處理訊息提升效能的目的。引數throttle決定handler間隔多久時間處理一次訊息,本例中設定為每秒處理一次。這裡要注意當訊息的數量達到batchSize時,即便間隔時間沒到也會處理進來的訊息,所以需要將batchSize設定為一個比較大的數。示例程式碼如下,其中感測器數deviceNum假設為3:

t=keyedTable(`deviceID,100:0,`deviceID`time,[INT,DATETIME])
deviceNum=3
insert into t values(1..deviceNum,take(now().datetime(),deviceNum))
def checkNoData (mutable keyedTable, mutable outputTable, msg) {
	keyedTable.append!(select deviceID, ts from msg)
	warning = select now().datetime(), deviceID, 1 as anomalyType, "" as anomalyString from keyedTable where time < datetimeAdd(now().datetime(), -5, "m")
	if(warning.size() > 0) outputTable.append!(warning)
}
subscribeTable(tableName="sensor", actionName="noData", offset=0,handler=checkNoData{t, warningTable}, msgAsTable=true, batchSize=1000000, throttle=1)


4. 模擬寫入與驗證

假設3個感測器,一秒鐘採集一次資料,前一分鐘所有裝置都有資料,1分鐘後第3個裝置無資料。示例程式碼如下:

def writeData(){
	deviceNum = 3
	for (i in 0:60) {
		data = table(take(1..deviceNum, deviceNum) as deviceID, take(now().datetime(), deviceNum) as ts, rand(10..41, deviceNum) as temperature)
		sensor.append!(data)
		sleep(1000)
	}
	deviceNum = 2
	for (i in 0:600) {
		data = table(take(1..deviceNum, deviceNum) as deviceID ,take(now().datetime(), deviceNum) as ts, rand(10..45,deviceNum) as temperature)
		sensor.append!(data)
		sleep(1000)
	}	
}
submitJob("simulateData", "simulate sensor data", writeData)

執行後,查詢報警輸出表warningTable,可看到結果示例如下:


附錄

測試程式碼