1. 程式人生 > 其它 >【技術乾貨】程式碼示例:使用 Apache Flink 連線 TDengine

【技術乾貨】程式碼示例:使用 Apache Flink 連線 TDengine

小 T 導讀:想用 Flink 對接 TDengine?保姆級教程來了。

0、前言

TDengine 是由濤思資料開發並開源的一款高效能、分散式、支援 SQL 的時序資料庫(Time-Series Database)。

除了核心的時序資料庫功能外,TDengine 還提供快取資料訂閱流式計算等大資料平臺所需要的系列功能。但是很多小夥伴出於架構的考慮,還是需要將資料匯出到 Apache Flink、Apache Spark 等平臺進行計算分析。

為了幫助大家對接,我們特別推出了保姆級課程,包學包會。

1、技術實現

Apache Flink 提供了 SourceFunction 和 SinkFunction,用來提供 Flink 和外部資料來源的連線,其中 SouceFunction 為從資料來源讀取資料,SinkFunction 為將資料寫入資料來源。 與此同時,Flink 提供了 RichSourceFunction 和 RichSinkFunction 這兩個類(繼承自

AbstractRichFunction),提供了額外的初始化(open(Configuration))和銷燬方法(close())。 通過重寫這兩個方法,可以避免每次讀寫資料時都重新建立連線。

2、程式碼實現

完整原始碼:https://github.com/liuyq-617/TD-Flink

程式碼邏輯:

1) 自定義類 SourceFromTDengine

用途:資料來源連線,資料讀取

  1. package com.taosdata.flink;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
  4. import com.taosdata.model.Sensor;
  5. import java.sql.*;
  6. import java.util.Properties;
  7. public class SourceFromTDengine extends RichSourceFunction<Sensor> {
  8. Statement statement;
  9. private Connection connection;
  10. private String property;
  11. public SourceFromTDengine(){
  12. super();
  13. }
  14. @Override
  15. public void open(Configuration parameters) throws Exception {
  16. super.open(parameters);
  17. String driver = "com.taosdata.jdbc.rs.RestfulDriver";
  18. String host = "u05";
  19. String username = "root";
  20. String password = "taosdata";
  21. String prop = System.getProperty("java.library.path");
  22. Logger LOG = LoggerFactory.getLogger(SourceFromTDengine.class);
  23. LOG.info("java.library.path:{}", prop);
  24. System.out.println(prop);
  25. Class.forName( driver );
  26. Properties properties = new Properties();
  27. connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
  28. , properties);
  29. statement = connection.createStatement();
  30. }
  31. @Override
  32. public void close() throws Exception {
  33. super.close();
  34. if (connection != null) {
  35. connection.close();
  36. }
  37. if (statement != null) {
  38. statement.close();
  39. }
  40. }
  41. @Override
  42. public void run(SourceContext<Sensor> sourceContext) throws Exception {
  43. try {
  44. String sql = "select * from tt.meters";
  45. ResultSet resultSet = statement.executeQuery(sql);
  46. while (resultSet.next()) {
  47. Sensor sensor = new Sensor( resultSet.getLong(1),
  48. resultSet.getInt( "vol" ),
  49. resultSet.getFloat( "current" ),
  50. resultSet.getString( "location" ).trim());
  51. sourceContext.collect( sensor );
  52. }
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. @Override
  58. public void cancel() {
  59. }
  60. }

2) 自定義類 SinkToTDengine

用途:資料來源連線,資料寫入

SinkToTDengine

  1. package com.taosdata.flink;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  4. import com.taosdata.model.Sensor;
  5. import java.sql.*;
  6. import java.util.Properties;
  7. public class SinkToTDengine extends RichSinkFunction<Sensor> {
  8. Statement statement;
  9. private Connection connection;
  10. @Override
  11. public void open(Configuration parameters) throws Exception {
  12. super.open(parameters);
  13. String driver = "com.taosdata.jdbc.rs.RestfulDriver";
  14. String host = "TAOS-FQDN";
  15. String username = "root";
  16. String password = "taosdata";
  17. String prop = System.getProperty("java.library.path");
  18. System.out.println(prop);
  19. Class.forName( driver );
  20. Properties properties = new Properties();
  21. connection = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041/tt" + "?user=root&password=taosdata"
  22. , properties);
  23. statement = connection.createStatement();
  24. }
  25. @Override
  26. public void close() throws Exception {
  27. super.close();
  28. if (connection != null) {
  29. connection.close();
  30. }
  31. if (statement != null) {
  32. statement.close();
  33. }
  34. }
  35. @Override
  36. public void invoke(Sensor sensor, Context context) throws Exception {
  37. try {
  38. String sql = String.format("insert into sinktest.%s using sinktest.meters tags('%s') values(%d,%d,%f)",
  39. sensor.getLocation(),
  40. sensor.getLocation(),
  41. sensor.getTs(),
  42. sensor.getVal(),
  43. sensor.getCurrent()
  44. );
  45. statement.executeUpdate(sql);
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. }

3) 自定義類 Sensor

