對Giraph的一些理解
對Giraph的一些理解
這兩天又重新看了一下Giraph原始碼,對整體架構的理解又有了新的認識和理解,下面逐點來說。
一、 Giraph本質的理解:
大家都知道,Giraph對使用者來講可能是一個基於Pregel模型的圖運算專案,但是對於Hadoop來講,其實它是一個普通的MapReduce任務。因此我們在執行時可以把他看成是一個mapreduce任務,只是這個任務有點特殊和複雜。
特殊在,它沒有像普通mapreduce任務一樣重寫map和reduce函式,而只是寫了run函式,所以一切邏輯都在run函式中呈現,map、reduce函式都是空函式,並且這個對於hadoop的
複雜在,它不是簡單的map任務,在run函式中實現了一切pregel邏輯,包括資料載入、圖資料重分割槽、圖資料的計算等。
二、 Giraph到底利用了hadoop的什麼?
首先,用到mapreduce框架,是不是感覺有點蒙圈了?通過第一點的講解,大家知道了Giraph的本質,但是換個角度想想,它其實只是用到了hadoop的分散式任務啟動的功能,也就是說,hadoop給這個特殊的Job啟動了N個maptask以後就再也沒做什麼。
那麼問題來了,使用過Giraph的夥伴們都知道,在執行命令裡我們可以指定work的數數目(-w N),而且神奇的發現,
其次,giraph確實用到了hadoop的hdfs,這個沒什麼說的。
三、 Giraph同步和通訊的理解
可能你又會問,Giraph這個框架要實現Pregel必須滿足同步機制和通訊等功能,如何實現的呢?
首先來說說通訊吧,在Giraph中,因為它本質上是map任務,但沒有用到mapreduce框架的任何通訊功能,它的所有通訊都是用的Netty這個apache開源分散式通訊工具完成的,也就是不同worker之間的訊息(包括節點訊息等)都是netty完成的。
其次是同步機制,學習hadoop生態圈的對zookeeper都不會陌生,zookeeper是一個類似於簡單分散式檔案系統的分散式協同管理工具(具體的內容自己看吧)。在Giraph中,每一個迭代步中worker都會在zookeeper上某個路徑下(設為A)建立相應迭代步的znode,並且監聽“本步迭代完成標誌節點”是否已經建立,master的任務就是不停檢查A路徑下的znode節點數目是否等於worker數目,若等於則建立“本步迭代完成標誌節點”,此時worker會監聽到,進入下一個迭代步。
四、 Giraph的這個工作流程的理解
這個本來應該畫一個流程圖來說明更加直觀的,由於一些原因先文字敘述一下,後期再補吧。
通過第二點可以知道,當GiraphJob提交以後,hadoop為Giraph啟動了N+1個任務後(此時的任務叫做maptask,因為還沒有確定哪個maptask是master哪個是worker),那麼Giraph接下來都做了哪些內容呢?
1. GIraph要確定叢集上有沒有zookeeper服務(使用者可以指定),如果有,則進入第2步,如果沒有此時要啟動zookeeper服務。熟悉zookeeper的小夥伴應該知道,zookeeper中應該有一個主伺服器和多個從伺服器來為(client)叢集服務,因此Giraph首先要啟動zookeeper伺服器,Giraph中預設啟動一個zookeeper伺服器。這個過程又是怎樣的呢?
1.1 啟動好的多個maptask首先在hdfs的指定路徑下建立本task的標誌(例如,路徑B/host1 taskID) 。
1.2 每個task啟動一個zookManage(類名可能不正確,詳細見原始碼)物件,這個物件的主要任務就是從這些個maptask中選取一個作為zookeeper伺服器,經驗證這個始終都是task0,有興趣的可以自己看下原始碼。
1.3 Task0將zookeeper伺服器啟動,並在hdfs上掛出伺服器地址和埠,其他task讀取地址和埠,所有task進入第2步。
2. Giraph啟動了zookeeper服務以後,接下來要做的就是確定每個task的角色,也就是確定它具體是master還是worker,判斷邏輯如下:
a) If not split master, everyone does the everything and/or runningZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the masterwill not instantiate a ZK instance, but will assume a quorum is already activeon the cluster for Giraph to use.
最終的結果就是,作為zookeeper伺服器的task0兼任master,這個task不參與具體運算。其他的task為worker,參與具體運算。
3. 接下來,master和worker開始做不同的事情,並且開始進行同步了,這個過程利用了剛剛建好的zookeeper服務來完成。
master要做的事情就是載入使用者輸入檔案的splits資訊,這個資訊是利用使用者定義的InputFormat(注意區分BspInputFormat)來獲取的,此時獲取的splits才是真正的資料,master會將splits資訊以znode節點的形式掛到zookeeper上,同時master根據splits資訊建立分割槽,這裡需要說明一下分割槽數和splits數目,splits數目一般是hdfs上的blocks(這個和hadoop相似)的數量,分割槽數目則不同,分割槽數目是通過系統的一個計算公式(partitionCount=PARTITION_COUNT_MULTIPLIER* availableWorkerInfos.size() * availableWorkerInfos.size())來建立,這個數目要比workers數目大,得到這個數目後,會將分割槽安排到不同的work上,並將該partionToWorker這個資訊也以znode的形式掛到zookeeper上。
其他的worker在master生成splits資訊和partion資訊之前一直處於同步等待階段,當master將資訊掛載到zookeeper後,worker從zookeeper上讀取splits資訊,並且開啟執行緒進行資料載入,資料載入過程線面會詳細講解。載入完資料以後,各個worker開始將自己load進來的點計算它屬於的分割槽號,並查到屬於哪個計算節點,並將其傳送到相應計算節點。
4. 最後,當資料都載入到各個worker上以後開始進行迭代計算。
五、 Giraph中涉及到多執行緒的地方
在Giraph中,主要涉及到多執行緒的有兩個地方,一個是worker在將split中所指的資料載入到記憶體時使用了執行緒;另一個是worker在迭代中對頂點計算時候使用了多執行緒。
先說第一個,當master把splits資訊放到zookeeper上以後,每個Worker建立N個InputsCallable執行緒讀取資料。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS預設值為1,maxInputSplitThread=InputSplitSize-1/maxWorkers+1。那麼,預設每個worker就是建立一個執行緒來載入資料。在每個執行緒中,它們會不停的遍歷splits列表上的資訊,當遇到一個沒有被載入的splits時,首先會在zookeeper上建立相應splits的正在載入標籤,然後載入完以後會建立已載入標籤。當載入完這個split之後繼續進行上述過程,進行載入。
再說第二個,當worker把資料載入到記憶體並且已經將頂點發送到屬於它的worker上以後,每個worker會得到若干個分割槽(原因在前面已經講過)。此時開始進行迭代計算,也是pregel的核心內容。具體的,每個worker會開啟N個執行緒來進行計算,此時worker會將本地的partionID儲存到一個佇列,partionID所對應的資料會放在一個叫做serviceWorker的物件中儲存。每個執行緒中,它會不停地遍歷屬於本worker上的partion,對每一個partion上的頂點進行遍歷執行使用者定義的compute函式,直到所有partion完成。
需要說明的是,上述的“N”是可以自定義的,不過一般都是採用預設。