1. 程式人生 > >spark的外排:AppendOnlyMap與ExternalAppendOnlyMap

spark的外排:AppendOnlyMap與ExternalAppendOnlyMap

數據大小 iou 簡單 容量 .cn ner 實現思路 next 返回值

相信很多人和我一樣, 在控制臺中總是可以看到會打印出如下的語句:

INFO ExternalAppendOnlyMap: Thread 94 spilling in-memory map of 63.2 MB to disk (7 times so far)

經過查詢一下,摘抄入下:

AppendOnlyMap/ExternalAppendOnlyMap在spark被廣泛使用,如join中shuffle的reduce階段以及combineByKey操作等。

AppendOnlyMap
AppendOnlyMap是spark自己實現的Map,只能添加數據,不能remove。該Map是使用開放定址法中的二次探測法,不用自帶的HashMap等應該是節省空間,提高性能。


數組中a=keyi, a[i+1]=valuei, 即用兩個位置來存儲kv對。
growThreshold=LOAD_FACTOR * capacity, 當添加的元素超過該值時,數組會進行grow, capacity翻倍,同時所有的kv都會進行rehash重新分配位置。
主要方法:

1.apply
即調用Map(key)訪問value時。根據key的hash(具體是murmur3_32)找到位置,如果目標位置的key與要查找的key不一樣,則使用二次探測法繼續查找,直到找到為止。

2.update
找到對應key的value位置,將新value覆蓋原value。

3.changeValue
spark中用的比較多的方法

該方法最核心的其實是外部傳進來的updateFunc(hadValue, oldValue), updateFunc一般是當hadValue為false時createCombiner(v)作為新value, hadValue為true時mergeValue(oldValue,v),將v加到oldValue中。

iterator
一般是在RDD的compute中會調用該方法,作為RDD操作的Iterator, 即其下遊RDD可以為此為數據源。主要也是實現hasNext和next方法,它們都調用了nextValue方法。
nextValue則從pos(初始化是0)開始遍歷,直到找到data(2*pos)!=null的,則將結果返回。
hasNext是判斷nextValue()!=null。
next是得到nextValue()的返回值,且將pos+=1。

destructiveSortedIterator
轉化成一般的數組,並按key對kv進行排序。失去了Map的特性。主要用於外排時將Map排序輸出到disk中。
實現思路是:將原數組的值不斷向數組左端緊湊移動,且將原先占用兩個位置的kv轉成(k,v)只占一個位置,然後對數組按key進行kv排序。排序方法是KCComparator,即按key的hashcode進行排序。然後創建一個Iterator,其hasNext和next都是對新的數組進行相應遍歷操作。

spark早期版本采用的是AppendOnlyMap來實現shuffle reduce階段數據的聚合,當數據量不大時沒什麽問題,但當數據量很大時就會占用大量內存,最後可能OOM。所以從spark 0.9開始就引入了ExternalAppendOnlyMap來代替AppendOnlyMap。

ExternalAppendOnlyMap
note:當spark.shuffle.spill=true時會啟用ExternalAppendOnlyMap,默認為true. 為false時就啟用AppendOnlyMap

技術分享

ExternalAppendOnlyMap也在內存維護了一個SizeTrackingAppendOnlyMap(繼承於AppendOnlyMap),當該Map元素數超過一定值時就spill到磁盤。最後ExternalAppendOnlyMap其實是維護了一個內存Map:currentMap以及多個diskMap:spillMaps。

主要屬性和參數:

currentMap
SizeTrackingAppendOnlyMap,繼承於AppendOnlyMap。是ExternalAppendOnlyMap的內存Map。

spilledMaps
new ArrayBuffer[DiskMapIterator], 每個DiskMapIterator都指向了相應的spill到disk上的文件數據。

maxMemoryThreshold
該值決定了用於該worker上同時運行的任務的currentMap的大小之和,即num(running tasks) * size(各task的currentMap)。該值由spark.shuffle.memoryFraction和spark.shuffle.safetyFraction決定,具體計算方式如下:

