1. 程式人生 > >Spark Connector Reader 原理與實踐

Spark Connector Reader 原理與實踐

![nebula-spark-connector-reader](https://www-cdn.nebula-graph.com.cn/nebula-blog/Draft/nebula-spark-connector-reader.png) 本文主要講述如何利用 Spark Connector 進行 Nebula Graph 資料的讀取。 ## Spark Connector 簡介 Spark Connector 是一個 Spark 的資料聯結器,可以通過該聯結器進行外部資料系統的讀寫操作,Spark Connector 包含兩部分,分別是 Reader 和 Writer,而本文側重介紹 Spark Connector Reader,Writer 部分將在下篇和大家詳聊。 ## Spark Connector Reader 原理 Spark Connector Reader 是將 Nebula Graph 作為 Spark 的擴充套件資料來源,從 Nebula Graph 中將資料讀成 DataFrame,再進行後續的 map、reduce 等操作。 [Spark SQL ](https://spark.apache.org/sql/)允許使用者自定義資料來源,支援對外部資料來源進行擴充套件。通過 Spark SQL 讀取的資料格式是以命名列方式組織的分散式資料集 DataFrame,Spark SQL 本身也提供了眾多 API 方便使用者對 DataFrame 進行計算和轉換,能對多種資料來源使用 DataFrame 介面。 Spark 呼叫外部資料來源包的是 `org.apache.spark.sql`,首先了解下 Spark SQL 提供的的擴充套件資料來源相關的介面。 ### Basic Interfaces - BaseRelation:表示具有已知 Schema 的元組集合。所有繼承 BaseRelation 的子類都必須生成 StructType 格式的 Schema。換句話說,BaseRelation 定義了從資料來源中讀取的資料在 Spark SQL 的 DataFrame 中儲存的資料格式的。 - RelationProvider:獲取引數列表,根據給定的引數返回一個新的 BaseRelation。 - DataSourceRegister:註冊資料來源的簡寫,在使用資料來源時不用寫資料來源的全限定類名,而只需要寫自定義的 shortName 即可。 ### Providers - RelationProvider:從指定資料來源中生成自定義的 relation。 `createRelation()`  會基於給定的 Params 引數生成新的 relation。 - SchemaRelationProvider:可以基於給定的 Params 引數和給定的 Schema 資訊生成新的 Relation。 ### RDD - RDD[InternalRow]: 從資料來源中 Scan 出來後需要構造成 RDD[Row] 要實現自定義 Spark 外部資料來源,需要根據資料來源自定義上述部分方法。 在 Nebula Graph 的 Spark Connector 中,我們實現了將 Nebula Graph 作為 Spark SQL 的外部資料來源,通過 `sparkSession.read` 形式進行資料的讀取。該功能實現的類圖展示如下: ![](https://oscimg.oschina.net/oscnet/up-330d94016a546897d01fa1bb67c6e4ca2c9.png) 1. 定義資料來源 NebulaRelatioProvider,繼承 RelationProvider 進行 relation 自定義,繼承 DataSourceRegister 進行外部資料來源的註冊。 1. 定義 NebulaRelation 定義 Nebula Graph 的資料 Schema 和資料轉換方法。在 `getSchema()` 方法中連線 Nebula Graph 的 Meta 服務獲取配置的返回欄位對應的 Schema 資訊。 1. 定義 NebulaRDD 進行 Nebula Graph 資料的讀取。 `compute()` 方法中定義如何讀取 Nebula Graph 資料,主要涉及到進行 Nebula Graph 資料 Scan、將讀到的 Nebula Graph Row 資料轉換為 Spark 的 InternalRow 資料,以 InternalRow 組成 RDD 的一行,其中每一個 InternalRow 表示 Nebula Graph 中的一行資料,最終通過分割槽迭代的形式將 Nebula Graph 所有資料讀出組裝成最終的 DataFrame 結果資料。 ## Spark Connector Reader 實踐 Spark Connector 的 Reader 功能提供了一個介面供使用者程式設計進行資料讀取。一次讀取一個點/邊型別的資料,讀取結果為 DataFrame。 下面開始實踐,拉取 GitHub 上 Spark Connector 程式碼: ```shell git clone -b v1.0 [email protected]:vesoft-inc/nebula-java.git cd nebula-java/tools/nebula-spark mvn clean compile package install -Dgpg.skip -Dmaven.javadoc.skip=true ``` 將編譯打成的包 copy 到本地 Maven 庫。 應用示例如下: 1. 在 mvn 專案的 pom 檔案中加入 `nebula-spark` 依賴 ```shell ``` 2. 在 Spark 程式中讀取 Nebula Graph 資料: ```shell // 讀取 Nebula Graph 點資料 val vertexDataset: Dataset[Row] = spark.read .nebula("127.0.0.1:45500", "spaceName", "100") .loadVerticesToDF("tag", "field1,field2") vertexDataset.show() // 讀取 Nebula Graph 邊資料 val edgeDataset: Dataset[Row] = spark.read .nebula("127.0.0.1:45500", "spaceName", "100") .loadEdgesToDF("edge", "*") edgeDataset.show() ``` 配置說明: - nebula(address: String, space: String, partitionNum: String) ```markdown address:可以配置多個地址,以英文逗號分割,如“ip1:45500,ip2:45500” space: Nebula Graph 的 graphSpace partitionNum: 設定spark讀取Nebula時的partition數,儘量使用建立 Space 時指定的 Nebula Graph 中的 partitionNum,可確保一個Spark的partition讀取Nebula Graph一個part的資料。 ``` - loadVertices(tag: String, fields: String) ```markdown tag:Nebula Graph 中點的 Tag fields:該 Tag 中的欄位,,多欄位名以英文逗號分隔。表示只讀取 fields 中的欄位,* 表示讀取全部欄位 ``` - loadEdges(edge: String, fields: String) ```markdown edge:Nebula Graph 中邊的 Edge fields:該 Edge 中的欄位,多欄位名以英文逗號分隔。表示只讀取 fields 中的欄位,* 表示讀取全部欄位 ``` ## 其他 Spark Connector Reader 的 GitHub 程式碼:[https://github.com/vesoft-inc/nebula-java/tree/master/tools/nebula-spark](https://github.com/vesoft-inc/nebula-java/tree/master/tools/nebula-spark) 在此特別感謝半雲科技所貢獻的 Spark Connector 的 Java 版本 ## 參考資料 [1] [Extending Spark Datasource API: write a custom spark datasource](http://sparkdatasourceapi.blogspot.com/2016/10/spark-data-source-api-write-custom.html) [2] [spark external datasource source code](https://github.com/apache/spark/tree/master/external) 喜歡這篇文章?來來來,給我們的 [GitHub](https://github.com/vesoft-inc/nebula) 點個 star 表鼓勵啦~~