用途:定義資料結構,用來接受資料

  1. package com.taosdata.model;
  2. public class Sensor {
  3. public long ts;
  4. public int val;
  5. public float current;
  6. public String location;
  7. public Sensor() {
  8. }
  9. public Sensor(long ts, int val, float current, String location) {
  10. this.ts = ts;
  11. this.val = val;
  12. this.current = current;
  13. this.location = location;
  14. }
  15. public long getTs() {
  16. return ts;
  17. }
  18. public void setTs(long ts) {
  19. this.ts = ts;
  20. }
  21. public int getVal() {
  22. return val;
  23. }
  24. public void setVal(int val) {
  25. this.val = val;
  26. }
  27. public float getCurrent() {
  28. return current;
  29. }
  30. public void setCurrent(float current) {
  31. this.current = current;
  32. }
  33. public String getLocation() {
  34. return location;
  35. }
  36. public void setLocation(String location) {
  37. this.location = location;
  38. }
  39. @Override
  40. public String toString() {
  41. return "Sensor{" +
  42. "ts=" + ts +
  43. ", val=" + val +
  44. ", current=" + current +
  45. ", location='" + location + '\'' +
  46. '}';
  47. }
  48. }

4) 主程式類 ReadFromTDengine

用途:呼叫 Flink 進行讀取和寫入資料

  1. package com.taosdata;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.api.common.functions.MapFunction;
  5. import org.apache.flink.streaming.api.datastream.DataStream;
  6. import com.taosdata.model.Sensor;
  7. import org.slf4j.LoggerFactory;
  8. import org.slf4j.Logger;
  9. public class ReadFromTDengine {
  10. public static void main(String[] args) throws Exception {
  11. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  12. DataStreamSource<Sensor> SensorList = env.addSource( new com.taosdata.flink.SourceFromTDengine() );
  13. SensorList.print();
  14. SensorList.addSink( new com.taosdata.flink.SinkToTDengine() );
  15. env.execute();
  16. }
  17. }

3、簡單測試 RESTful 介面

1) 環境準備:

a) Flink 安裝&啟動:

b) TDengine Database 環境準備:

  • 建立原始資料: 
    • create database tt;
    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));
    • insert into beijing using meters tags(‘beijing’) values(now,220,30.2);
  • 建立目標資料庫表: 
    • create database sinktest;
    • create table `meters` (`ts` TIMESTAMP,`vol` INT,`current` FLOAT) TAGS (`location` BINARY(20));

2) 打包編譯:

原始碼位置: https://github.com/liuyq-617/TD-Flink

mvn clean package

3) 程式啟動:

flink run target/test-flink-1.0-SNAPSHOT-dist.jar

  • 讀取資料 
    • vi log/flink-root-taskexecutor-0-xxxxx.out 
    • 檢視到資料列印:Sensor{ts=1645166073101, val=220, current=5.7, location=’beijing’}
  • 寫入資料 
    • show sinktest.tables; 
      • 已經建立了beijing 子表
    • select * from sinktest.beijing; 
      • 可以查詢到剛插入的資料

4、使用 JNI 方式

舉一反三的小夥伴此時已經猜到,只要把 JDBC URL 修改一下就可以了。

但是 Flink 每次分派作業時都在使用一個新的 ClassLoader,而我們在計算節點上就會得到“Native library already loaded in another classloader”錯誤。

為了避免此問題,可以將 JDBC 的 jar 包放到 Flink 的 lib 目錄下,不去呼叫 dist 包就可以了。

  • cp taos-jdbcdriver-2.0.37-dist.jar /usr/local/flink-1.14.3/lib
  • flink run target/test-flink-1.0-SNAPSHOT.jar

5、小結

通過在專案中引入 SourceFromTDengine 和 SinkToTDengine 兩個類,即可完成在 Flink 中對 TDengine 的讀寫操作。後面我們會有文章介紹 Spark 和 TDengine 的對接。

注:文中使用的是 JDBC 的 RESTful 介面,這樣就不用在 Flink 的節點安裝 TDengine,JNI 方式需要在 Flink 節點安裝 TDengine Database 的客戶端。


想了解更多 TDengine Database的具體細節,歡迎大家在GitHub上檢視相關原始碼。