val maxMemoryThreshold = {
     val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
   val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
   (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong //即worker的內存*0.24
}

insert
插入kv對的主要方法。
shouldSpill是剩余空間是否足夠讓currentMap進行擴容,夠的話進行大小翻倍,不夠的話則將currentMap spill到disk中。
這裏需要判斷是否需要進行shouldSpill判斷,具體判斷邏輯如下:

numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold


numPairsInMemory為已經添加的kv數,trackMemoryThreshold為固定值1000。也就是前1000個元素是可以直接往currentMap放而不會發生spill。
由於currentMap初始時可容納kv的個數為64,則在numPairsInMemory > trackMemoryThreshold前currentMap還是會發生幾次grow。當numPairsInMemory > trackMemoryThreshold時,則currentMap本次到達growThreshold時就要進行shouldSpill的判斷。

  • 當這個結果是false時,則未達到需要進行shouldSpill判斷的條件,則直接currentMap.changeValue(key, update)將kv更新到currentMap中。
  • 當這個結果是true時,則需要進行shouldSpill到disk判斷。


shouldSpill判斷的具體步驟為:根據maxMemoryThreshold以及目前正在運行的其他task的currentMap大小 來判斷是否有足夠內存來讓currentMap的大小翻倍。

  val threadId = Thread.currentThread().getId
  val previouslyOccupiedMemory = shuffleMemoryMap.get(threadId)
  val availableMemory = maxMemoryThreshold -
  (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
  // Assume map growth factor is 2x
  shouldSpill = availableMemory < mapSize * 2

  • shouldSpill=false:讓 shuffleMemoryMap(threadId) = mapSize * 2, 即讓當前任何占用2倍的空間。 currentMap的擴容會發生之後的currentMap.changeValue裏。
  • shouldSpill=true: 進行spill操作。



spill
將currentMap寫到disk上。具體步驟為:
1、通過currentMap.destructiveSortedIterator(KCComparator)將currentMap變成按key的hashCode進行排序的數組,並封裝成相應的Iterator。
2、遍歷1得到的Iterator,將kv write到DiskBlockObjectWriter中,但寫入量objectsWritten達到serializerBatchSize(批量寫到文件的記錄數,由spark.shuffle.spill.batchSize控制,默認是10000,太小的話則在寫文件時性能變低)時進行writer.flush()將之前的數據寫到文件中,並將spill到磁盤的大小記錄到batchSizes中,batchSizes記錄了每次spill時的數據大小,便於之後的讀取(因為批量寫到磁盤時經過了壓縮序列化,所以讀取時要讀取與寫時等量的數據才可以正常的解壓反序列化,所以batchSizes十分重要)
3、不斷重復2直到將currentMap的數據全部寫到文件中。
4、生成一個DiskMapIterator(用於讀取文件數據),將加到spillMaps中。這裏會將batchSizes放到DiskMapIterator並於從文件讀取數據。
4、reset工作:

  • 生成新的currentMap。
  • shuffleMemoryMap(Thread.currentThread().getId)=0即將使用的currentMap容量清0。
  • numPairsInMemory重置為0.



iterator
一般是在RDD的compute中會調用該方法,作為RDD操作的Iterator, 即其下遊RDD可以為此為數據源。

  • 當spillMaps為空,即只有currentMap,從未spill到disk時,直接調用currentMap.iterator()
  • 當spillMaps不空時,則要進行外排過程ExternalIterator(和Hadoop的reduce的sort階段以及hbase的memStore、storeFile遍歷類似)




ExternalIterator
外排的主要思想:各個Iterator已經按key.hashcode排好序,利用一個優先隊列保存各個Iterator, hasNext是看優先隊列是否有元素,next則是返回當前最小hashcode的最小key對應的所有value合並成的combine,即(minKey,minCombiner)。
具體實現如下:
1、各個Iterator: 由currentMap.destructiveSortedIterator形成的Iterator以及spillMaps中的DiskMapIterator
2、優先隊列為mergeHeap = new mutable.PriorityQueue[StreamBuffer],StreamBuffer的主要方法如下:

private case class StreamBuffer(iterator: Iterator[(K, C)], pairs: ArrayBuffer[(K, C)])

  extends Comparable[StreamBuffer] {



  def isEmpty = pairs.length == 0

  // Invalid if there are no more pairs in this stream

  def minKeyHash = {

    assert(pairs.length > 0)

    pairs.head._1.hashCode()

  }

  override def compareTo(other: StreamBuffer): Int = {

    // descending order because mutable.PriorityQueue dequeues the max, not the min

    if (other.minKeyHash < minKeyHash) -1 else if (other.minKeyHash == minKeyHash) 0 else 1

  }

}

}

  


StreamBuffer存的是某個Iterator,以及該Iterator按某個key.hashCode聚合的結果。其compareTo決定了其在mergeHeap的位置。StreamBuffer的key.hashCode都是一樣的,這樣minKeyHash可以從其存儲的數據集中隨便取一個就行。這裏會讓hashCode相同的兩個key同時存到這個StreamBuffer中,也就是key不相同,這裏會有問題嗎,後面的講到的mergeIfKeyExists會進行key是否相同的判斷。


3、將各個Iterator轉成StreamBuffer, 這個過程需要獲得各個Iterator最小的keyHash對應的所有kv對,具體實現是getMorePairs方法。

private def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
  val kcPairs = new ArrayBuffer[(K, C)]
  if (it.hasNext) {
    var kc = it.next()
    kcPairs += kc
    val minHash = kc._1.hashCode()
    while (it.hasNext && kc._1.hashCode() == minHash) {
      kc = it.next()
      kcPairs += kc
    }
  }
  kcPairs
}


