Giraph原始碼分析(三)—— 訊息通訊
由前文知道每個BSPServiceWorker有一個WorkerServer物件,WorkerServer物件裡面又有ServerData物件,作為資料實。ServerData中包含該Worker的partitionStore、edgeStore、incomingMessageStore、currentMessageStore、聚集值等。其中incomingMessageStore物件為MessageStoreByPartition(介面)型別,也就是說訊息時按照分割槽來儲存的。MessageStoreByPartition介面的關係圖如下:
在SimpleMessageStore抽象類中,有一個ConcurrentMap<Integer,ConcurrentMap<I,T>>型別的變數map,用來儲存訊息。第一層是pairtitionID到傳送到該partition訊息的對映;第二層是VertexID 到傳送給該Vertex的訊息佇列。
《Giraph通訊模組分析》:http://my.oschina.net/skyaugust/blog/95182
每個頂點的訊息列表具體為ExtendedDataOutput型別,它繼承DataOutput介面,增加了幾個方法而已。每個訊息是以位元組形式寫入到ExtendedDataOutput物件中的。
傳送訊息時,採用非同步式通訊。
圖頂點的計算處理與訊息通訊併發執行,在計算過程中就可以傳送訊息,將大規模訊息傳送分散在不同的時間段,避免瞬時網路通訊阻塞,但是接受端需要額外的空間,儲存臨時接收到的訊息,相當於空間換時間。而集中式通訊,圖頂點的計算處理與訊息通訊序列進行,在計算完畢後,統一發送訊息,控制和實現方式簡單,可在傳送端對訊息進行最大程度優化,但容易造成瞬時間的網路通訊阻塞以及增加發送端的訊息儲存開銷。
不同Worker間的訊息通訊使用RPC方式,具體為Netty。同一Worker內,連續兩次迭代的訊息直接通過記憶體操作,把要傳送的訊息直接複製到Worker的incomingMessageStore中。下面詳述訊息的儲存格式和傳送機制。
Giraph使用Cache來快取訊息,當訊息達到一定閾值後,一次性發送。
既按照bulk模式進行,不會一條一條資訊傳送。向某個頂點發送的訊息是按照<destVertexId,Message> pair儲存在ByteArrayVertexIdData<I,T>中(實際為ByteArrayVertexIdMessages<I,M>型別)。介紹如下: org.apache.giraph.utils.ByteArrayVertexIdData<I,T>
功能:把<頂點ID,data> Pair 儲存在一個 byte陣列中。裡面有 ExtendedDataOutput物件用來儲存資料。
該類中還有一個內部類:VertexIdDataIterator,該內部類繼承 VertexIdIterator類。
org.apache.giraph.comm.SendCache用來快取傳送的資訊,然後以“Bulk”模式傳送。在Giraph中,每個Worker上可以對應多個分割槽。訊息快取的閾值是以Worker為單位計算,而不是Partition。
SendCache中有ByteArrayVertexIdData<I,T>[ ] dataCache陣列用來儲存傳送給每個Partition的訊息;有int[ ] dataSizes陣列用於記錄向每個Worker傳送的訊息大小,若大於MAX_MSG_REQUEST_SIZE(預設為512KB)就把此Worker上的所有Partition快取的訊息傳送到給該Worker,同一Worker內訊息也是如此快取;有int[ ] initBufferSizes陣列用於記錄每個Worker上的每個Partition的初始化ByteArrayVertexIdData中ExtendedDataOutput物件的大小,同一Worker上的所有Partition初始值相同,該值為平均值。記MAX_MSG_REQUEST_SIZE(message request size)值為M, 該Worker上有P個 partitions,ADDTITIONNAL_MSG_REQUEST_SIZE(比平均值大的因子)預設為0.2f,記為A。則每個Partition的初始大小為:M*(1+A) / P .
由前文知道,每個Worker都有一個NettyWorkerClientRequestProcessor<I,V,E,M>用來發送訊息。該類中有SendMessageCache物件用來快取向外傳送的資訊。NettyWorkerClientRequestProcessor類中的sendMessageRequest(I,M)
方法如下,用於向某個頂點destVertexId傳送訊息message。
方法解釋:首先根據destVertexId得到對應的partitionId和WorkerInfo,然後把訊息add到SendMessageCache中,並返回向該頂點所屬Worker傳送的訊息大小workerMessageSize。若該值大於預設值512KB,則把此Worker對應的所有Partition訊息從SendMessageCache中刪除,把刪除的訊息賦值給workerMessages,其型別為PairList<Integer,ByteArrayVertexIdMessages<I,M>> ,key為partitionId,value為傳送給該partition的訊息列表,最後呼叫doRequest()方法傳送資訊。doRequest()方法如下:
可以看到在傳送訊息時,先判斷是否在同一Worker上。如果是的話,呼叫SendWorkerMessagesRequest<T,M>的doRequest傳送訊息;否則使用WorkerClient(底層使用Netty)進行訊息傳送。下面著重討論同一Worker內的機制。
org.apache.giraph.comm.requests.SendWorkerMessagesRequest類中的doRequest方法如下:
引數為該Worker的ServerData,程式碼中的partitionVertexData實際為PairList<Integer,ByteArrayVertexIdMessages<I,M>>workerMessages。遍歷<partitionID,對應的訊息列表>來新增到ServerData中的incomingMessageStore中。
ByteArrayMessagesPerVertexStore類中的addPartitionMessages()方法如下:
當用戶使用了Combiner,incomingMessageStore對應的型別則為OneMessagePerVertexStore,該類為每個頂點只儲存一個訊息,而非訊息佇列。 結構如下圖:
當新增一條訊息時,會把頂點已對應的訊息和要新增的訊息呼叫combine()方法進行合併,然後儲存在上述結構圖中。addPartitionMessages()方法如下:
在ComputeCallable中的call()方法呼叫computePartition(Partition)計算完所有Partition上的頂點後,呼叫WorkerClientRequestProcessor.flush()方法把所有剩餘的訊息傳