1. 程式人生 > 其它 >主流開源分散式圖計算框架 Benchmark

主流開源分散式圖計算框架 Benchmark

本文由美團 NLP 團隊高辰、趙登昌撰寫,首發於 Nebula Graph Community 公眾號

  1. 前言

隨著近年來資料的爆炸式增長,如何高效地分析處理資料,在業界一直備受關注。現實世界中的資料往往數量龐大且關係複雜,這些資料中不同個體間彼此互動產生的資料以圖的形式表現最為自然。比如微信的社交網路,是由節點(個人、公眾號)和邊(關注、點贊)構成的圖;淘寶的交易網路,是由節點(個人、商品)和邊(購買、收藏)構成的圖。

圖計算正是研究事物之間的關係,並對其進行完整的刻畫、計算和分析的一門技術。目前,已經有不少公司將圖計算技術應用到了自己的業務場景中(如京東金融的小額借貸業務,搜狗搜尋的搜尋排序系統、工商銀行的信用卡反欺詐系統等),取得了遠超傳統計算的效果。

而美團內部在騎手社交網路、金融反欺詐、裝置風險識別等諸多場景下也有使用圖計算的迫切需求。

圖計算技術可以很好地解決全圖的離線分析問題,但目前在工程落地上依然存在困難。圖計算中存在資料稀疏、頂點冪律分佈、活躍頂點集動態變化、並行通訊開銷大等問題,並不天然具備良好的並行擴充套件能力,設計不良的圖計算框架效能甚至不如單機。

為了滿足美團業務方的超大規模圖計算需求,需要選出一款圖計算框架,作為圖計算平臺的底層引擎。我們結合業務現狀,制定了選型的基本條件:

開源專案,團隊必須擁有對原始碼的控制力,才能保證資料安全和服務可用性。

分散式架構,具備良好的可擴充套件性。

能夠服務 OLAP 場景,高效能產出圖分析結果。

通用的圖計算系統,能提供多種流行的圖演算法,且能方便地定製開發新演算法,以應對多種業務應用場景。

經過廣泛的調研後,我們列舉一些有代表性的圖計算框架如下:

Neo4j-APOC :在圖資料庫的基礎上,支援一些基本圖演算法,分散式版本不開源。

Pregel:Google 在 2009 年提出,是圖計算模型的開山祖師,後續很多工作都受到它的思想影響。不開源。

Giraph:Facebook 基於 Pregel 思想的開源實現。

Gemini:清華大學基於 Pregel 思想進行了多項改進的實現,效能優秀。僅提供免費 Demo,商業版不開源。

KnightKing:針對 Walker 遊走類演算法專門設計的圖計算框架,不具有通用性。

GraphX:Apache 基金會基於 Spark 實現的圖計算框架,社群活躍度較高。

GraphLab(PowerGraph):商業軟體,不開源。已被蘋果收購。

Plato:騰訊基於 Gemini 和 KnightKing 思想的 C++ 開源實現,是一款高效能、可擴充套件、易插拔的圖計算框架。

按照選型的基本條件進行篩選,最終納入評測範圍的框架為:GraphX、Giraph、Plato。

  1. 測試概要

2.1 硬體配置

物理機配置

CPU:48核(Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz)

記憶體:192GB

硬碟:5,587GB

例項數量:同機房 4 臺

2.2 部署方案

2.2.1 GraphX

系統版本:3.1.2

Spark 版本:3.1.2

GraphX 基於 Spark 平臺執行演算法,每個例項上需要預先啟動 1 個 worker(Spark 的配置引數可參看附錄 5.1.1)。

2.2.2 Giraph

系統版本:1.3.0

Spark 版本:2.7.6

Giraph 依賴 MapReduce 來啟動 Job,各例項需要預先按如下方式部署(Hadoop 配置引數可參看附錄 5.2.1)。

2.2.3 Plato

系統版本:0.1.1

Plato 在發起演算法執行前,會通過 Hydra 為每個例項啟動 1 個程序。

2.3 評測資料集

