InfluxDB時序資料庫
前言
在我們很多應用中會遇到有一種基於一系列時間的資料需要處理,通過時間的順序可以將這些資料點連成線,再通過資料統計後可以做成多緯度的報表,也可通過機器學習來實現資料的預測告警。而時序資料庫就是用於存放管理這種有著時間順序資料的,時序資料庫一般都支援時序資料的快速寫入、持久化、多緯度的聚合查詢等基本功能。
InfluxDB簡介
InfluxDB是一個基於時間序列資料而開發的高效能資料儲存平臺,它可以對時序資料進行高吞吐量的攝取、壓縮和實時查詢。InfluxDB是用Go語言編寫的,它會編譯成一個沒有外部依賴的二進位制檔案來執行,支援Java、JavaScript、c#等語言。InfluxDB支援類似SQL的查詢語言,同時還支援正則表示式、算術表示式和時間序列特定函式以加速資料的處理效率。如下是跟InfluxDB相關的網址:
InfluxDB官網:https://www.influxdata.com/
InfluxDB官方文件:https://docs.influxdata.com/influxdb/
InfluxDB官方下載:https://portal.influxdata.com/downloads
InfluxDB客戶端工具下載:https://docs.influxdata.com/influxdb/v1.6/tools/api_client_libraries/
特點:
- 無結構(無模式):可以是任意數量的列
- 可以設定metric的儲存時間
- 支援與時間有關的相關函式(如min、max、sum、count、mean、median等),方便統計
- 支援儲存策略:可以用於資料的刪改。(influxDB沒有提供資料的刪除與修改方法)
- 支援連續查詢:是資料庫中自動定時啟動的一組語句,和儲存策略搭配可以降低InfluxDB的系統佔用量。
- 原生的HTTP支援,內建HTTP API
- 支援類似sql語法
- 支援設定資料在叢集中的副本數
- 支援定期取樣資料,寫入另外的measurement,方便分粒度儲存資料。
- 自帶web管理介面,方便使用(登入方式:http://:8083)
InfluxDB操作
這裡將會簡單的介紹下如何操作InfluxDB,通過這些操作基本也能滿足工作上的需要了。操作InfluxDB可以通過命令列工具,也可藉助開源的客戶端工具,我這裡使用的是一款名叫“InfluxDBStudio”基於C#編寫的開源工具。常用操作的程式碼如下:
#顯示使用者
show users
#建立使用者
create user "username" with password 'password'
#建立管理員許可權使用者
create user "username" with password 'password' with all privileges
#刪除使用者
drop user "username"
#建立資料庫
create database "db_name"
#顯示所有的資料庫
show databases
#刪除資料庫
drop database "db_name"
#使用資料庫
use db_name
#顯示該資料庫中所有的表
show measurements
#建立表,直接在插入資料的時候指定表名,其中test為表名
insert test,host=127.0.0.1,monitor_name=test count=1
#刪除表
drop measurement "measurement_name"
#查詢資料
select * from test order by time desc
#檢視當前資料庫的資料儲存策略(Retention Policies)
show retention policies on "db_name"
#建立新的資料儲存策略
#rp_name:策略名
#db_name:具體的資料庫名;
#3w:儲存3周,3周之前的資料將被刪除,influxdb具有各種事件引數,比如:h(小時),d(天),w(星期)
#replication 1:副本個數,一般為1就可以了
#default:設定為預設策略
create retention policy "rp_name" on "db_name" duration 3w replication 1 default
#修改資料儲存策略
alter retention policy "rp_name" on "db_name" duration 30d default
#刪除資料儲存策略
drop retention policy "rp_name"
#檢視資料庫的連續查詢(Continous Queries)
show continuous queries
#建立新的連續查詢(Continous Queries)
#cq_name:連續查詢名字
#db_name:資料庫名字
#sum(count):計算總和
#table_name:當前表名
#new_table_name:存新的資料的表名
#30m:時間間隔為30分鐘
create continous query cq_name on db_name begin select sum(count) into new_table_name from table_name group by time(30m) end
#刪除連續查詢
drop continous query cp_name on db_name
程式設計
.net core
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using InfluxData.Net.Common.Enums;
using InfluxData.Net.InfluxDb;
using InfluxData.Net.InfluxDb.Models;
using Microsoft.AspNetCore.Mvc;
namespace WebApplication1.Controllers
{
public class InfoController : Controller
{
//宣告InfluxDbClient
private InfluxDbClient clientDb;
public InfoController()
{
//連線InfluxDb的API地址、賬號、密碼
var infuxUrl = "http://localhost:8086/";
var infuxUser = "admin";
var infuxPwd = "admin";
//建立InfluxDbClient例項
clientDb = new InfluxDbClient(infuxUrl, infuxUser, infuxPwd, InfluxDbVersion.Latest);
}
/// <summary>
/// 從InfluxDB中讀取資料
/// </summary>
public async void GetData()
{
//傳入查詢命令,支援多條
var queries = new[]
{
" SELECT * FROM Reading WHERE time> now() - 24h "
};
var dbName = "code-hub";
//從指定庫中查詢資料
var response = await clientDb.Client.QueryAsync(queries, dbName);
//得到Serie集合物件(返回執行多個查詢的結果)
var series = response.ToList();
//取出第一條命令的查詢結果,是一個集合
var list = series[0].Values;
//從集合中取出第一條資料
var info_model = list.FirstOrDefault();
}
/// <summary>
/// 往InfluxDB中寫入資料
/// </summary>
public async void AddData()
{
//基於InfluxData.Net.InfluxDb.Models.Point實體準備資料
var point_model = new Point()
{
Name = "Reading",//表名
Tags = new Dictionary<string, object>()
{
{ "Id", 158}
},
Fields = new Dictionary<string, object>()
{
{ "Val", "webInfo" }
},
Timestamp = DateTime.UtcNow
};
var dbName = "code-hub";
//從指定庫中寫入資料,支援傳入多個物件的集合
var response = await clientDb.Client.WriteAsync(point_model, dbName);
}
}
}
Java spring-boot
pom配置
<!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.14</version>
</dependency>
主方法
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.Random;
@SpringBootApplication
@EnableScheduling
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Scheduled(fixedRate = 1000)
public void doInsert(){
Random random = new Random();
InfluxDBDemo.insert(random.nextInt(1000));
}
}
工具類:
package com.example.demo;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
public class InfluxDBDemo {
public static void insert(int num){
InfluxDB db = InfluxDBFactory.connect("http://192.168.192.128:8086", "admin", "admin");
db.setDatabase("my_test"); // 設定資料庫
Point.Builder builder = Point.measurement("test_measurement"); // 建立Builder,設定表名
builder.addField("count",num); // 新增Field
builder.tag("TAG_CODE","TAG_VALUE_" + num); // 新增Tag
Point point = builder.build();
db.write(point);
}
}
總結
1、InfluxDB是個專業的時序資料庫,通過時序庫可幫助我們更高效的處理應用中的時序資料。
2、使用InfluxDB庫時需先了解該庫的一些特色功能,如資料儲存策略、連續查詢等。
3、通過“InfluxData.Net”類庫可快速簡便的幫助我們在ASP.NET Core2程式中實現對InfluxDB的讀寫操作,Java也是支援的。
4、提供Docker版本的InfluxDB映象。