圖資料庫的易用性—GES與Flink的對接
數字化時代,業務的實時處理需求越來越迫切,實時預警、實時風控、實時推薦等,Flink作為新一代流批統一的計算引擎,具有獨特的天然流式計算特性和更為先進的架構設計的特點,它可以從不同的第三方儲存引擎中讀取資料,進行處理,然後再寫出到另外的儲存引擎中。
GES擁抱變化,開發了與Flink的對接工具GES-Flink-Connector。GES-Flink-Connector是一款自定義的離線/實時資料同步Flink聯結器(connector),用於外部資料來源與GES圖資料庫的資料同步。Connector的作用就相當於一個聯結器,連線 Flink 計算引擎跟外界儲存系統。GES-Flink-Connector具備流批統一的能力,對於離線計算與流計算的資料都可以寫入GES圖資料庫中。利用Flink聯結器機制,只要實現了資料來源的Source Connector讀取資料,就可以通過GES-Flink-Connector將資料進行自定義轉換並匯入到GES圖資料庫中。
GES-Flink-Connector的架構圖如下所示:
功能介紹
GES-Flink-Connector具備如下能力:
- 流批統一,支援流資料與批資料
- 資料匯入支援三種提交模式,批量提交、間隔提交、混合提交
- 利用Flink提供的Checkpoint機制,具備一定的容錯能力
- 具備匯入失敗處理能力,批匯入失敗轉單條匯入,單條匯入失敗轉儲存
- 具備髒資料發現能力,驗證屬性數量是否符合要求,驗證label是否存在
- 具備髒資料和錯誤資料儲存能力,可將資料儲存到LOCAL、OBS、HDFS
- 具備錯誤資料限制能力,當錯誤率達到一定上限時,停止任務
使用案例介紹
將離線資料匯入GES
以向GES中匯入JDBC離線資料為例,操作步驟如下:
- 將GES-Flink-Connector jar包打入本地maven倉庫
mvn install:install-file -DgroupId=com.huawei.ges -DartifactId=ges-flink-connector -Dversion=1.0.0 -Dpackaging=jar -Dfile=../jars/ges-flink-connector-1.0.0.jar
- 新增相關maven依賴(flink版本需高於1.7.2)
<dependency> <groupId>com.huawei.ges</groupId> <artifactId>ges-flink-connector</artifactId> <version>1.0.0</version> </dependency>
-
配置相關引數
-
編寫資料轉換方法
// T is your data type
public class GraphStringDataConverter implements GraphDataConverter<T> {
/**
* Your convert method.
* Separate your data fields with commas
* e.g.
* vertex
* id, label, property 1, property 2,…
* edge
* id 1, id 2, label, property 1, property 2, …
*
* @param t your data
* @return format string
*/
@Override
public String convert(T t) {
// Implement your transformation method
String s = ...
return s;
}
}
- 建立flink任務
// ------------------------flink環境建立----------------------------------
// 建立flink流資料環境
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 設定並行度
environment.setParallelism(CONCURRENT_COUNT);
// 開啟checkpoint 設定checkpoint時間間隔與checkpoint模式
environment.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
// -------------------------資料來源獲取-------------------------------------
// table schema
TypeInformation[] fieldTypes = new TypeInformation[]{
// id
BasicTypeInfo.INT_TYPE_INFO,
// label
BasicTypeInfo.STRING_TYPE_INFO,
// property 1
BasicTypeInfo.STRING_TYPE_INFO
// ...
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
// query sql
String querySql = "select * from {$your_table_name}";
// 資料來源獲取,JDBCInputFormat 讀出來資料為flink Row型別
DataStream<Row> dataSource =
environment.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("your_mysql_jdbc_url")
.setUsername("you_mysql_username")
.setPassword("you_mysql_password")
.setQuery(querySql)
.setRowTypeInfo(rowTypeInfo)
.finish());
// -------------------------輸出源配置---------------------------------------
// 讀取配置檔案
Properties gesProp = new Properties();
InputStream in = GraphFlinkConnectorJdbcVertexExample.class.getClassLoader().getResourceAsStream("config.properties");
gesProp.load(in);
// 建立flink Row資料轉為要求的逗號分隔字串的策略
GraphDataConverter<Row> graphRowDataConvert = new GraphRowDataConvert();
GraphDataConvertStrategy<Row, GraphDataConverter<Row>> rowConvertStrategy =
new GraphDataConvertStrategy<>(graphRowDataConvert);
// 建立batch輸出方法,並新增轉化策略與配置檔案
GraphBatchOutputFormat<Row> outputFormat = new GraphBatchOutputFormat<>(rowConvertStrategy, gesProp);
// 建立sink輸出方法
GraphSinkFunction<Row> sinkFunction = new GraphSinkFunction<>(outputFormat);
// 為資料來源新增輸出方法
dataSource.addSink(sinkFunction).setParallelism(CONCURRENT_COUNT);
// 啟動flink
environment.execute();
通過DLI與雲上資料來源互動
GES-Flink-Connector-DLI版本用於雲上DLI Flink佇列,採用Flink SQL的方式完成資料到GES的匯入,操作步驟如下:
-
修改jar包內config.properties引數配置
-
將jar包匯入OBS
-
DLI建立程式包(資料管理-程式包管理-建立程式包)
-
DLI購買佇列並建立Flink作業
-
建立DLI Flink佇列與GES圖服務的對等連線(跨源連線-建立連線)
將vpc設定為GES圖引擎服務的同一個vpc,並測試地址連通性。
- 編輯Flink SQL
# SOURCE表示資料來源,可以是DLI支援的任意資料來源
CREATE SOURCE STREAM v_labels (
id STRING,
label STRING,
uuid STRING,
d1 STRING,
d2 STRING
) WITH (
type = "obs",
bucket = "your bucket",
region = "your region",
object_name = "your file",
row_delimiter = "\n",
field_delimiter = ","
);
# SINK表示輸出源 為GES圖資料庫
CREATE SINK STREAM ges_sink (
id STRING,
label STRING,
uuid STRING,
d1 STRING,
d2 STRING
) WITH (
type = "user_defined",
type_class_name = "com.huawei.ges.flink.connector.sink.GraphSinkFunction", -- 指定sinkFunction
type_class_parameter = ""
);
# Some data processing
...
# 執行資料由輸入源匯入輸出源
INSERT INTO
ges_sink
SELECT
* -- 選擇想要輸出的欄位
FROM
v_labels;