1. 程式人生 > >對Giraph的一些理解

對Giraph的一些理解

Giraph的一些理解

這兩天又重新看了一下Giraph原始碼,對整體架構的理解又有了新的認識和理解,下面逐點來說。

一、          Giraph本質的理解:

大家都知道,Giraph對使用者來講可能是一個基於Pregel模型的圖運算專案,但是對於Hadoop來講,其實它是一個普通的MapReduce任務。因此我們在執行時可以把他看成是一個mapreduce任務,只是這個任務有點特殊和複雜。

特殊在,它沒有像普通mapreduce任務一樣重寫mapreduce函式,而只是寫了run函式,所以一切邏輯都在run函式中呈現,mapreduce函式都是空函式,並且這個對於hadoop

job沒有reduce任務,只有map任務。

複雜在,它不是簡單的map任務,在run函式中實現了一切pregel邏輯,包括資料載入、圖資料重分割槽、圖資料的計算等。

二、          Giraph到底利用了hadoop的什麼?

首先,用到mapreduce框架,是不是感覺有點蒙圈了?通過第一點的講解,大家知道了Giraph的本質,但是換個角度想想,它其實只是用到了hadoop的分散式任務啟動的功能,也就是說,hadoop給這個特殊的Job啟動了Nmaptask以後就再也沒做什麼。

那麼問題來了,使用過Giraph的夥伴們都知道,在執行命令裡我們可以指定work的數數目(-w N),而且神奇的發現,

hadoop確實啟動了N+1(多出來的一個是master)個mapworker)任務,可能有人會說,我們在使用hadoop的可以指定map數量,嗯嗯,這個說法沒錯,難道這麼簡單就完了嗎?仔細想想,使用者指定的hadoop數目不一定能起到作用啊,hadoop在確定map任務數量時候是根據它在hdfs上的block數量來啟動的,預設每個block啟動一個map任務,所以當我們在giraph啟動時人工指定的map數目不一定起作用。其實giraph用了一個很聰明的辦法,重寫InputFormat啊,在giraph中它為hadoop指定的InputFormat是一個“空”的-----BspInputFormat
,這個BspInputformat中一個重要的方法getsplits()其實獲得的是含有一個空路徑的splitlist,但是他的數量是N+1。所以hadoop在確定啟動多少個maptask時待用這個方法得到了含有N+1splitlist,啟動了N+1maptask來作為Giraphworker,但是split中檔案路徑為空,所以Hadoop沒有載入任何資料。

其次,giraph確實用到了hadoophdfs,這個沒什麼說的。

三、          Giraph同步和通訊的理解

可能你又會問,Giraph這個框架要實現Pregel必須滿足同步機制和通訊等功能,如何實現的呢?

首先來說說通訊吧,在Giraph中,因為它本質上是map任務,但沒有用到mapreduce框架的任何通訊功能,它的所有通訊都是用的Netty這個apache開源分散式通訊工具完成的,也就是不同worker之間的訊息(包括節點訊息等)都是netty完成的。

其次是同步機制,學習hadoop生態圈的對zookeeper都不會陌生,zookeeper是一個類似於簡單分散式檔案系統的分散式協同管理工具(具體的內容自己看吧)。在Giraph中,每一個迭代步中worker都會在zookeeper上某個路徑下(設為A)建立相應迭代步的znode,並且監聽“本步迭代完成標誌節點”是否已經建立,master的任務就是不停檢查A路徑下的znode節點數目是否等於worker數目,若等於則建立“本步迭代完成標誌節點”,此時worker會監聽到,進入下一個迭代步。

四、          Giraph的這個工作流程的理解

這個本來應該畫一個流程圖來說明更加直觀的,由於一些原因先文字敘述一下,後期再補吧。

通過第二點可以知道,當GiraphJob提交以後,hadoopGiraph啟動了N+1個任務後(此時的任務叫做maptask,因為還沒有確定哪個maptaskmaster哪個是worker),那麼Giraph接下來都做了哪些內容呢?

1.  GIraph要確定叢集上有沒有zookeeper服務(使用者可以指定),如果有,則進入第2步,如果沒有此時要啟動zookeeper服務。熟悉zookeeper的小夥伴應該知道,zookeeper中應該有一個主伺服器和多個從伺服器來為(client)叢集服務,因此Giraph首先要啟動zookeeper伺服器,Giraph中預設啟動一個zookeeper伺服器。這個過程又是怎樣的呢?