我們使用不同資料量級的 2 個圖資料集進行評測:分別是Twitter 社交關注關係資料集(twitter-2010:https://law.di.unimi.it/webdata/twitter-2010/ )和網頁連結關係資料集(clueweb-12:https://law.di.unimi.it/webdata/clueweb12/ )。

twitter-2010

圖的有向性:有向圖

點數量:41,652,230

邊數量:1,468,365,182

clueweb-12

圖的有向性:有向圖

點數量:955,207,488

邊數量:42,574,107,469

2.4 評測圖演算法

在 LDBC(關聯資料基準委員會)提出的 Graphalytics 基準演算法中,我們選出了比較典型的 PageRank、connected-component 和 SSSP 作為本評測的圖演算法。

為了確保評測實驗的合理公平性,我們還統一了各框架的演算法執行引數,並重寫了部分框架的演算法實現程式碼,以保證各框架演算法執行結果的等價性(各框架的詳細配置引數及原始碼實現請參看附錄 5.1、附錄 5.2、附錄 5.3)。

2.4.1 PageRank

PageRank 是一種節點中心性指標演算法,用於度量頂點的重要程度。

演算法思路:PageRank 是一個全圖迭代式演算法。圖中每個頂點有 1 個初始 rank值,作為頂點的重要度。演算法每一輪迭代中,所有頂點的 rank 值都會更新。某頂點在一輪迭代中的新 rank 值,由所有指向它的鄰居為它“貢獻”的 rank 值計算得出;而該頂點的新 rank 值,又可以繼續在下輪迭代為它指向的頂點做“貢獻”。當迭代達到指定次數,或者全圖所有頂點的 rank 值變化小於指定閾值時,演算法終止。

演算法統一引數:

最大迭代次數:100

結束閾值:0

2.4.2 connected-component

connected-component 演算法用於識別並切分出一個非連通圖中的所有最大連通子圖。本評測使用的是針對有向圖的單向連通圖演算法。

演算法思路:connected-component 是一個非全圖迭代式演算法。我們使用 label 值來表示頂點所屬的連通子圖。演算法開始時,將每個頂點的 label 值初始化為頂點 id,並都設為啟用態。演算法迭代中,啟用態的頂點會向其指向的鄰居頂點發送自己的 label 值,鄰居頂點判斷如果接收到的 label 值比自己的小,則更新 label,並把自己置為啟用態。當圖中沒有啟用態的頂點,即沒有訊息傳遞時,演算法終止。最終,label 值相同的頂點被劃分在同一個連通子圖。

2.4.3 SSSP

SSSP(Single Source Shortest Path,單源最短路徑)演算法用於計算圖中所有頂點到指定頂點的最短距離。

演算法思路:SSSP 也是一個非全圖迭代式演算法。我們使用dist 表示某頂點到指定源點的最短距離。演算法開始時,源點的 dist 值設為 0,其他頂點的 dist 值初始化為無窮大值,並將源點置為啟用態。演算法迭代中,啟用態的頂點向鄰居傳送自己的 dist 值,鄰居頂點判斷如果接收到的(dist 值 +1)小於自己的 dist 值,則更新 dist,並置為啟用態。當圖中沒有啟用態的頂點,即沒有訊息傳遞時,演算法終止。

演算法統一引數:

源點id:0

  1. 結果及分析

我們分別在單節點(1 node)、兩節點(2 nodes)、四節點(4 nodes)部署模式下,使用 GraphX、Giraph 和 Plato 執行 3 個演算法(PageRank、connected-component、SSSP),並統計了各自的時間消耗和峰值記憶體佔用情況。

下面分兩個資料集進行結果展示及資料分析(詳細評測資料請見附錄5.4)。

3.1 資料集 twitter-2010

3.1.1 測試結果

說明:GraphX 在單節點(1 node)部署模式下,無法在 10h 內完成幾種演算法的執行。因而缺失該情況下的統計資料。

PageRank

圖1. PageRank 演算法,不同數量執行節點下的時間消耗

圖2. PageRank 演算法,不同數量執行節點下的記憶體佔用

connected-component

圖3. connected-component 演算法,不同數量執行節點下的時間消耗

圖4. connected-component 演算法,不同數量執行節點下的記憶體佔用

SSSP

圖5. SSSP 演算法,不同數量執行節點下的時間消耗

圖6. SSSP 演算法,不同數量執行節點下的記憶體佔用

3.1.2 資料分析

GraphX:執行幾種演算法的時間和記憶體消耗都很高。由於依賴的底層資料模型 RDD 的不變性,計算過程中會產生大量新的 RDD 作為中間結果,雖然 GraphX 對不變的頂點和邊進行了一定程度的的複用優化,但框架本身限制還是導致了大量的記憶體佔用和較差的效能。尤其在單節點(1 node)場景下,無法在 10h 內完成幾種演算法的執行。

Giraph:整體效能和記憶體開銷與 GraphX 相當。Giraph 基於 map 容器來儲存圖資料,帶來了很高的記憶體佔用。Giraph 的低效能,一大部分原因在於其點對點直接通訊帶來的高昂代價,尤其當存在大頂點時,向所有鄰居都發送訊息會導致巨大的訊息快取佔用以及通訊開銷,從而引發“長尾”問題,拖慢演算法的整體執行時間。

Plato:切圖將頂點集合按照塊式劃分,並使每個頂點和它的所有出邊/入邊在同 1 個分片上,該原則保證了 Plato 在兩種通訊模式(Pull/Push)下的高效率執行。圖7 為適用於 Pull 通訊模式的切圖方式,將頂點和其出邊劃分到了 1 個分片上。由於使用“點分割”切圖,頂點可能儲存多份,即某頂點可能有多個映象頂點(黃色圓圈),但只會有 1 個主頂點(白色圓圈)。頂點維度的資料都儲存在主頂點上,映象頂點充當訊息傳遞的“橋樑”。

圖7. 適用於 Pull 通訊模式的切圖

PageRank 由於是全圖迭代式演算法,使用 Pull 通訊模式。一次完整的 Pull 通訊過程,分為 SIGNAL 和 SLOT 兩個階段。

如圖8 所示,以 PageRank 演算法中更新頂點 1 的 rank值 為例(這裡只描述模擬計算過程):在 SIGNAL 階段,所有分片上的頂點 1(主頂點和映象頂點)從指向它的鄰居收集 rank 值並在本地聚合,聚合後的 rank 值會發給頂點 1 的主頂點所在分片(分片 0);在 SLOT 階段,分片 0 上的主頂點將來自各分片的 rank 值進行最終的合併計算,從而得到新的 rank 值。可以看出,該模式下完成 1 個頂點的訊息傳遞最多隻需傳送 (分片數 -1)個訊息,大大減少了程序間通訊量,能顯著提升效能。

圖1 顯示,Plato 執行 PageRank 演算法要比 GraphX / Giraph 在速度上高一個數量級。對於 connected-component 和 SSSP 這兩種非全圖迭代式演算法,Plato 使用了自適應切換的雙通訊模式,執行過程中會根據該時刻活躍邊數的比例來選擇效能更優的通訊模式,圖3 和圖5 看出,對比另外兩個框架,Plato 有數倍到 1 個數量級的效能優勢。在圖資料儲存方面,Plato 通過良好的資料結構設計,大大減少了記憶體佔用。並且其頂點索引和邊陣列的結構設計,實現了獲取某頂點鄰居的時間開銷為 O(1)。整體來看,Plato 比 GraphX / Giraph 計算速度高一個數量級,記憶體需求小一個數量級。

圖8. Pull 通訊模式

3.2 超大資料集 clueweb-12

由於機器資源限制,對於十億點、百億邊的超大資料集 clueweb-12,我們只在 4 nodes 場景下對幾個框架進行演算法評測。

3.2.1 測試結果

3.2.2 資料分析

GraphX / Giraph:原因如 3.1.2 節所述,GraphX 和 Giraph 由於記憶體佔用及效能原因,在 4 nodes 模式下,均無法在 10h 內完成演算法執行。在超大資料集和四機器資源下,兩框架都為不可用狀態。

Plato:得益於優秀的通訊模式設計和精緻的儲存結構實現(詳見 3.1.2節),Plato 在四節點部署模式下,即使面對超大資料集,依然出色地完成了幾種演算法的執行,執行時間最長不超過 70 min,且單臺機器的記憶體佔用遠未達到記憶體上限(192 GB)。

  1. 結論

總結上述實驗結果及資料分析,我們得出以下結論:

GraphX 由於底層 RDD 的不變性,執行效率和記憶體佔用均不理想。

Giraph 基於 map 容器的圖資料結構導致了很高的記憶體佔用,原生的點對點通訊模式也造成效能低下。

Plato 的塊式切分、雙引擎通訊模式、優化的底層儲存結構設計使其不論執行效率還是記憶體開銷都遠優於另外兩個框架,能高效地完成對大資料集的演算法執行。

綜上,我們最終選擇 Plato 作為圖計算平臺的底層引擎。

  1. 附錄

5.1 GraphX 配置引數及原始碼

5.1.1 Spark 配置引數

spark-defaults.conf

spark.driver.cores 2spark.driver.memory 2gspark.executor.memory 128gspark.local.dir /opt/meituan/appdatas/spark-tmp

5.1.2 各演算法執行指令碼及原始碼

PageRank 執行指令碼

/opt/meituan/appdatas/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \ # 提交 spark 任務的 bin 檔案--deploy-mode cluster \ # 部署模式為叢集模式--master spark://HOST:PORT \ # 指定 master 節點地址--class PageRankDemo \ # 演算法的執行類/opt/meituan/appdatas/graphx-runtime/graphx-spark.jar \ # 演算法的執行 jar 包spark://HOST:PORT \ # 指定 master 節點地址"/opt/meituan/appdatas/graphx-runtime/twitter-2010-s/*" \ # 輸入檔名或資料夾(自動掃描資料夾下所有 csv)100 # 最大迭代輪次

PageRank 演算法執行類:PageRankDemo.scala(自定義實現)

import java.io.Fileimport org.apache.spark.graphx.{Edge, Graph}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object PageRankDemo { def main(args: Array[String]): Unit = { val master = if (args.length > 0) args(0) else "local[*]" val input = if (args.length > 1) args(1) else "test.csv" val maxIter = if (args.length > 2) args(2).toInt else 10 var watchTs = System.currentTimeMillis() val conf = new SparkConf() .setAppName("Spark PageRank") .setMaster(master) .set("spark.ui.port", "8415") val spark = new SparkContext(conf) val links: RDD[String] = spark.textFile(input) val edges = links.map( line => line.split(",") ) .map( line => ( line(0).toLong, line(1).toLong) ) val graph = Graph.fromEdgeTuples(edges, 1) println("[PERF] load edges: " + links.count() + ", init graph cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val ranks = graph.staticPageRank(maxIter).vertices println("[PERF] rank cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val outputPath = "/tmp/graphx_pr_out_csv" Util deleteDir(new File(outputPath)) ranks.saveAsTextFile(outputPath) println("[PERF] save output cost: " + (System.currentTimeMillis() - watchTs)) }}

connected-component 執行指令碼

/opt/meituan/appdatas/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \ # 提交 spark 任務的 bin 檔案--deploy-mode cluster \ # 部署模式為叢集模式--master spark://HOST:PORT \ # 指定 master 節點地址--class ConnectedComponentsDemo \ # 演算法的執行類/opt/meituan/appdatas/graphx-runtime/graphx-spark.jar \ # 演算法的執行 jar 包spark://HOST:PORT \ # 指定 master 節點地址"/opt/meituan/appdatas/graphx-runtime/twitter-2010-s/*" \ # 輸入檔名或資料夾(自動掃描資料夾下所有 csv)

connected-component 演算法執行類: ConnectedComponentsDemo.scala(自定義實現)

import java.io.Fileimport org.apache.spark.graphx.{Edge, Graph}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object ConnectedComponentsDemo { def main(args: Array[String]): Unit = { val master = if (args.length > 0) args(0) else "local[*]" val input = if (args.length > 1) args(1) else "test.csv" var watchTs = System.currentTimeMillis() val conf = new SparkConf() .setAppName("Spark ConnectedComponents") .setMaster(master) .set("spark.ui.port", "8415") val spark = new SparkContext(conf) val links: RDD[String] = spark.textFile(input) val edges = links.map( line => line.split(",") ) .map( line => ( line(0).toLong, line(1).toLong) ) val graph = Graph.fromEdgeTuples(edges, 1) println("[PERF] load edges: " + links.count() + ", init graph cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val ranks = ConnectedComponentsNew.run(graph).vertices println("[PERF] rank cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val outputPath = "/tmp/graphx_cc_out_csv" Util deleteDir(new File(outputPath)) ranks.saveAsTextFile(outputPath) println("[PERF] save output cost: " + (System.currentTimeMillis() - watchTs)) }}

SSSP 執行指令碼

/opt/meituan/appdatas/spark-3.1.2-bin-hadoop3.2/bin/spark-submit \ # 提交 spark 任務的 bin 檔案--deploy-mode cluster \ # 部署模式為叢集模式--master spark://HOST:PORT \ # 指定 master 節點地址--class SsspDemo \ # 演算法的執行類/opt/meituan/appdatas/graphx-runtime/graphx-spark.jar \ # 演算法的執行 jar 包spark://HOST:PORT \ # 指定 master 節點地址"/opt/meituan/appdatas/graphx-runtime/twitter-2010-s/*" \ # 輸入檔名或資料夾(自動掃描資料夾下所有 csv)0 # 指定演算法的源點

SSSP 演算法執行類:SsspDemo.scala(自定義實現)

import java.io.Fileimport org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object SsspDemo { def main(args: Array[String]): Unit = { val master = if (args.length > 0) args(0) else "local[*]" val input = if (args.length > 1) args(1) else "test.csv" val sourceId = if (args.length > 2) args(2).toInt else 0 val maxIter = if (args.length > 3) args(3).toInt else Int.MaxValue var watchTs = System.currentTimeMillis() val conf = new SparkConf() .setAppName("Spark SSSP") .setMaster(master) .set("spark.ui.port", "8415") val spark = new SparkContext(conf) val links: RDD[String] = spark.textFile(input) val edges: RDD[Edge[Double]] = links .map( line => line.split(",") ) .map( line => Edge(line(0).toLong, line(1).toLong, 1.0d)) val vertexes: RDD[(VertexId, Double)] = edges .flatMap(edge => Array(edge.srcId, edge.dstId)) .distinct() .map(id => if (id == sourceId) (id, 0.0) else (id, Double.PositiveInfinity) ) val defaultVertex = -1.0d val initialGraph: Graph[(Double), Double] = Graph(vertexes, edges, defaultVertex) println("[PERF] load edges: " + links.count() + ", init graph cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val sssp = initialGraph.pregel(Double.PositiveInfinity, maxIter, EdgeDirection.Out)( //Vertex Program (id, dist, newDist) => { if (dist <= newDist) dist else newDist }, //Send Message triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, //Merge Message (a, b) => { math.min(a, b) } ) println("[PERF] rank cost: " + (System.currentTimeMillis() - watchTs)) watchTs = System.currentTimeMillis() val outputPath = "/tmp/graphx_sssp_out_csv" Util deleteDir(new File(outputPath)) sssp.vertices.saveAsTextFile(outputPath) println("[PERF] save output cost: " + (System.currentTimeMillis() - watchTs)) }}

5.2 Giraph 配置引數及原始碼

5.2.1 Hadoop 配置引數

yarn-site.xml

<configuration><!-- Site specific YARN configuration properties --><property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value></property><property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value></property><property> <name>yarn.resourcemanager.cluster-id</name> <value>rmCluster</value></property><property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value></property><property> <name>yarn.resourcemanager.hostname.rm1</name> <value>ip2</value></property><property> <name>yarn.resourcemanager.hostname.rm2</name> <value>ip3</value></property><property> <name>yarn.resourcemanager.zk-address</name> <value>ip1:2181,ip2:2181,ip3:2181,ip4:2181</value></property><property> <name>yarn.resourcemanager.recovery.enabled</name> <value>true</value></property><property> <name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value></property><property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>ip2:8088</value></property><property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>ip3:8088</value></property><property> <name>yarn.application.classpath</name> <value>/opt/meituan/appdatas/hadoop-2.7.6/etc/hadoop,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/common/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/common/lib/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/hdfs/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/hdfs/lib/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/mapreduce/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/mapreduce/lib/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/yarn/*,/opt/meituan/appdatas/hadoop-2.7.6/share/hadoop/yarn/lib/* </value></property><property> <name>yarn.scheduler.minimum-allocation-mb</name> <value>512</value></property><property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>185344</value></property><property> <name>yarn.nodemanager.resource.memory-mb</name> <value>185344</value></property><property> <name>yarn.app.mapreduce.am.resource.mb</name> <value>4096</value></property><property> <name>yarn.app.mapreduce.am.command-opts</name> <value>-Xmx3276m</value></property><property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>48</value></property><property> <name>yarn.scheduler.maximum-allocation-vcores</name> <value>192</value></property><property> <name>dfs.datanode.max.transfer.threads</name> <value>8192</value></property></configuration>

根據除錯,PageRank 演算法每臺機器啟動 39 個 map task,connected-component 和 SSSP 演算法每臺機器啟動 19 個 map task,能達到最優執行效能(僅作為和本評測相同機器配置的參考設定)。

mapred-site.xml(39 map task / node)

<configuration><property> <name>mapreduce.framework.name</name> <value>yarn</value></property><property> <name>mapreduce.jobhistory.address</name> <value>ip1.mt:10020</value></property><property> <name>mapreduce.jobhistory.webapp.address</name> <value>ip1:19888</value></property><property> <name>mapreduce.jobhistory.joblist.cache.size</name> <value>20000</value></property><property> <name>mapreduce.jobhistory.done-dir</name> <value>${yarn.app.mapreduce.am.staging-dir}/history/done</value></property><property> <name>mapreduce.jobhistory.intermediate-done-dir</name> <value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value></property><property> <name>yarn.app.mapreduce.am.staging-dir</name> <value>/opt/meituan/appdatas/hadoop-2.7.6/data/hadoop-yarn/staging</value></property><property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapred.job.tracker</name> <value>ip1.mt:54311</value></property><property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>40</value></property><property> <name>mapred.map.tasks</name> <value>2</value></property><property> <name>mapreduce.map.memory.mb</name> <value>4608</value></property><property> <name>mapreduce.reduce.memory.mb</name> <value>10</value></property><property> <name>mapreduce.map.java.opts</name> <value>-Xmx4147m</value></property><property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx8m</value></property><property> <name>mapred.task.timeout</name> <value>36000000</value></property><property> <name>mapreduce.job.counters.limit</name> <value>500</value></property></configuration>

mapred-site.xml(19 map task / node)

<configuration><property> <name>mapreduce.framework.name</name> <value>yarn</value></property><property> <name>mapreduce.jobhistory.address</name> <value>ip1:10020</value></property><property> <name>mapreduce.jobhistory.webapp.address</name> <value>ip1:19888</value></property><property> <name>mapreduce.jobhistory.joblist.cache.size</name> <value>20000</value></property><property> <name>mapreduce.jobhistory.done-dir</name> <value>${yarn.app.mapreduce.am.staging-dir}/history/done</value></property><property> <name>mapreduce.jobhistory.intermediate-done-dir</name> <value>${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate</value></property><property> <name>yarn.app.mapreduce.am.staging-dir</name> <value>/opt/meituan/appdatas/hadoop-2.7.6/data/hadoop-yarn/staging</value></property><property> <name>yarn.app.mapreduce.am.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapreduce.map.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapreduce.reduce.env</name> <value>HADOOP_MAPRED_HOME=$HADOOP_HOME</value></property><property> <name>mapred.job.tracker</name> <value>xr-nlpkg-graph-proxy01.mt:54311</value></property><property> <name>mapred.tasktracker.map.tasks.maximum</name> <value>20</value></property><property> <name>mapred.map.tasks</name> <value>2</value></property><property> <name>mapreduce.map.memory.mb</name> <value>9216</value></property><property> <name>mapreduce.reduce.memory.mb</name> <value>10</value></property><property> <name>mapreduce.map.java.opts</name> <value>-Xmx8294m</value></property><property> <name>mapreduce.reduce.java.opts</name> <value>-Xmx8m</value></property><property> <name>mapred.task.timeout</name> <value>36000000</value></property><property> <name>mapreduce.job.counters.limit</name> <value>500</value></property></configuration>

5.2.2 各演算法執行指令碼及原始碼

邊輸入檔案的解析類:LongStaticDoubleTextEdgeInputFormat.java(自定義實現)

package org.apache.giraph.io.formats;import org.apache.giraph.io.EdgeReader;import org.apache.giraph.utils.IntPair;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;import java.util.regex.Pattern;/** * Simple text-based {@link org.apache.giraph.io.EdgeInputFormat} for * unweighted graphs with int ids. * * Each line consists of: source_vertex, target_vertex */public class LongStaticDoubleTextEdgeInputFormat extends TextEdgeInputFormat<LongWritable, FloatWritable> { /** Splitter for endpoints */ private static final Pattern SEPARATOR = Pattern.compile(","); @Override public EdgeReader<LongWritable, FloatWritable> createEdgeReader( InputSplit split, TaskAttemptContext context) throws IOException { return new LongStaticDoubleTextEdgeReader(); } public class LongStaticDoubleTextEdgeReader extends TextEdgeReaderFromEachLineProcessed<IntPair> { @Override protected IntPair preprocessLine(Text line) throws IOException { String[] tokens = SEPARATOR.split(line.toString()); return new IntPair(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1])); } @Override protected LongWritable getSourceVertexId(IntPair endpoints) throws IOException { return new LongWritable(endpoints.getFirst()); } @Override protected LongWritable getTargetVertexId(IntPair endpoints) throws IOException { return new LongWritable(endpoints.getSecond()); } @Override protected FloatWritable getValue(IntPair endpoints) throws IOException { // 本類只處理無權圖,這裡將邊權重初始化為預設值1.0 return new FloatWritable(1.0f); } }}

PageRank 執行指令碼

hadoop fs -rm -r /giraph-out-prhadoop jar /opt/meituan/appdatas/nlp-giraph/giraph-examples/target/giraph-examples-1.3.0-SNAPSHOT-for-hadoop-2.7.6-jar-with-dependencies.jar \ # 編譯的jar包org.apache.giraph.GiraphRunner \ # Giraph啟動類org.apache.giraph.examples.SimplePageRankComputation \ # 演算法的執行類-mc org.apache.giraph.examples.SimplePageRankComputation\$SimplePageRankMasterCompute \ # 演算法的主計算類(pagerank演算法的特殊配置)-eif org.apache.giraph.io.formats.LongStaticDoubleTextEdgeInputFormat \ # 輸入邊檔案的解析類(自定義實現)-eip /giraph-input/twitter-2010-s \ # 輸入邊檔案的檔名或資料夾(自動掃描資料夾下所有csv)-vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \ # 頂點資料的輸出類-op /giraph-out-pr \ # 頂點資料的輸出路徑-w 39*N \ # 啟動的總worker數量(N為機器數量)-ca giraph.SplitMasterWorker=true # 指定自定義引數

connected-component 執行指令碼

hadoop fs -rm -r /giraph-out-cchadoop jar /opt/meituan/appdatas/nlp-giraph/giraph-examples/target/giraph-examples-1.3.0-SNAPSHOT-for-hadoop-2.7.6-jar-with-dependencies.jar \ # 編譯的jar包org.apache.giraph.GiraphRunner \ # Giraph啟動類org.apache.giraph.examples.ConnectedComponentsComputation \ # 演算法的執行類-eif org.apache.giraph.io.formats.LongStaticDoubleTextEdgeInputFormat \ # 輸入邊檔案的解析類(自定義實現)-eip /giraph-input/twitter-2010-s \ # 輸入邊檔案的檔名或資料夾(自動掃描資料夾下所有csv)-vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \ # 頂點資料的輸出類-op /giraph-out-cc \ # 頂點資料的輸出路徑-w 19*N \ # 啟動的總worker數量(N為機器數量)-ca giraph.SplitMasterWorker=true # 指定自定義引數

connected-component 演算法執行類:ConnectedComponentsComputation.java(有邏輯重寫)

package org.apache.giraph.examples;import org.apache.giraph.graph.BasicComputation;import org.apache.giraph.edge.Edge;import org.apache.giraph.graph.Vertex;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.LongWritable;import java.io.IOException;@Algorithm( name = "Connected components", description = "Finds connected components of the graph")public class ConnectedComponentsComputation extends BasicComputation<LongWritable, LongWritable, FloatWritable, LongWritable> { /** * Propagates the smallest vertex id to all neighbors. Will always choose to * halt and only reactivate if a smaller id has been sent to it. * * @param vertex Vertex * @param messages Iterator of messages from the previous superstep. * @throws IOException */ @Override public void compute( Vertex<LongWritable, LongWritable, FloatWritable> vertex, Iterable<LongWritable> messages) throws IOException { if (getSuperstep() == 0 || getSuperstep() == 1 && vertex.getNumEdges() == 0) { // 初始化label值為頂點id vertex.setValue(vertex.getId()); } long currentComponent = vertex.getValue().get(); if (getSuperstep() == 0) { for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) { LongWritable neighbor = edge.getTargetVertexId(); sendMessage(neighbor, vertex.getValue()); } vertex.voteToHalt(); return; } boolean changed = false; for (LongWritable message : messages) { long candidateComponent = message.get(); if (candidateComponent < currentComponent) { currentComponent = candidateComponent; changed = true; } } // propagate new component id to the neighbors if (changed) { vertex.setValue(new LongWritable(currentComponent)); sendMessageToAllEdges(vertex, vertex.getValue()); } vertex.voteToHalt(); }}

SSSP 執行指令碼

hadoop fs -rm -r /giraph-out-sssphadoop jar /opt/meituan/appdatas/nlp-giraph/giraph-examples/target/giraph-examples-1.3.0-SNAPSHOT-for-hadoop-2.7.6-jar-with-dependencies.jar \ # 編譯的jar包org.apache.giraph.GiraphRunner \ # Giraph啟動類org.apache.giraph.examples.SimpleShortestPathsComputation \ # 演算法的執行類-eif org.apache.giraph.io.formats.LongStaticDoubleTextEdgeInputFormat \ # 輸入邊檔案的解析類(自定義實現)-eip /giraph-input/twitter-2010-s \ # 輸入邊檔案的檔名或資料夾(自動掃描資料夾下所有csv)-vof org.apache.giraph.io.formats.IdWithValueTextOutputFormat \ # 頂點資料的輸出類-op /giraph-out-sssp \ # 頂點資料的輸出路徑-w 19*N \ # 啟動的總worker數量(N為機器數量)-ca giraph.SplitMasterWorker=true,SimpleShortestPathsVertex.sourceId=0 # 指定自定義引數

SSSP 演算法執行類:SimpleShortestPathsComputation.java(有邏輯重寫)

package org.apache.giraph.examples;import org.apache.giraph.graph.BasicComputation;import org.apache.giraph.conf.LongConfOption;import org.apache.giraph.edge.Edge;import org.apache.giraph.graph.Vertex;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.LongWritable;import org.apache.log4j.Logger;import java.io.IOException;/** * Demonstrates the basic Pregel shortest paths implementation. */@Algorithm( name = "Shortest paths", description = "Finds all shortest paths from a selected vertex")public class SimpleShortestPathsComputation extends BasicComputation< LongWritable, DoubleWritable, FloatWritable, DoubleWritable> { /** The shortest paths id */ public static final LongConfOption SOURCE_ID = new LongConfOption("SimpleShortestPathsVertex.sourceId", 1, "The shortest paths id"); /** Class logger */ private static final Logger LOG = Logger.getLogger(SimpleShortestPathsComputation.class); /** * Is this vertex the source id? * * @param vertex Vertex * @return True if the source id */ private boolean isSource(Vertex<LongWritable, ?, ?> vertex) { return vertex.getId().get() == SOURCE_ID.get(getConf()); } @Override public void compute( Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, Iterable<DoubleWritable> messages) throws IOException { if (getSuperstep() == 0 || !isSource(vertex) && vertex.getValue().get() == 0.0) { // 所有頂點的距離值初始化為最大值 vertex.setValue(new DoubleWritable(Double.MAX_VALUE)); } double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE; for (DoubleWritable message : messages) { minDist = Math.min(minDist, message.get()); } if (minDist < vertex.getValue().get()) { vertex.setValue(new DoubleWritable(minDist)); for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) { double distance = minDist + edge.getValue().get(); sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance)); } } vertex.voteToHalt(); }}

5.3 Plato 配置引數及原始碼

5.3.1 各演算法執行指令碼及原始碼

PageRank 執行指令碼

#!/bin/bashset -exROOT_DIR="/opt/meituan/appdatas/plato-mt-0.1.1" # 原始碼路徑WORK_DIR="/opt/meituan/appdatas/plato-runtime" # 工作路徑MAIN="$ROOT_DIR/bazel-bin/example/pagerank" # 編譯好的演算法二進位制檔案所在地址WNUM=N # 程序分片數WCORES=48 # 使用執行緒數MPI_HOSTS="HOST1,HOST2..." # 程序ip列表(替換為實際的機器ip列表)INPUT=${INPUT:="/opt/meituan/appdatas/graphx-runtime/twitter-2010-s"} # 輸入檔名或資料夾(自動掃描資料夾下所有csv)OUTPUT=${OUTPUT:="$WORK_DIR/../plato-output/pr_output"} # 輸出資料夾IS_DIRECTED=${IS_DIRECTED:=true} # 是否有向圖EPS=${EPS:=0} # Delta值小於此值認為迭代已經收斂,立即退出DAMPING=${DAMPING:=0.85} # 整形引數ITERATIONS=${ITERATIONS:=100} # 最大迭代輪次# paramPARAMS+=" --threads ${WCORES}"PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED}"PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"# mpichMPIRUN_CMD=${MPIRUN_CMD:="$ROOT_DIR/3rd/mpich/bin/mpiexec.hydra"}# testexport LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ROOT_DIR/3rd/hadoop2/lib# run${MPIRUN_CMD} -n ${WNUM} -host $MPI_HOSTS ${MAIN} ${PARAMS} # host引數指定執行機,需要能通過ssh公鑰訪問

connected-component 執行指令碼

#!/bin/bashset -exROOT_DIR="/opt/meituan/appdatas/plato-mt-0.1.1" # 原始碼路徑WORK_DIR="/opt/meituan/appdatas/plato-runtime" # 工作路徑MAIN="$ROOT_DIR/bazel-bin/example/cgm_simple" # 編譯好的演算法二進位制檔案所在地址WNUM=N # 程序分片數(N 替換為機器個數)WCORES=48 # 使用執行緒數MPI_HOSTS="HOST1,HOST2..." # 程序ip列表(替換為實際的機器ip列表)INPUT=${INPUT:="/opt/meituan/appdatas/graphx-runtime/twitter-2010-s"} # 輸入檔名或資料夾(自動掃描資料夾下所有csv)OUTPUT=${OUTPUT:="$WORK_DIR/../plato-output/cc_output"} # 輸出資料夾IS_DIRECTED=${IS_DIRECTED:=true} # 是否有向圖OUTPUT_METHOD=${OUTPUT_METHOD:="all_vertices"} # 輸出的結果格式# paramPARAMS+=" --threads ${WCORES}"PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED}"PARAMS+=" --output_method ${OUTPUT_METHOD}"# mpichMPIRUN_CMD=${MPIRUN_CMD:="$ROOT_DIR/3rd/mpich/bin/mpiexec.hydra"}# testexport LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ROOT_DIR/3rd/hadoop2/lib# run${MPIRUN_CMD} -n ${WNUM} -host $MPI_HOSTS ${MAIN} ${PARAMS} # host引數指定執行機,需要能通過ssh公鑰訪問

SSSP 執行指令碼

#!/bin/bashset -exROOT_DIR="/opt/meituan/appdatas/plato-mt-0.1.1" # 原始碼路徑WORK_DIR="/opt/meituan/appdatas/plato-runtime" # 工作路徑MAIN="$ROOT_DIR/bazel-bin/example/sssp_simple" # 編譯好的演算法二進位制檔案所在地址WNUM=N # 程序分片數(N 替換為機器個數)WCORES=48 # 使用執行緒數MPI_HOSTS="HOST1,HOST2..." # 程序ip列表(替換為實際的機器ip列表)INPUT=${INPUT:="/opt/meituan/appdatas/graphx-runtime/twitter-2010-s"} # 輸入檔名或資料夾(自動掃描資料夾下所有csv)OUTPUT=${OUTPUT:="$WORK_DIR/../plato-output/sssp_output"} # 輸出資料夾IS_DIRECTED=${IS_DIRECTED:=true} # 是否有向圖# paramPARAMS+=" --threads ${WCORES}"PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED}"# mpichMPIRUN_CMD=${MPIRUN_CMD:="$ROOT_DIR/3rd/mpich/bin/mpiexec.hydra"}# testexport LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ROOT_DIR/3rd/hadoop2/lib# run${MPIRUN_CMD} -n ${WNUM} -host $MPI_HOSTS ${MAIN} ${PARAMS} # host引數指定執行機,需要能通過ssh公鑰訪問

Plato 本身不包含 SSSP 演算法包,因此對該演算法進行了等價性實現:

1、在 /example 資料夾下新建 sssp_simple.cc,並在BUILD檔案最後新增如下內容。

sssp_simple.cc:

#include <cstdint>#include "glog/logging.h"#include "gflags/gflags.h"#include "plato/graph/graph.hpp"#include "plato/algo/sssp/sssp.hpp"DEFINE_string(input, "", "input file, in csv format, without edge data");DEFINE_string(output, "", "output directory, store the sssp result");DEFINE_bool(is_directed, true, "is graph directed or not");DEFINE_uint32(root, 0, "start sssp from which vertex");DEFINE_int32(alpha, -1, "alpha value used in sequence balance partition");DEFINE_bool(part_by_in, false, "partition by in-degree");bool string_not_empty(const char*, const std::string& value) { if (0 == value.length()) { return false; } return true;}DEFINE_validator(input, &string_not_empty);DEFINE_validator(output, &string_not_empty);void init(int argc, char** argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); google::LogToStderr();}int main(int argc, char** argv) { plato::stop_watch_t watch; auto& cluster_info = plato::cluster_info_t::get_instance(); watch.mark("load"); init(argc, argv); cluster_info.initialize(&argc, &argv); plato::graph_info_t graph_info(FLAGS_is_directed); auto graph = plato::create_dualmode_seq_from_path<plato::empty_t>(&graph_info, FLAGS_input, plato::edge_format_t::CSV, plato::dummy_decoder<plato::empty_t>, FLAGS_alpha, FLAGS_part_by_in); plato::algo::sssp_opts_t opts; opts.root_ = FLAGS_root; watch.mark("t0"); plato::thread_local_fs_output os(FLAGS_output, (boost::format("%04d_") % cluster_info.partition_id_).str(), true); auto callback = [&] (plato::vid_t v_i, std::uint32_t value) { auto& fs_output = os.local(); fs_output << v_i << "," << value << "\n"; }; if (0 == cluster_info.partition_id_) { LOG(INFO) << "Load graph cost: " << watch.show("load") / 1000.0 << "s"; } plato::vid_t visited = plato::algo::single_source_shortest_path(graph.second, graph.first, graph_info, opts, callback); if (0 == cluster_info.partition_id_) { LOG(INFO) << "sssp done, visited: " << visited << ", cost: " << watch.show("t0") / 1000.0 << "s"; } return 0;}

BUILD:

cc_binary ( name = "sssp_simple", srcs = [ "sssp_simple.cc", ], copts = ['-g', '-O2', ] + PLATO_OPTS, linkopts = [ ] + PLATO_OPTS, deps = [ "//3rd/glog:glog", "//3rd/gflags:gflags", "//3rd/boost:boost_include", "//plato/graph:graph", "//plato/algo/sssp:sssp", ], defines = [ # "__DCSC_DEBUG__", ], linkstatic = 1,)

2、新建 /plato/algo/sssp 目錄,並在該路徑下新建 sssp.hpp 和 BUILD檔案。

sssp.hpp:

#ifndef __PLATO_ALGO_SSSP_HPP__#define __PLATO_ALGO_SSSP_HPP__#include <cstdint>#include <cstdlib>#include "glog/logging.h"#include "plato/util/perf.hpp"#include "plato/util/atomic.hpp"#include "plato/graph/graph.hpp"#include "plato/engine/dualmode.hpp"namespace plato { namespace algo {struct sssp_opts_t { vid_t root_ = 0;};// distance 訊息結構體struct distance_msg_type_t { vid_t v_i; std::uint32_t value;};/* * demo implementation of single source shortest path * * \tparam INCOMING graph type, with incoming edges * \tparam OUTGOING graph type, with outgoing edges * * \param in_edges incoming edges, dcsc, ... * \param out_edges outgoing edges, bcsr, ... * \param graph_info base graph-info * \param opts sssp options * \param callback callback func to ouput result * * \return * visited vertices count * */template <typename INCOMING, typename OUTGOING, typename Callback>vid_t single_source_shortest_path( INCOMING& in_edges, OUTGOING& out_edges, const graph_info_t& graph_info, const sssp_opts_t& opts, Callback&& callback) { plato::stop_watch_t watch; auto& cluster_info = plato::cluster_info_t::get_instance(); watch.mark("run"); // 傳入兩種切分方式的圖資料,構建雙模式引擎 dualmode_engine_t<INCOMING, OUTGOING> engine ( std::shared_ptr<INCOMING>(&in_edges, [](INCOMING*) { }), std::shared_ptr<OUTGOING>(&out_edges, [](OUTGOING*) { }), graph_info); // alloc structs used during bfs auto visited = engine.alloc_v_subset(); // 標記已訪問的頂點 auto active_current = engine.alloc_v_subset(); // 標記本輪迭代中活躍的頂點 auto active_next = engine.alloc_v_subset(); // 標記下輪迭代中活躍的頂點 auto distance = engine.template alloc_v_state<std::uint32_t>(); // 儲存頂點的最短距離 // 初始化非源點的距離為最大值、源點的距離為0 distance.fill(std::numeric_limits<std::uint32_t>::max()); distance[opts.root_] = 0; // 標記源點已訪問過 visited.set_bit(opts.root_); // 標記源點為活躍態 active_current.set_bit(opts.root_); // 全域性變數,統計所有分片中活躍態的頂點數量 plato::vid_t actives = 1; for (int epoch_i = 0; 0 != actives; ++epoch_i) { using pull_context_t = plato::template mepa_ag_context_t<distance_msg_type_t>; using pull_message_t = plato::template mepa_ag_message_t<distance_msg_type_t>; using push_context_t = plato::template mepa_bc_context_t<distance_msg_type_t>; using adj_unit_list_spec_t = typename INCOMING::adj_unit_list_spec_t; watch.mark("t1"); active_next.clear(); actives = engine.template foreach_edges<distance_msg_type_t, plato::vid_t> ( // PUSH [&](const push_context_t& context, vid_t v_i) { context.send(distance_msg_type_t {v_i, distance[v_i] + 1}); }, [&](int /*p_i*/, distance_msg_type_t& msg) { plato::vid_t activated = 0; auto neighbours = out_edges.neighbours(msg.v_i); for (auto it = neighbours.begin_; neighbours.end_ != it; ++it) { plato::vid_t dst = it->neighbour_; if ((plato::write_min(&distance[dst], msg.value)) ) { active_next.set_bit(dst); visited.set_bit(dst); ++activated; } } return activated; }, // PULL [&](const pull_context_t& context, plato::vid_t v_i, const adj_unit_list_spec_t& adjs) { for (auto it = adjs.begin_; adjs.end_ != it; ++it) { plato::vid_t src = it->neighbour_; if (active_current.get_bit(src)) { context.send(pull_message_t {v_i, distance_msg_type_t{v_i, distance[src] + 1}}); break; } } }, [&](int, pull_message_t & msg) { if (plato::write_min(&distance[msg.v_i_], msg.message_.value)) { active_next.set_bit(msg.v_i_); visited.set_bit(msg.v_i_); return 1; } return 0; }, active_current ); std::swap(active_current, active_next); if (0 == cluster_info.partition_id_) { LOG(INFO) << "active_v[" << epoch_i << "] = " << actives << ", cost: " << watch.show("t1") / 1000.0 << "s"; } } if (0 == cluster_info.partition_id_) { LOG(INFO) << "Run cost: " << watch.show("run") / 1000.0 << "s"; } watch.mark("output"); // save output auto active_all = engine.alloc_v_subset(); active_all.fill(); //traverse auto active_view_all = plato::create_active_v_view(engine.out_edges()->partitioner()->self_v_view(), active_all); active_view_all.template foreach<vid_t>([&] (vid_t v_i) { callback(v_i, distance[v_i]); return 1; }); if (0 == cluster_info.partition_id_) { LOG(INFO) << "Output cost: " << watch.show("output") / 1000.0 << "s"; } visited.sync(); return visited.count();}}} // namespace algo, namespace plato#endif

BUILD:

load("//build_tools/rules:variables.bzl", "PLATO_OPTS")cc_library ( name = "sssp", hdrs = [ "sssp.hpp", ], srcs = [], includes = [ ".", ], deps = [ "//3rd/glog:glog", "//plato/util:perf", "//plato/util:atomic", "//plato/graph:graph", "//plato/engine:dualmode", ], defines = [ "__DUALMODE_DEBUG__", ], copts = [ '-O2', '-Wall', '-std=c++11', ] + PLATO_OPTS, linkopts = [ ] + PLATO_OPTS, visibility = ["//visibility:public"],)

5.4 評測原始資料

5.4.1 twitter-2010 原始資料

PageRank

18.9 x:表示在 2 nodes 模式下,Plato 與另外兩個框架中效能較優的執行時間比值(2823.676 / 149.337 = 18.9)。即在該場景下,Plato 相比另外兩個框架效能至少提升了 18.9 倍。其他指標同理。

connected-component

SSSP

5.4.2 clueweb-12 原始資料

  1. 參考閱讀

美團圖資料庫平臺建設及業務實踐