1. 程式人生 > >spark shuffle讀操作

spark shuffle讀操作

提出問題

1. shuffle過程的資料是如何傳輸過來的,是按檔案來傳輸,還是隻傳輸該reduce對應在檔案中的那部分資料?

2. shuffle讀過程是否有溢位操作?是如何處理的?

3. shuffle讀過程是否可以排序、聚合?是如何做的?

。。。。。。

概述

在 spark shuffle的寫操作之準備工作 中的 ResultTask 和 ShuffleMapTask 看到了,rdd讀取資料是呼叫了其 iterator 方法。

計算或者讀取RDD

org.apache.spark.rdd.RDD#iterator原始碼如下,它是一個final方法,只在此有實現,子類不允許重實現這個方法:

思路:如果是已經快取下來了,則呼叫 org.apache.spark.rdd.RDD#getOrCompute 方法,通過底層的儲存系統或者重新計算來獲取父RDD的map資料。否則呼叫 org.apache.spark.rdd.RDD#computeOrReadCheckpoint ,從checkpoint中讀取或者是通過計算來來獲取父RDD的map資料。

我們逐一來看其依賴方法:

org.apache.spark.rdd.RDD#getOrCompute 原始碼如下:

首先先通過Spark底層的儲存系統獲取 block。如果底層儲存沒有則呼叫 org.apache.spark.rdd.RDD#computeOrReadCheckpoint,其原始碼如下:

主要通過三種途徑獲取資料 -- 通過spark 底層的儲存系統、通過父RDD的checkpoint、直接計算。

處理返回的資料

讀取完畢之後,資料的處理基本上一樣,都使用 org.apache.spark.InterruptibleIterator 以迭代器的形式返回,org.apache.spark.InterruptibleIterator 原始碼如下:

比較簡單,使用委託模式,將迭代下一個行為委託給受委託類。

 

下面我們逐一來看三種獲取資料的實現細節。

通過spark 底層的儲存系統

其核心原始碼如下:

思路:首先先從本地或者是遠端executor中的儲存系統中獲取到block,如果是block存在,則直接返回,如果不存在,則呼叫 computeOrReadCheckpoint計算或者通過讀取父RDD的checkpoint來獲取RDD的分割槽資訊,並且將根據其持久化級別(即StorageLevel)將資料做持久化。 關於持久化的內容 可以參考 Spark 原始碼分析系列 中的 Spark儲存部分 做深入瞭解。

通過父RDD的checkpoint

其核心原始碼如下:

通過父RDD的checkpoint也是需要通過spark底層儲存系統或者是直接計算來得出資料的。

不做過多的說明。

下面我們直接進入主題,看shuffle的讀操作是如何進行的。

直接計算

其核心方法如下:

首先,org.apache.spark.rdd.RDD#compute是一個抽象方法。

我們來看shuffle過程reduce的讀map資料的實現。

表示shuffle結果的是 org.apache.spark.rdd.ShuffledRDD。

其compute 方法如下:

整體思路:首先從 shuffleManager中獲取一個 ShuffleReader 物件,並呼叫該reader物件的read方法將資料讀取出來,最後將讀取結果強轉為Iterator[(K,C)]

該shuffleManager指的是org.apache.spark.shuffle.sort.SortShuffleManager。

其 getReader 原始碼如下:

簡單來說明一下引數:

handle:是一個ShuffleHandle的例項,它有三個子類,可以參照 spark shuffle的寫操作之準備工作 做深入瞭解。

startPartition:表示開始partition的index

endPartition:表示結束的partition的index

context:表示Task執行的上下文物件

其返回的是一個 org.apache.spark.shuffle.BlockStoreShuffleReader 物件,下面直接來看這個物件。

BlockStoreShuffleReader

這個類的繼承關係如下:

其中ShuffleReader的說明如下:

Obtained inside a reduce task to read combined records from the mappers.

ShuffleReader只有一個read方法,其子類BlockStoreShuffleReader也比較簡單,也只有一個實現了的read方法。

下面我們直接來看這個方法的原始碼。

在上圖,把整個流程劃分為5個步驟 -- 獲取block輸入流、反序列化輸入流、新增監控、資料聚合、資料排序。

下面我們分別來看這5個步驟。這5個流程中輸入流和迭代器都沒有把大資料量的資料一次性全部載入到記憶體中。並且即使在資料聚合和資料排序階段也沒有,但是會有資料溢位的操作。我們下面具體來看每一步的具體流程是如何進行的。 

獲取block輸入流

其核心原始碼如下:

我們先來對 ShuffleBlockFetcherIterator 做進一步瞭解。

使用ShuffleBlockFetcherIterator獲取輸入流

這個類就是用來獲取block的輸入流的。

blockId等相關資訊傳入構造方法

其構造方法如下:

它繼承了Iterator trait,是一個 [(BlockId,InputStream)] 的迭代器。

對構造方法引數做進一步說明:

context:TaskContext,是作業執行的上下文物件

