Flink流計算與時序資料庫Influxdb+grafana
阿新 • • 發佈:2019-02-03
1、簡介
關於Influxdb和grafana,可以參考:介紹或者influxdb官方文件,grafana官方文件。這裡預設已經將influxdb和grafana安裝完成。
2、Flink sink to Influxdb
influxdb不屬於Flink內建的第三方connector,因此需要自定義addSink()方法:
txStream.addSink(new InfluxDBSink("transaction"))
而InfluxDBSink需要實現SinkFunction或RichSinkFunction,例如:
class InfluxDBSink(measurement : String) extends RichSinkFunction[TX]{
private val dataBaseName = "influxDemo"
var influxDB : InfluxDB = null
override def open(parameters : Configuration) : Unit = {
super.open(parameters)
influxDB = InfluxDBFactory.connect("http://data1:8086", "admin", "admin")
influxDB.createDatabase(dataBaseName)
influxDB.enableBatch(2000 , 100, TimeUnit.MILLISECONDS)
}
override def close() : Unit = {
super.close()
}
override def invoke(in: TX): Unit = {
val builder = Point.measurement(measurement)
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("code", in.code)
.addField("nIndex", in.nindex)
.addField("price" , in.price)
.addField("volume", in.volume)
.addField("value", in.value)
val p = builder.build()
influxDB.write(dataBaseName,"autogen",p)
}
說明:
influxdb中,measurement相當於一個“表”;
一個記錄被當做一個point,time()方法表示一個point的時間,這也是時間序列資料庫必須的欄位,可以用系統時間作為插入的時間,也可以用event time作為point的時間;
field是必須的欄位,即一個列;
而tag是可選的,是要被索引的欄位,當需要對某個欄位進行索引時,需要設定為一個tag;
“autogen”代表預設的保留策略。
寫入時,可以一條寫入,也可以批量寫入。此例中代表逐條寫入。
結果可以通過influxdb提供的webUI以及grafana實時展示:
詳細的程式碼見:這裡
參考