1.1      啟動好的多個maptask首先在hdfs的指定路徑下建立本task的標誌(例如,路徑B/host1 taskID)

1.2      每個task啟動一個zookManage(類名可能不正確,詳細見原始碼)物件,這個物件的主要任務就是從這些個maptask中選取一個作為zookeeper伺服器,經驗證這個始終都是task0,有興趣的可以自己看下原始碼。

1.3      Task0zookeeper伺服器啟動,並在hdfs上掛出伺服器地址和埠,其他task讀取地址和埠,所有task進入第2步。

2.          Giraph啟動了zookeeper服務以後,接下來要做的就是確定每個task的角色,也就是確定它具體是master還是worker,判斷邏輯如下:

a)      If not split master, everyone does the everything and/or runningZooKeeper. 

b)      If split master/worker, masters also run ZooKeeper

c)      If split master/worker == true and giraph.zkList is set, the masterwill not instantiate a ZK instance, but will assume a quorum is already activeon the cluster for Giraph to use.

最終的結果就是,作為zookeeper伺服器的task0兼任master,這個task不參與具體運算。其他的taskworker,參與具體運算。

3. 接下來,masterworker開始做不同的事情,並且開始進行同步了,這個過程利用了剛剛建好的zookeeper服務來完成。

master要做的事情就是載入使用者輸入檔案的splits資訊,這個資訊是利用使用者定義的InputFormat(注意區分BspInputFormat)來獲取的,此時獲取的splits才是真正的資料,master會將splits資訊以znode節點的形式掛到zookeeper上,同時master根據splits資訊建立分割槽,這裡需要說明一下分割槽數和splits數目,splits數目一般是hdfs上的blocks(這個和hadoop相似)的數量,分割槽數目則不同,分割槽數目是通過系統的一個計算公式(partitionCount=PARTITION_COUNT_MULTIPLIER* availableWorkerInfos.size() * availableWorkerInfos.size())來建立,這個數目要比workers數目大,得到這個數目後,會將分割槽安排到不同的work上,並將該partionToWorker這個資訊也以znode的形式掛到zookeeper上。

其他的workermaster生成splits資訊和partion資訊之前一直處於同步等待階段,當master將資訊掛載到zookeeper後,workerzookeeper上讀取splits資訊,並且開啟執行緒進行資料載入,資料載入過程線面會詳細講解。載入完資料以後,各個worker開始將自己load進來的點計算它屬於的分割槽號,並查到屬於哪個計算節點,並將其傳送到相應計算節點。

        4.  最後,當資料都載入到各個worker上以後開始進行迭代計算。

五、          Giraph中涉及到多執行緒的地方

Giraph中,主要涉及到多執行緒的有兩個地方,一個是worker在將split中所指的資料載入到記憶體時使用了執行緒;另一個是worker在迭代中對頂點計算時候使用了多執行緒。

先說第一個,當master把splits資訊放到zookeeper上以後,每個Worker建立N個InputsCallable執行緒讀取資料。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS預設值為1,maxInputSplitThread=InputSplitSize-1/maxWorkers+1。那麼,預設每個worker就是建立一個執行緒來載入資料。在每個執行緒中,它們會不停的遍歷splits列表上的資訊,當遇到一個沒有被載入的splits時,首先會在zookeeper上建立相應splits的正在載入標籤,然後載入完以後會建立已載入標籤。當載入完這個split之後繼續進行上述過程,進行載入。

再說第二個,當worker把資料載入到記憶體並且已經將頂點發送到屬於它的worker上以後,每個worker會得到若干個分割槽(原因在前面已經講過)。此時開始進行迭代計算,也是pregel的核心內容。具體的,每個worker會開啟N個執行緒來進行計算,此時worker會將本地的partionID儲存到一個佇列,partionID所對應的資料會放在一個叫做serviceWorker的物件中儲存。每個執行緒中,它會不停地遍歷屬於本worker上的partion,對每一個partion上的頂點進行遍歷執行使用者定義的compute函式,直到所有partion完成。

需要說明的是,上述的“N”是可以自定義的,不過一般都是採用預設。