Spark Connector Reader 原理與實踐
阿新 • • 發佈:2020-12-17
![nebula-spark-connector-reader](https://www-cdn.nebula-graph.com.cn/nebula-blog/Draft/nebula-spark-connector-reader.png)
本文主要講述如何利用 Spark Connector 進行 Nebula Graph 資料的讀取。
## Spark Connector 簡介
Spark Connector 是一個 Spark 的資料聯結器,可以通過該聯結器進行外部資料系統的讀寫操作,Spark Connector 包含兩部分,分別是 Reader 和 Writer,而本文側重介紹 Spark Connector Reader,Writer 部分將在下篇和大家詳聊。
## Spark Connector Reader 原理
Spark Connector Reader 是將 Nebula Graph 作為 Spark 的擴充套件資料來源,從 Nebula Graph 中將資料讀成 DataFrame,再進行後續的 map、reduce 等操作。
[Spark SQL ](https://spark.apache.org/sql/)允許使用者自定義資料來源,支援對外部資料來源進行擴充套件。通過 Spark SQL 讀取的資料格式是以命名列方式組織的分散式資料集 DataFrame,Spark SQL 本身也提供了眾多 API 方便使用者對 DataFrame 進行計算和轉換,能對多種資料來源使用 DataFrame 介面。
Spark 呼叫外部資料來源包的是 `org.apache.spark.sql`,首先了解下 Spark SQL 提供的的擴充套件資料來源相關的介面。
### Basic Interfaces
- BaseRelation:表示具有已知 Schema 的元組集合。所有繼承 BaseRelation 的子類都必須生成 StructType 格式的 Schema。換句話說,BaseRelation 定義了從資料來源中讀取的資料在 Spark SQL 的 DataFrame 中儲存的資料格式的。
- RelationProvider:獲取引數列表,根據給定的引數返回一個新的 BaseRelation。
- DataSourceRegister:註冊資料來源的簡寫,在使用資料來源時不用寫資料來源的全限定類名,而只需要寫自定義的 shortName 即可。
### Providers
- RelationProvider:從指定資料來源中生成自定義的 relation。 `createRelation()` 會基於給定的 Params 引數生成新的 relation。
- SchemaRelationProvider:可以基於給定的 Params 引數和給定的 Schema 資訊生成新的 Relation。
### RDD
- RDD[InternalRow]: 從資料來源中 Scan 出來後需要構造成 RDD[Row]
要實現自定義 Spark 外部資料來源,需要根據資料來源自定義上述部分方法。
在 Nebula Graph 的 Spark Connector 中,我們實現了將 Nebula Graph 作為 Spark SQL 的外部資料來源,通過 `sparkSession.read` 形式進行資料的讀取。該功能實現的類圖展示如下:
![](https://oscimg.oschina.net/oscnet/up-330d94016a546897d01fa1bb67c6e4ca2c9.png)
1. 定義資料來源 NebulaRelatioProvider,繼承 RelationProvider 進行 relation 自定義,繼承 DataSourceRegister 進行外部資料來源的註冊。
1. 定義 NebulaRelation 定義 Nebula Graph 的資料 Schema 和資料轉換方法。在 `getSchema()` 方法中連線 Nebula Graph 的 Meta 服務獲取配置的返回欄位對應的 Schema 資訊。
1. 定義 NebulaRDD 進行 Nebula Graph 資料的讀取。 `compute()` 方法中定義如何讀取 Nebula Graph 資料,主要涉及到進行 Nebula Graph 資料 Scan、將讀到的 Nebula Graph Row 資料轉換為 Spark 的 InternalRow 資料,以 InternalRow 組成 RDD 的一行,其中每一個 InternalRow 表示 Nebula Graph 中的一行資料,最終通過分割槽迭代的形式將 Nebula Graph 所有資料讀出組裝成最終的 DataFrame 結果資料。
## Spark Connector Reader 實踐
Spark Connector 的 Reader 功能提供了一個介面供使用者程式設計進行資料讀取。一次讀取一個點/邊型別的資料,讀取結果為 DataFrame。
下面開始實踐,拉取 GitHub 上 Spark Connector 程式碼:
```shell
git clone -b v1.0 [email protected]:vesoft-inc/nebula-java.git
cd nebula-java/tools/nebula-spark
mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true
```
將編譯打成的包 copy 到本地 Maven 庫。
應用示例如下:
1. 在 mvn 專案的 pom 檔案中加入 `nebula-spark` 依賴
```shell
```
2. 在 Spark 程式中讀取 Nebula Graph 資料:
```shell
// 讀取 Nebula Graph 點資料
val vertexDataset: Dataset[Row] =
spark.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadVerticesToDF("tag", "field1,field2")
vertexDataset.show()
// 讀取 Nebula Graph 邊資料
val edgeDataset: Dataset[Row] =
spark.read
.nebula("127.0.0.1:45500", "spaceName", "100")
.loadEdgesToDF("edge", "*")
edgeDataset.show()
```
配置說明:
- nebula(address: String, space: String, partitionNum: String)
```markdown
address:可以配置多個地址,以英文逗號分割,如“ip1:45500,ip2:45500”
space: Nebula Graph 的 graphSpace
partitionNum: 設定spark讀取Nebula時的partition數,儘量使用建立 Space 時指定的 Nebula Graph 中的 partitionNum,可確保一個Spark的partition讀取Nebula Graph一個part的資料。
```
- loadVertices(tag: String, fields: String)
```markdown
tag:Nebula Graph 中點的 Tag
fields:該 Tag 中的欄位,,多欄位名以英文逗號分隔。表示只讀取 fields 中的欄位,* 表示讀取全部欄位
```
- loadEdges(edge: String, fields: String)
```markdown
edge:Nebula Graph 中邊的 Edge
fields:該 Edge 中的欄位,多欄位名以英文逗號分隔。表示只讀取 fields 中的欄位,* 表示讀取全部欄位
```
## 其他
Spark Connector Reader 的 GitHub 程式碼:[https://github.com/vesoft-inc/nebula-java/tree/master/tools/nebula-spark](https://github.com/vesoft-inc/nebula-java/tree/master/tools/nebula-spark)
在此特別感謝半雲科技所貢獻的 Spark Connector 的 Java 版本
## 參考資料
[1] [Extending Spark Datasource API: write a custom spark datasource](http://sparkdatasourceapi.blogspot.com/2016/10/spark-data-source-api-write-custom.html)
[2] [spark external datasource source code](https://github.com/apache/spark/tree/master/external)
喜歡這篇文章?來來來,給我們的 [GitHub](https://github.com/vesoft-inc/nebula) 點個 star 表鼓勵啦~~