
Spark 2.3 Source Code Analysis: In-Memory Collections

AppendOnlyMap

Overview

An implementation of a hash table that supports only appending data: keys are never removed, but the value for each key may change.

The hash table uses open addressing with quadratic probing, so a single array is its only internal data structure.

The table size is always a power of 2, and it can hold at most 0.7 * 2^29 (375,809,638) elements.

For memory locality, keys and values are stored in the same array; specifically, the element order is key0, value0, key1, value1, key2, value2, ...

AppendOnlyMap allows null as a key: the null key is handled specially through the haveNullValue and nullValue fields rather than being stored in the array, and apply(null) returns nullValue (which is null until a value has been set).

Apart from deletion, which it does not provide, it offers everything expected of a map: insertion, modification, growth, lookup, and iteration:

  • Insert: implemented by update, which sets the value for a key.
  • Modify: implemented by changeValue, which updates the value for a key with a caller-supplied function.
  • Grow: implemented by growTable, which doubles the table's capacity and rehashes all elements.
  • Look up: implemented by apply, which returns the value for a given key.
  • Iterate: implemented by destructiveSortedIterator, which returns an iterator over the map in the order of a given comparator. It sorts the map's data without using extra memory, but destroys the validity of the map: the underlying array must not be used afterwards.

Because AppendOnlyMap can return an iterator ordered by a comparator, it resembles a SortedMap in that one respect.
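As a quick tour of these operations, here is a minimal usage sketch (AppendOnlyMap is a DeveloperApi class in org.apache.spark.util.collection, so this assumes spark-core 2.3 is on the classpath; the outputs in the comments are what the calls should produce):

import java.util.Comparator
import org.apache.spark.util.collection.AppendOnlyMap

object AppendOnlyMapDemo {
  def main(args: Array[String]): Unit = {
    val map = new AppendOnlyMap[String, Int](initialCapacity = 8)

    // Insert / overwrite with update(): same key, so the value is replaced in place.
    map.update("a", 1)
    map.update("a", 10)

    // Aggregate with changeValue(): hadValue reports whether the key already existed.
    map.changeValue("b", (hadValue, old) => if (hadValue) old + 1 else 1)  // b -> 1
    map.changeValue("b", (hadValue, old) => if (hadValue) old + 1 else 1)  // b -> 2

    // Look up with apply().
    println(map("a"))  // 10

    // Destructive sorted iteration: the map must not be used afterwards.
    val sorted = map.destructiveSortedIterator(Comparator.naturalOrder[String]())
    sorted.foreach(println)  // (a,10) then (b,2)
  }
}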

/**
 * :: DeveloperApi ::
 * A simple open hash table optimized for the append-only use case, where keys
 * are never removed, but the value for each key may be changed.
 *
 * This implementation uses quadratic probing with a power-of-2 hash table
 * size, which is guaranteed to explore all spaces for each key (see
 * http://en.wikipedia.org/wiki/Quadratic_probing).
 *
 * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
 *
 * TODO: Cache the hash values of each key? java.util.HashMap does that.
 */
@DeveloperApi
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {

  import AppendOnlyMap._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  private val LOAD_FACTOR = 0.7

  private var capacity = nextPowerOf2(initialCapacity)
  private var mask = capacity - 1
  private var curSize = 0
  private var growThreshold = (LOAD_FACTOR * capacity).toInt

  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)

  // Treat the null key differently so we can use nulls in "data" to represent empty items.
  private var haveNullValue = false
  private var nullValue: V = null.asInstanceOf[V]

  // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
  private var destroyed = false
  private val destructionMessage = "Map state is invalid from destructive sorting!"

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    // Rehash the key's hashCode and AND it with the mask to get the starting slot pos
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      // Keys sit at even indices 0, 2, 4, ... of data; if the array held only keys
      // the index would simply be pos, so 2 * pos is the key's index in data
      // and 2 * pos + 1 is the corresponding value's index
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        // Found the key: return its value
        return data(2 * pos + 1).asInstanceOf[V]
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        // The key in this slot differs from the one we want, so keep probing.
        // Linear probing would always step by 1 after a collision; here the step
        // delta grows by 1 on each probe (1, 2, 3, ...), so the offsets from the
        // starting slot are the triangular numbers 1, 3, 6, 10, ... With a
        // power-of-2 table size this quadratic probing visits every slot.
        // pos + delta advances pos by delta slots; & mask wraps the index so it
        // stays within the array bounds.
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V]
  }

  /** Set the value for a key */
  def update(key: K, value: V): Unit = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = value
      haveNullValue = true
      return
    }
    // Rehash the key's hashCode and AND it with the mask to get the starting slot
    var pos = rehash(key.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        data(2 * pos) = k
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        incrementSize()  // Since we added a new key
        return
      } else if (k.eq(curKey) || k.equals(curKey)) {
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        return
      } else {
        // Collision: continue with quadratic probing
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
  }

  /**
   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
   */
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      // When called from an Aggregator, updateFunc runs mergeValue if the key
      // already has a value, and createCombiner if it does not
      if (curKey.eq(null)) {
        // No existing value: updateFunc effectively runs the aggregator's createCombiner
        val newValue = updateFunc(false, null.asInstanceOf[V])
        // Store the key at data(2 * pos)
        data(2 * pos) = k
        // Store the value at data(2 * pos + 1)
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
        // Existing value: updateFunc effectively runs the aggregator's mergeValue,
        // merging the old value into a new one
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        // Collision: continue with quadratic probing
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

  /** Iterator method from Iterable */
  override def iterator: Iterator[(K, V)] = {
    assert(!destroyed, destructionMessage)
    new Iterator[(K, V)] {
      var pos = -1

      /** Get the next value we should return from next(), or null if we're finished iterating */
      // Called by hasNext() and next() to locate the next entry to return
      def nextValue(): (K, V) = {
        if (pos == -1) {    // Treat position -1 as looking at the null value
          if (haveNullValue) {
            return (null.asInstanceOf[K], nullValue)
          }
          pos += 1
        }
        while (pos < capacity) {
          if (!data(2 * pos).eq(null)) {
            // Return the (key, value) pair at this slot
            return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
          }
          pos += 1
        }
        null
      }

      override def hasNext: Boolean = nextValue() != null

      override def next(): (K, V) = {
        val value = nextValue()
        if (value == null) {
          throw new NoSuchElementException("End of iterator")
        }
        pos += 1
        value
      }
    }
  }

  override def size: Int = curSize

  /** Increase table size by 1, rehashing if necessary */
  private def incrementSize() {
    curSize += 1
    if (curSize > growThreshold) {
      growTable()
    }
  }

  /**
   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
   */
  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

  /** Double the table's size and re-hash everything */
  protected def growTable() {
    // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
    // The new capacity is double the old one
    val newCapacity = capacity * 2
    require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
    // Allocate the new array; keys and values share it, so its length is 2 * newCapacity
    val newData = new Array[AnyRef](2 * newCapacity)
    // The new mask
    val newMask = newCapacity - 1
    // Insert all our old values into the new array. Note that because our old keys are
    // unique, there's no need to check for equality here when we insert.
    var oldPos = 0
    while (oldPos < capacity) {
      if (!data(2 * oldPos).eq(null)) {
        // Read the key and value from the old array
        val key = data(2 * oldPos)
        val value = data(2 * oldPos + 1)
        // Rehash to find the key's slot in the new array
        var newPos = rehash(key.hashCode) & newMask
        var i = 1
        var keepGoing = true
        while (keepGoing) {
          val curKey = newData(2 * newPos)
          if (curKey.eq(null)) {
            // Write the key and value into their slot in the new array
            newData(2 * newPos) = key
            newData(2 * newPos + 1) = value
            keepGoing = false
          } else {
            // Collision: continue with quadratic probing
            val delta = i
            newPos = (newPos + delta) & newMask
            i += 1
          }
        }
      }
      oldPos += 1
    }
    data = newData
    capacity = newCapacity
    mask = newMask
    growThreshold = (LOAD_FACTOR * newCapacity).toInt
  }

  // Returns n if n is already a power of 2, otherwise the next power of 2 above n
  private def nextPowerOf2(n: Int): Int = {
    val highBit = Integer.highestOneBit(n)
    if (highBit == n) n else highBit << 1
  }

  /**
   * Return an iterator of the map in sorted order. This provides a way to sort the map without
   * using additional memory, at the expense of destroying the validity of the map.
   */
  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
    destroyed = true
    // Pack KV pairs into the front of the underlying array
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      }
      keyIndex += 1
    }
    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
   
    // Sort the compacted entries in place in the data array
    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

    // Return an iterator over the sorted entries
    new Iterator[(K, V)] {
      var i = 0
      var nullValueReady = haveNullValue
      def hasNext: Boolean = (i < newIndex || nullValueReady)
      def next(): (K, V) = {
        if (nullValueReady) {
          nullValueReady = false
          (null.asInstanceOf[K], nullValue)
        } else {
          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
          i += 1
          item
        }
      }
    }
  }

  /**
   * Return whether the next insert will cause the map to grow
   */
  def atGrowThreshold: Boolean = curSize == growThreshold
}

// Companion object
private object AppendOnlyMap {
  val MAXIMUM_CAPACITY = (1 << 29)
}
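A note on why the probing above always terminates: delta grows by 1 on each collision, so the offsets from the starting slot are the triangular numbers 1, 3, 6, 10, ..., and for a power-of-2 table size the first `capacity` triangular numbers hit every slot exactly once, which is the guarantee the class comment refers to. A small self-contained check of that property (plain Scala, no Spark dependency):

object ProbeDemo {
  def main(args: Array[String]): Unit = {
    val capacity = 16                 // any power of 2
    val mask = capacity - 1
    var pos = 5                       // arbitrary starting slot
    var i = 1
    val visited = Array.newBuilder[Int]
    while (i <= capacity) {           // record `capacity` consecutive probe positions
      visited += pos
      pos = (pos + i) & mask          // same step rule as apply()/update()
      i += 1
    }
    val slots = visited.result()
    println(slots.mkString(", "))             // a permutation of 0..15
    assert(slots.distinct.length == capacity) // every slot visited exactly once
  }
}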

SizeTrackingAppendOnlyMap

A subclass of AppendOnlyMap that additionally tracks an estimate of its size in bytes.

/**
 * An append-only map that keeps track of its estimated size in bytes.
 */
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker
{
  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()
  }
}
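SizeTracker itself is not shown above, but these hooks are what drive it: afterUpdate() is called after every insert or aggregation, and growTable() calls resetSamples() because replacing the underlying array invalidates the extrapolation. The idea, as a simplified sketch (illustrative field names, not the actual Spark trait): calling SizeEstimator on every update would be far too slow, so the collection is measured only at exponentially spaced update counts, and the size in between is extrapolated from the measured bytes-per-update rate.

// Simplified re-creation of the SizeTracker idea; only the method names
// resetSamples/afterUpdate/estimateSize mirror the real trait.
trait SizeSamplingSketch {
  private val growthRate = 1.1      // next sample at ~110% of the current update count
  private var numUpdates = 0L
  private var nextSampleNum = 1L
  private var lastSampledSize = 0L
  private var lastSampleNum = 0L
  private var bytesPerUpdate = 0.0

  protected def measureSizeInBytes(): Long  // stand-in for SizeEstimator.estimate(this)

  protected def resetSamples(): Unit = {
    numUpdates = 0L
    nextSampleNum = 1L
    lastSampleNum = 0L
    lastSampledSize = measureSizeInBytes()
  }

  protected def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates >= nextSampleNum) {      // take an (expensive) sample
      val size = measureSizeInBytes()
      bytesPerUpdate = math.max(0.0,
        (size - lastSampledSize).toDouble / math.max(1L, numUpdates - lastSampleNum))
      lastSampledSize = size
      lastSampleNum = numUpdates
      nextSampleNum = math.ceil(numUpdates * growthRate).toLong
    }
  }

  // Extrapolate: last measured size plus the estimated growth since that sample.
  def estimateSize(): Long =
    (lastSampledSize + bytesPerUpdate * (numUpdates - lastSampleNum)).toLong
}

// Toy usage: a collection stub that pretends each entry costs 56 bytes.
class FakeCollection extends SizeSamplingSketch {
  private var entries = 0L
  protected def measureSizeInBytes(): Long = 64 + entries * 56
  def add(): Unit = { entries += 1; afterUpdate() }
}

object SizeSamplingDemo {
  def main(args: Array[String]): Unit = {
    val c = new FakeCollection
    (1 to 1000).foreach(_ => c.add())
    println(c.estimateSize())  // close to 64 + 1000 * 56 = 56064
  }
}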

WritablePartitionedPairCollection

A common interface for size-tracking collections of key-value pairs that provides the following capabilities:

    1. Each key-value pair has an associated partition;
    2. A memory-efficient sorted iterator is supported;
    3. A WritablePartitionedIterator is supported, for writing the contents out directly as bytes.

Concretely, the size-tracking collections meant here are SizeTrackingAppendOnlyMap and its subclasses (plus PartitionedPairBuffer, which mixes in SizeTracker directly).

/**
 * A common interface for size-tracking collections of key-value pairs that
 *
 *  - Have an associated partition for each key-value pair.
 *  - Support a memory-efficient sorted iterator
 *  - Support a WritablePartitionedIterator for writing the contents directly as bytes.
 */
private[spark] trait WritablePartitionedPairCollection[K, V] {
  /**
   * Insert a key-value pair with a partition into the collection
   */
  def insert(partition: Int, key: K, value: V): Unit

  /**
   * Iterate through the data in order of partition ID and then the given comparator. This may
   * destroy the underlying collection.
   */
  // Abstract method, implemented by subclasses: iterate over the data ordered first by
  // partition ID and then by the given comparator. This may destroy the underlying collection.
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)]

  /**
   * Iterate through the data and write out the elements instead of returning them. Records are
   * returned in order of their partition ID and then the given comparator.
   * This may destroy the underlying collection.
   * Implementation: calls partitionedDestructiveSortedIterator to sort the collection's
   * elements with the comparator, then iterates the sorted elements and writes them out
   * to a disk file.
   */
  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
    : WritablePartitionedIterator = {
    // The abstract partitionedDestructiveSortedIterator(), implemented by subclasses, is
    // invoked here: an application of the template method pattern
    val it = partitionedDestructiveSortedIterator(keyComparator)
    // Create an anonymous instance implementing the WritablePartitionedIterator trait
    new WritablePartitionedIterator {
      // The current element of the iteration, or null once exhausted
      private[this] var cur = if (it.hasNext) it.next() else null
      
      // Write the current element with the DiskBlockObjectWriter, then advance
      def writeNext(writer: DiskBlockObjectWriter): Unit = {
        writer.write(cur._1._2, cur._2)
        cur = if (it.hasNext) it.next() else null
      }

      def hasNext(): Boolean = cur != null

      def nextPartition(): Int = cur._1._1
    }
  }
}

// Companion object: the fields and methods defined here play the role of static members
// of the same-named trait
private[spark] object WritablePartitionedPairCollection {
  /**
   * A comparator for (Int, K) pairs that orders them by only their partition ID.
   */
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      a._1 - b._1
    }
  }

  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   * In effect, the supplied keyComparator is decorated into a comparator that considers
   * the partition ID first.
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        // Compare partition IDs first; if they differ, return that result directly
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) {
          partitionDiff
        } else {
          // Same partition ID: fall back to comparing the keys
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }
}

/**
 * Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
 * has an associated partition.
 */
private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: DiskBlockObjectWriter): Unit

  def hasNext(): Boolean

  def nextPartition(): Int
}
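The two comparators are easy to exercise on their own. The sketch below re-implements them outside Spark (the real ones are private[spark]); it uses Integer.compare instead of the source's a._1 - b._1, which the source can get away with only because partition IDs are non-negative (plain subtraction can overflow for arbitrary ints):

import java.util.Comparator

object PartitionComparatorDemo {
  // Re-implementations of the two comparators above, for demonstration only.
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = Integer.compare(a._1, b._1)
  }

  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = Integer.compare(a._1, b._1)
        if (partitionDiff != 0) partitionDiff else keyComparator.compare(a._2, b._2)
      }
    }

  def main(args: Array[String]): Unit = {
    val pairs = Array((1, "b"), (0, "z"), (1, "a"), (0, "c"))

    java.util.Arrays.sort(pairs, partitionComparator[String])
    println(pairs.mkString(" "))  // (0,z) (0,c) (1,b) (1,a): partition order only (stable sort)

    java.util.Arrays.sort(pairs, partitionKeyComparator(Comparator.naturalOrder[String]()))
    println(pairs.mkString(" "))  // (0,c) (0,z) (1,a) (1,b): partition, then key
  }
}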

PartitionedAppendOnlyMap

An implementation of WritablePartitionedPairCollection that wraps a map whose keys are (partition ID, K) tuples.

Where the parent class AppendOnlyMap stores elements in the order key0, value0, key1, value1, key2, value2, ..., the layout in PartitionedAppendOnlyMap is, more concretely:

(partitionId, k)0, value0, (partitionId, k)1, value1, (partitionId, k)2, value2, ...

Accordingly, its iterator yields elements of type ((Int, K), V).

/**
 * Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples
 * of (partition ID, K)
 */
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  // Iterate over the data ordered by partition ID and then by the given comparator
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
// This map mixes in the WritablePartitionedPairCollection trait, so the
// partitionKeyComparator and partitionComparator methods of the same-named
// companion object are available here.
// Option.map(f: A => B) applies f to the option's value and returns Some(result)
// when the option is defined, and None otherwise. Here it decorates a supplied
// keyComparator into a partitionKeyComparator.
// getOrElse then falls back to partitionComparator when keyComparator is None.
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    // Delegate the sort to the parent AppendOnlyMap's destructiveSortedIterator
    destructiveSortedIterator(comparator)
  }
  
  def insert(partition: Int, key: K, value: V): Unit = {
    // Delegate to AppendOnlyMap's update, using (partition, key) as the key
    update((partition, key), value)
  }
}
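PartitionedAppendOnlyMap is private[spark] and cannot be used directly from user code, but since insert(partition, key, value) is just update((partition, key), value), its behaviour is easy to reproduce with the public AppendOnlyMap. A minimal sketch (assuming spark-core on the classpath; expected output in the comments):

import java.util.Comparator
import org.apache.spark.util.collection.AppendOnlyMap

object PartitionedMapDemo {
  def main(args: Array[String]): Unit = {
    val map = new AppendOnlyMap[(Int, String), Long]

    // Equivalent of insert(partition, key, value).
    map.update((1, "b"), 10L)
    map.update((0, "z"), 20L)
    map.update((1, "a"), 30L)

    // Equivalent of partitionKeyComparator: partition ID first, then the key.
    val cmp = new Comparator[(Int, String)] {
      override def compare(a: (Int, String), b: (Int, String)): Int = {
        val d = Integer.compare(a._1, b._1)
        if (d != 0) d else a._2.compareTo(b._2)
      }
    }

    // Equivalent of partitionedDestructiveSortedIterator(Some(keyComparator)).
    map.destructiveSortedIterator(cmp).foreach(println)
    // ((0,z),20) then ((1,a),30) then ((1,b),10)
  }
}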

PartitionedPairBuffer

An append-only buffer of key-value pairs, where each pair has a corresponding partition ID.

The buffer is backed by a single growable array that stores keys and values side by side, which makes it convenient to sort with KVArraySortDataFormat; specifically, the element order is key0, value0, key1, value1, key2, value2, ...

The buffer can hold at most 1,073,741,819 elements.

Compared with PartitionedAppendOnlyMap, the buffer provides only insertion, growth, and iteration:

  • Insert: implemented by insert. The element is appended at the end of the array; the key and value are stored side by side, and the key is a (partition ID, K) tuple.
  • Grow: implemented by growArray, which determines the new capacity and copies the old array's contents into the new array.
  • Iterate: implemented by partitionedDestructiveSortedIterator.

PartitionedAppendOnlyMap and PartitionedPairBuffer compare as follows (see the sketch after the table):

| | PartitionedAppendOnlyMap | PartitionedPairBuffer |
| --- | --- | --- |
| Insert | Quadratic probing finds the element's slot in the array; it is inserted only once an empty slot is found | Appended directly at the end of the array |
| Grow | Allocates a larger array, rehashes all elements, and re-probes to place each one in the new array | Allocates a larger array and copies the old contents over with System.arraycopy |
| Iterate | Destructive to the underlying array | Not actually destructive to the underlying array |
| Modify | Supported: quadratic probing locates the key's slot and its value is replaced | Not supported |
| Lookup | Supported: quadratic probing locates the key's slot and its value is returned | Not supported |
| Use case | ExternalSorter with map-side aggregation | ExternalSorter without map-side aggregation |
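To make the layout concrete, here is a small self-contained sketch (plain Scala, illustrative names) of the buffer's flat key/value array: insertion is O(1) because elements go straight to the end, growth is a single System.arraycopy, and sorted iteration only has to order the (partition ID, key) tuples. Unlike the real class, which sorts the data array in place via Sorter/KVArraySortDataFormat, the sketch copies the pairs out before sorting:

object PairBufferSketch {
  def main(args: Array[String]): Unit = {
    var capacity = 2
    var curSize = 0
    var data = new Array[AnyRef](2 * capacity)

    def insert(partition: Int, key: String, value: Int): Unit = {
      if (curSize == capacity) {              // growArray(): double and copy
        val newData = new Array[AnyRef](2 * capacity * 2)
        System.arraycopy(data, 0, newData, 0, 2 * capacity)
        data = newData
        capacity *= 2
      }
      data(2 * curSize) = (partition, key)    // key slot holds a (partitionId, K) tuple
      data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
      curSize += 1
    }

    insert(1, "b", 10)
    insert(0, "z", 20)
    insert(1, "a", 30)

    // Sorted iteration by (partition ID, key).
    val sorted = (0 until curSize)
      .map(i => (data(2 * i).asInstanceOf[(Int, String)], data(2 * i + 1).asInstanceOf[Int]))
      .sortBy(_._1)
    sorted.foreach(println)  // ((0,z),20) then ((1,a),30) then ((1,b),10)
  }
}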
/**
 * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
 * of its estimated size in bytes.
 *
 * The buffer can support up to 1073741819 elements.
 */
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker
{
  import PartitionedPairBuffer._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }

  /** Double the size of the array because we've reached capacity */
  private def growArray(): Unit = {
    if (capacity >= MAXIMUM_CAPACITY) {
      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
    }
    val newCapacity =
      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
        MAXIMUM_CAPACITY
      } else {
        capacity * 2
      }
    val newArray = new Array[AnyRef](2 * newCapacity)
    System.arraycopy(data, 0, newArray, 0, 2 * capacity)
    data = newArray
    capacity = newCapacity
    resetSamples()
  }

  /** Iterate through the data in a given order. For this class this is not really destructive. */
  override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
    iterator
  }

  private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {
    var pos = 0

    override def hasNext: Boolean = pos < curSize

    override def next(): ((Int, K), V) = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])
      pos += 1
      pair
    }
  }
}

// Companion object
private object PartitionedPairBuffer {
  val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
}