使用Python編寫指令碼將MQTT資料轉存至InfluxDB
前言
之前使用Rabbitmq部署了一個簡單的MQTT伺服器,暫未做使用者隔離,也部署了InfluxDB時序資料庫,但是並不能直接通過配置將MQTT伺服器的資料轉存至時序資料庫中,於是我決定自己寫指令碼實現下.
準備
開啟shell使用
pip install influxdb
安裝InluxDB所需模組
pip install paho-mqtt
安裝Rabbmq所需模組
原始碼
# coding=utf-8
import json
import random
import threading
import os
import paho.mqtt.client as mqtt
import time
from influxdb import InfluxDBClient
from my_lib.code_handle.code_handle import auto_code
class Mqtt_handle:
topic_sub='$dp'
topic_pub='$info'
counts = 0
clientID = ''
for i in range(0, 2):
clientID = clientID.join(str(random.uniform(0, 1)))
mqtt_client = mqtt.Client(clientID)
DB_client = InfluxDBClient(self._host, 8086 , '', '', 'mydb') # 初始化
def __init__(self, host, port):
self._host = host
self._port = port
self.mqtt_client.on_connect = self._on_connect # 設定連線上伺服器回撥函式
self.mqtt_client.on_message = self._on_message # 設定接收到伺服器訊息回撥函式
def connect(self, username=None, password=None) :
self.mqtt_client.username_pw_set(username, password)
self.mqtt_client.connect(self._host, self._port, 60) # 連線伺服器,埠為1883,維持心跳為60秒
def publish(self, data):
self.mqtt_client.publish(self.topic_pub, data)
def loop(self, timeout=None):
thread = threading.Thread(target=self._loop, args=(timeout,))
thread.start()
def _loop(self, timeout=None):
if not timeout:
self.mqtt_client.loop_forever()
else:
self.mqtt_client.loop(timeout)
def _on_connect(self, client, userdata, flags, rc):
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
with open('./run.log', 'a+')as f:
f.write('@Run ' + local_time + ' Connected with result code :' + str(rc))
client.subscribe(self.topic_sub)
def _on_message(self, client, userdata, msg): # 從伺服器接受到訊息後回撥此函式
data_json = auto_code(str(msg.payload))
if self._is_json(data_json):
data_list = [json.loads(data_json)]
#如果符合InfluxDB格式就轉存至資料庫
if 'measurement' in data_list[0] and 'tags' in data_list[0] and 'fields' in data_list[0]:
try:
DB_client.write_points(data_list)
self.counts += 1
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
#//**********//記錄一個上傳日誌
with open('./upload.log', 'a+')as f:
f.write('Success,counts:' + str(self.counts) + ' Time:' + local_time + '\n')
except Exception as e:
with open('./upload.log', 'a+')as f:
f.write(e.message)
f.write('\nTopic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n\n')
#//**********//
#如果接受到停止指令就停止程式並記錄一個停止日誌
elif data_list[0].has_key('cmd') and data_list[0]['cmd'] == 'exit':
print '\[email protected]_handle Exit\n'
with open('./run.log', 'a')as f:
local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
f.write(
'@Stop ' + local_time + ' Topic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n')
os._exit(0)#停止程式
#解析JSON前先判斷資料 是否是JSON格式,避免程式崩潰
def _is_json(self, data):
try:
json.loads(data)
except ValueError:
return False
return True
if __name__ == '__main__':
local_host = '127.0.0.1'
DB_client = InfluxDBClient(local_host, 8086, '', '', 'mydb') # 初始化
mqtt_client = Mqtt_handle(local_host, 1883)
mqtt_client.connect('influxdb', 'influxdb')
mqtt_client.loop()
寫一個簡單的Shell執行一下指令碼:
#!/bin/bash
nohup python /home/ubuntu/app/py/mqttDB/mqtt_handle.py &
相關推薦
使用Python編寫指令碼將MQTT資料轉存至InfluxDB
前言 之前使用Rabbitmq部署了一個簡單的MQTT伺服器,暫未做使用者隔離,也部署了InfluxDB時序資料庫,但是並不能直接通過配置將MQTT伺服器的資料轉存至時序資料庫中,於是我決定自己寫指令碼實現下. 準備 開啟shell使用 pip in
Python指令碼:將Redis資料轉存到Mysql列表中
目錄 一、思路 三、總結 一、思路 連線指定的redis和mysql資料庫,從redis中取出資料,然後存到mysql中,中間會遇到幾個問題,在下面的程式碼片段中指出 二、程式碼實現 # coding=utf-8 import js
Flume將 kafka 中的資料轉存到 HDFS 中
flume1.8 kafka Channel + HDFS sink(without sources) 將 kafka 中的資料轉存到 HDFS 中, 用作離線計算, flume 已經幫我們實現了, 新增配置檔案, 直接啟動 flume-ng 即可. The Kafka channel can be
Oracle用定時任務儲存過程將資料轉存到歷史表,提高查詢速度
一、定義儲存過程 CREATE OR REPLACE PROCEDURE Sync_INFO_HISTORY IS BEGIN insert into depart_passenger_info
python指令碼——將同一個資料夾下的相同檔名的不同檔案分開
需求:一個資料夾下有相同檔名的兩種格式的檔案,且數量相等,我的兩種檔案格式是:jpg和tif.rbox.txt,想要把這兩種檔案分別放到兩個資料夾裡面 例如:將789資料夾下的兩種檔案分別放到456資料夾和000資料夾下(原來的456資料夾和000資料夾是空的) 程式
將mongoDB資料轉化為json---Python實現
前提背景 我們知道,mongoDB資料庫表中的一條資料(document)在呈現的時候,很像json。在平時的使用中,有時候會有這樣的需求:我們需要將資料庫中的資料讀出來,並將其傳送(例如ajax請求)到前端頁面去解析呈現。顯然此時,為了更容易解析,我們需要將
將慢日誌轉存到數據庫
execute ins com mys spl 轉存 open for mes import re import MySQLdb host=‘10.76.45.7‘ port=3306 user=‘test‘ password=‘test‘ dbName=‘test‘
Java 輸入一行以空格分隔字元作為輸入資料轉存為陣列形式並輸出
用java寫一些演算法題目的時候需要輸入一些資料,像C或者CPP都可以有專用的輸入函式進行輸入,在Java裡需要稍微麻煩一些,具體程式碼如下: import java.util.Scanner; public class Main{ public static void main(
將cifar10資料轉成圖片
#將cifar10轉成圖片 import numpy as np import matplotlib.image as plimg from PIL import Image import pickle as p def load_CIFAR_batch(filename):
將mnist資料轉成圖片
from tensorflow.examples.tutorials.mnist import input_data import scipy.misc import os # 注意儲存的路徑 mnist = input_data.read_data_sets("MNIST_DATA", o
Android Gson 將json資料轉double 數值為0.0的問題
今天上午改需求 遇到一個奇葩的Bug 返回的為double 型別 於是在實體類裡寫private double space; get set略。。。 但是顯示的時候為0.0 於是將實體類的double 改為String 
BIGEMPA如何將高程資料轉成南方CASS的DAT格式
需要的工具 1. BIGEMPA地圖下載器(全能版已授權) 2. Global Mapper 14 第一步:將下載好的高程資料DEM直接拖到global mapper中(如何下載高程DEM?),如下圖所示:
如何將高程資料轉成南方CASS的DAT格式
釋出時間:2018-01-17 版權: 需要的工具 第一步:將下載好的高程資料DEM直接拖到global mapper中(如何下載高程DEM?),如下圖所示: 第二步:將DEM資料儲存為高程點的文字檔案,如下圖: 點選後,出現下圖:
FFmpeg 將YUV資料轉RGB
只要開始初始化一次,結束後釋放就好,中間可以迴圈轉碼 AVFrame *m_pFrameRGB,*m_pFrameYUV; uint8_t *m_rgbBuffer,*m_yuvBuffer; struct SwsContext *m_img_convert_ctx; void i
BIGEMAP 如何將高程資料轉成南方CASS的DAT格式
需要的工具 1. BIGEMPA地圖下載器(全能版已授權) 2. Global Mapper 14 第一步:將下載好的高程資料DEM直接拖到global mapper中如下圖所示: 第二步:將DEM資料儲存為高程點的文字
batch指令碼將proto檔案轉化為js
要進入proto的資料夾 開啟cmd 輸入dir *.proto > aj.text 開啟aj.text 刪除多餘的空行和沒有的行(不含檔名的行) 新建一個demo.bat @Echo Off Setlocal Enabledelayedexpansi
【深度學習框架Caffe學習與應用】第三課 將圖片資料轉化為LMDB資料``
1.將圖片資料轉化為LMDB資料 第一步:建立圖片檔案列表清單,一般為一個txt檔案,一行一張圖片 我在caffe/data/目錄下新建一個test_data的資料夾,裡面放訓練集及資料集
BIGEMAP如何將高程資料轉成南方CASS的DAT格式
1、添加了ArcGis切片快取的conf.xml檔案中的各級比例引數; 2、修改了天地圖四川請求地址; 3、針對向量匯出成csv格式(地理座標)時,可以設定座標格式(度分秒等) 4、增加了通過離線掃碼方式使用下載器,同時可以新增一鍵離線地圖服務; 5、優化了7引數計算的準確性; 6、修改了從工具箱中
Access資料庫資料轉存到MySql資料庫中
目錄 一、Navicat自帶匯入Access(*.mdb)資料的方式 二、藉助ODBC當然Access資料 1. 建立ODBC資料來源 2. 通過Navicat匯入資料 3. 新增鍵等 使用Navicat 8 for MySql來匯入資料,Access是2003版本的
oracle 海量資料轉存插入分割槽表
某普通表T,由於前期設計不當沒有分割槽,如今幾年來的資料量已達9億+, 空間佔用大約350G,線上重定義為分割槽表不現實,故採取申請時間視窗停此表應用,改造為分割槽表。1.建立分割槽表-- Create table 建立分割槽表T_PART,分割槽從14年6月開始。creat