該方法十分簡單,就是獲得第一個key.hashCode即最小的minHash(因為Iterator已經按key.hashCode排好序),然後獲得和minHash相同的所有kv對。
4、hasNext:mergeHeap優先隊列是否為空
5、next: 外排的核心邏輯。
a、mergeHeap.dequeue()將隊列頂最小的StreamBuffer出隊列並加到mergedBuffers(mergedBuffers為了記錄出隊的StreamBuffer,便於下一輪繼續加入)中,得到minHash,以及(minKey, minCombiner)。
b、然後要去剩下的StreamBuffer上獲得和minHash相同的kv對,並與(minKey, minCombiner)進行合並。從隊列頂不斷dequeue與minHash相同的StreamBuffer並加到mergedBuffers中,每取到一個StreamBuffer則進行value合並,合並具體調用mergeIfKeyExists。

private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
  var i = 0
  while (i < buffer.pairs.length) {
    val (k, c) = buffer.pairs(i)
    if (k == key) {
      buffer.pairs.remove(i)
      //baseCombiner即b中的minCombiner。這裏mergeCombiners的原因是在currentMap中的updateFunc時生成的是Combiner
      return mergeCombiners(baseCombiner, c)
    }
    i += 1
  }
  baseCombiner
}


這裏只有與minKey相同的kv才會被選取與minCombiner進行合並且從對應的StreamBuffer中移除,否則仍保留。
c、遍歷mergedBuffers即dequeue的各StreamBuffer判斷其是否還有kv對,沒有的話則重新調用getMorePairs獲得下一波kv對。 然後將StreamBuffer再次enqueue到mergeHeap中進行重新排序。當然如果某個StreamBuffer還是沒kv對,則說明對應的Iterator已經遍歷完,不需要再加到mergeHeap中。
d、返回(minKey,minCombiner)

DiskMapIterator
從disk文件讀取數據形成Iterator。
hasNext:是否讀到文件末尾
next: 先調用nextBatchStream()將batchSizes.remove(0)即當前要讀的數據量的數據讀到bufferedStream中,然後每次next都從該緩存中獲得kv對,當緩存中數據取完時又調用nextBatchStream()重新從文件批量讀取下塊數據

spark的外排:AppendOnlyMap與ExternalAppendOnlyMap