shuffleClieent:預設為 NettyBlockTransferService,如果使用外部shuffle系統則使用 ExternalShuffleClient

blockManager:底層儲存系統的核心類

blocksByAddress:需要的block的blockManager的資訊以及block的資訊。

通過 org.apache.spark.MapOutputTracker#getMapSizesByExecutorId 獲取,其原始碼如下:

org.apache.spark.MapOutputTrackerWorker#getStatuses 其原始碼如下:

思路:如果有shuffleId對應的MapStatus則返回,否則使用 MapOutputTrackerMasterEndpointRef 請求 driver端的 MapOutputTrackerMaster 返回 對應的MapStatus資訊。

 org.apache.spark.MapOutputTracker#convertMapStatuses 原始碼如下:

思路:將MapStatus轉換為一個可以迭代檢視BlockManagerId、BlockId以及對應大小的迭代器。

streamWrapper:輸入流的解密以及解壓縮操作的包裝器,其依賴方法 org.apache.spark.serializer.SerializerManager#wrapStream 原始碼如下:

這部分在 spark 原始碼分析之十三 -- SerializerManager剖析 部分有相關剖析,不再說明。

maxBytesInFlight: max size (in bytes) of remote blocks to fetch at any given point. 
maxReqsInFlight: max number of remote requests to fetch blocks at any given point.
maxBlocksInFlightPerAddress: max number of shuffle blocks being fetched at any given point
maxReqSizeShuffleToMem: max size (in bytes) of a request that can be shuffled to memory.
detectCorrupt: whether to detect any corruption in fetched blocks.

讀取資料

在迭代方法next中不斷去讀取遠端的block以及本地的block輸入流。不做詳細剖析,見 ShuffleBlockFetcherIterator.scala 中next 相關方法的剖析。

反序列化輸入流

核心方法如下:

其依賴方法 scala.collection.Iterator#flatMap 原始碼如下:

 可見,即使是在這裡,資料並沒有全部落到記憶體中。流跟管道的概念很類似,資料並沒有一次性載入到記憶體中。它只不過是在使用迭代器的不斷銜接,最終形成了新的處理鏈。在這個鏈中的每一個環節,資料都是懶載入式的被載入到記憶體中,這在處理大資料量的時候是一個很好的技巧。當然也是責任鏈的一種具體實現方式。

新增監控

其實這一步跟上一步本質上區別並不大,都是在責任鏈上添加了一個新的環節,其核心原始碼如下:

其中,核心方法 scala.collection.Iterator#map 原始碼如下:

又是一個新的迭代器處理環節被加到責任鏈中。

資料聚合

資料聚合其實也很簡單。

其核心原始碼如下:

在聚合的過程中涉及到了資料的溢位操作,如果有溢位操作還涉及 ExternalSorter的溢位合併操作。

其核心原始碼不做進一步解釋,有興趣可以看 spark shuffle寫操作三部曲之SortShuffleWriter 做進一步瞭解。

資料排序

資料排序其實也很簡單。如果使用了排序,則使用ExternalSorter則在分割槽內部進行排序。

其核心原始碼如下:

其內部使用了ExternalSorter進行排序,其中也涉及到了溢位操作的處理。有興趣可以看 spark shuffle寫操作三部曲之SortShuffleWriter 做進一步瞭解。

總結

主要從實現細節和設計思路上來說。

實現細節

首先在實現細節上,先使用ShuffleBlockFetcherIterator獲取本地或遠端節點上的block並轉化為流,最終返回一小部分資料的迭代器,隨後序列化、解壓縮、解密流操作被放在一個迭代器中該迭代器後執行,然後添加了監控相關的迭代器、資料聚合相關的迭代器、資料排序相關的迭代器等等。這些迭代器保證了處理大量資料的高效性,在資料聚合和排序階段,大資料量被不斷溢位到磁碟中,資料最終還是以迭代器形式返回,確保了記憶體不會被大資料量佔用,提高了資料的吞吐量和處理資料的高效性。

設計思路

在設計上,主要說三點:

  1. 責任鏈和迭代器的混合使用,即使得程式易擴充套件,處理環節可插拔,處理流程清晰易懂。
  2. 關於聚合和排序的使用,在前面文章中shuffle寫操作也提到了,聚合和排序的類是獨立出來的,跟shuffle的處理耦合性很低,這使得在shuffle的讀和寫階段的資料記憶體排序聚合溢位操作的處理類可以重複使用。
  3. shuffle資料的設計也很巧妙,shuffle的資料是按reduceId分割槽的,分割槽資訊被儲存在索引檔案中,這使得每一個reduce task只需要取得一個檔案中屬於它分割槽的那部分shuffle資料就可以了,極大地減少無用了資料量的網路傳輸,提高了shuffle的效率。還值得說的是,shuffle資料的格式是一個約定,不管map階段的資料是如何被處理,最終資料形式肯定是約定好的,這使得map和reduce階段的處理類之間的耦合性大大地降低。

至此,spark 的shuffle階段的細節就徹底剖析完畢。

最後,明天週末,玩得開