1. 程式人生 > 其它 >圖資料庫的易用性—GES與Flink的對接

圖資料庫的易用性—GES與Flink的對接

數字化時代,業務的實時處理需求越來越迫切,實時預警、實時風控、實時推薦等,Flink作為新一代流批統一的計算引擎,具有獨特的天然流式計算特性和更為先進的架構設計的特點,它可以從不同的第三方儲存引擎中讀取資料,進行處理,然後再寫出到另外的儲存引擎中。

GES擁抱變化,開發了與Flink的對接工具GES-Flink-Connector。GES-Flink-Connector是一款自定義的離線/實時資料同步Flink聯結器(connector),用於外部資料來源與GES圖資料庫的資料同步。Connector的作用就相當於一個聯結器,連線 Flink 計算引擎跟外界儲存系統。GES-Flink-Connector具備流批統一的能力,對於離線計算與流計算的資料都可以寫入GES圖資料庫中。利用Flink聯結器機制,只要實現了資料來源的Source Connector讀取資料,就可以通過GES-Flink-Connector將資料進行自定義轉換並匯入到GES圖資料庫中。

GES-Flink-Connector的架構圖如下所示:

功能介紹

GES-Flink-Connector具備如下能力:

  • 流批統一,支援流資料與批資料
  • 資料匯入支援三種提交模式,批量提交、間隔提交、混合提交
  • 利用Flink提供的Checkpoint機制,具備一定的容錯能力
  • 具備匯入失敗處理能力,批匯入失敗轉單條匯入,單條匯入失敗轉儲存
  • 具備髒資料發現能力,驗證屬性數量是否符合要求,驗證label是否存在
  • 具備髒資料和錯誤資料儲存能力,可將資料儲存到LOCAL、OBS、HDFS
  • 具備錯誤資料限制能力,當錯誤率達到一定上限時,停止任務

使用案例介紹

將離線資料匯入GES

以向GES中匯入JDBC離線資料為例,操作步驟如下:

  1. 將GES-Flink-Connector jar包打入本地maven倉庫
mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar
  1. 新增相關maven依賴(flink版本需高於1.7.2)
<dependency>
    <groupId>com.huawei.ges</groupId>
    <artifactId>ges-flink-connector</artifactId>
    <version>1.0.0</version>
</dependency>
  1. 配置相關引數

  2. 編寫資料轉換方法

// T is your data type
public class GraphStringDataConverter implements GraphDataConverter<T> {
    /**
     * Your convert method.
     * Separate your data fields with commas
     * e.g.
     * vertex
     * id, label, property 1, property 2,…
     * edge
     * id 1, id 2, label, property 1, property 2, …
     *
     * @param t your data
     * @return format string
     */
    @Override
    public String convert(T t) {
        // Implement your transformation method
        String s = ...
        return s;
    }
}
  1. 建立flink任務
// ------------------------flink環境建立----------------------------------
// 建立flink流資料環境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 設定並行度
environment.setParallelism(CONCURRENT_COUNT);
// 開啟checkpoint 設定checkpoint時間間隔與checkpoint模式
environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);

// -------------------------資料來源獲取-------------------------------------
// table schema
TypeInformation[] fieldTypes = new TypeInformation[]{
    // id
    BasicTypeInfo.INT_TYPE_INFO,
    // label
    BasicTypeInfo.STRING_TYPE_INFO,
    // property 1
    BasicTypeInfo.STRING_TYPE_INFO
   	// ...
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// query sql
String querySql = "select * from {$your_table_name}";

// 資料來源獲取,JDBCInputFormat 讀出來資料為flink Row型別
DataStream<Row> dataSource =
    environment.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("your_mysql_jdbc_url")
    .setUsername("you_mysql_username")
    .setPassword("you_mysql_password")
    .setQuery(querySql)
    .setRowTypeInfo(rowTypeInfo)
    .finish());

// -------------------------輸出源配置---------------------------------------
// 讀取配置檔案
Properties gesProp = new Properties();
InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties");
gesProp.load(in);

// 建立flink Row資料轉為要求的逗號分隔字串的策略
GraphDataConverter<Row> graphRowDataConvert = new GraphRowDataConvert();
GraphDataConvertStrategy<Row, GraphDataConverter<Row>> rowConvertStrategy =
    new GraphDataConvertStrategy<>(graphRowDataConvert);
// 建立batch輸出方法,並新增轉化策略與配置檔案
GraphBatchOutputFormat<Row> outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp);
// 建立sink輸出方法
GraphSinkFunction<Row> sinkFunction = new GraphSinkFunction<>(outputFormat);
// 為資料來源新增輸出方法
dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT);
// 啟動flink
environment.execute();

通過DLI與雲上資料來源互動

GES-Flink-Connector-DLI版本用於雲上DLI Flink佇列,採用Flink SQL的方式完成資料到GES的匯入,操作步驟如下:

  1. 修改jar包內config.properties引數配置

  2. 將jar包匯入OBS

  3. DLI建立程式包(資料管理-程式包管理-建立程式包)

  4. DLI購買佇列並建立Flink作業

  5. 建立DLI Flink佇列與GES圖服務的對等連線(跨源連線-建立連線)

將vpc設定為GES圖引擎服務的同一個vpc,並測試地址連通性。

  1. 編輯Flink SQL
# SOURCE表示資料來源,可以是DLI支援的任意資料來源
CREATE SOURCE STREAM v_labels (
  id STRING,
  label STRING,
  uuid STRING,
  d1 STRING,
  d2 STRING
) WITH (
  type = "obs",
  bucket = "your bucket",
  region = "your region",
  object_name = "your file",
  row_delimiter = "\n",
  field_delimiter = ","
);

# SINK表示輸出源 為GES圖資料庫
CREATE SINK STREAM ges_sink (
  id STRING,
  label STRING,
  uuid STRING,
  d1 STRING,
  d2 STRING
) WITH (
  type = "user_defined",
  type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction
  type_class_parameter = ""
);

# Some data processing
...

# 執行資料由輸入源匯入輸出源
INSERT INTO
  ges_sink
SELECT
  * -- 選擇想要輸出的欄位
FROM
  v_labels;