程式人生 > spark2.3原始碼分析之in-memory collection

spark2.3原始碼分析之in-memory collection



AppendOnlyMap是一個只能追加資料的hash table實現。它的key值一旦插入就永遠不會被刪除,而每個key的value值可以改變。

該hash table使用開放定址法(open addressing)中的二次探測法(quadratic probing)解決雜湊衝突,所以內部只需要一個陣列來儲存資料。

該hash table的大小始終為2的冪次方,最多可以支援0.7 * 2 ^ 29(即375809638)個元素。

該hash table為了記憶體本地性,在同一個陣列中儲存key和value值;更明確的說,元素的順序是key0, value0, key1, value1, key2, value2....



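  • 插入:update方法實現。設定key對應的value值;若key已存在則覆蓋原有的value值。
  • 修改:changeValue方法實現。以updateFunc(hadValue, oldValue)的返回值作為key的新value值;若key不存在,則hadValue為false、oldValue為null,並插入該key。
  • 擴容:growTable方法實現。將table的容量擴大為原來的兩倍,並對所有元素重新雜湊。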
  • 查詢:apply方法實現。用於獲取給定key的value值。
  • 迭代:destructiveSortedIterator方法實現。按照給定比較器的排序順序返回該map的迭代器,該方法不需要使用額外的記憶體就能將map上的資料排序,但是會破壞map的有效性,底層的陣列結構不能再被使用。


 * :: DeveloperApi ::
 * A simple open hash table optimized for the append-only use case, where keys
 * are never removed, but the value for each key may be changed.
 * This implementation uses quadratic probing with a power-of-2 hash table
 * size, which is guaranteed to explore all spaces for each key (see
 * http://en.wikipedia.org/wiki/Quadratic_probing).
 * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
 * TODO: Cache the hash values of each key? java.util.HashMap does that.
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {

  import AppendOnlyMap._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  private val LOAD_FACTOR = 0.7

  private var capacity = nextPowerOf2(initialCapacity)
  private var mask = capacity - 1
  private var curSize = 0
  private var growThreshold = (LOAD_FACTOR * capacity).toInt

  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  //更明確的說,元素的順序是key0, value0, key1, value1, key2, value2....
  private var data = new Array[AnyRef](2 * capacity)

  // Treat the null key differently so we can use nulls in "data" to represent empty items.
  private var haveNullValue = false
  private var nullValue: V = null.asInstanceOf[V]

  // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
  private var destroyed = false
  private val destructionMessage = "Map state is invalid from destructive sorting!"

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        return data(2 * pos + 1).asInstanceOf[V]
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
		//pos + delta是將pos向前偏移delta個位置
		//& mask是防止向前偏移delta個位置後超出陣列下標
        val delta = i
        pos = (pos + delta) & mask
        i += 1

  /** Set the value for a key */
  def update(key: K, value: V): Unit = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
      nullValue = value
      haveNullValue = true
    var pos = rehash(key.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        data(2 * pos) = k
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        incrementSize()  // Since we added a new key
      } else if (k.eq(curKey) || k.equals(curKey)) {
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1

   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
	//設定value值:data(2 * pos + 1)為newValue
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy

  /** Iterator method from Iterable */
  override def iterator: Iterator[(K, V)] = {
    assert(!destroyed, destructionMessage)
    new Iterator[(K, V)] {
      var pos = -1

      /** Get the next value we should return from next(), or null if we're finished iterating */
      def nextValue(): (K, V) = {
        if (pos == -1) {    // Treat position -1 as looking at the null value
          if (haveNullValue) {
            return (null.asInstanceOf[K], nullValue)
          pos += 1
        while (pos < capacity) {
          if (!data(2 * pos).eq(null)) {
		  //返回(k, v)鍵值對
            return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
          pos += 1

      override def hasNext: Boolean = nextValue() != null

      override def next(): (K, V) = {
        val value = nextValue()
        if (value == null) {
          throw new NoSuchElementException("End of iterator")
        pos += 1

  override def size: Int = curSize

  /** Increase table size by 1, rehashing if necessary */
  private def incrementSize() {
    curSize += 1
    if (curSize > growThreshold) {

   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

  /** Double the table's size and re-hash everything */
  protected def growTable() {
    // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
    val newCapacity = capacity * 2
    require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
	//建立新陣列,因為要同時儲存key和value,所以新陣列的大小為2 * newCapacity
    val newData = new Array[AnyRef](2 * newCapacity)
    val newMask = newCapacity - 1
    // Insert all our old values into the new array. Note that because our old keys are
    // unique, there's no need to check for equality here when we insert.
    var oldPos = 0
    while (oldPos < capacity) {
      if (!data(2 * oldPos).eq(null)) {
        val key = data(2 * oldPos)
        val value = data(2 * oldPos + 1)
        var newPos = rehash(key.hashCode) & newMask
        var i = 1
        var keepGoing = true
        while (keepGoing) {
          val curKey = newData(2 * newPos)
          if (curKey.eq(null)) {
            newData(2 * newPos) = key
            newData(2 * newPos + 1) = value
            keepGoing = false
          } else {
            val delta = i
            newPos = (newPos + delta) & newMask
            i += 1
      oldPos += 1
    data = newData
    capacity = newCapacity
    mask = newMask
    growThreshold = (LOAD_FACTOR * newCapacity).toInt

  private def nextPowerOf2(n: Int): Int = {
    val highBit = Integer.highestOneBit(n)
    if (highBit == n) n else highBit << 1

   * Return an iterator of the map in sorted order. This provides a way to sort the map without
   * using additional memory, at the expense of destroying the validity of the map.
  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
    destroyed = true
    // Pack KV pairs into the front of the underlying array
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      keyIndex += 1
    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))
    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

    new Iterator[(K, V)] {
      var i = 0
      var nullValueReady = haveNullValue
      def hasNext: Boolean = (i < newIndex || nullValueReady)
      def next(): (K, V) = {
        if (nullValueReady) {
          nullValueReady = false
          (null.asInstanceOf[K], nullValue)
        } else {
          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
          i += 1

   * Return whether the next insert will cause the map to grow
  def atGrowThreshold: Boolean = curSize == growThreshold

private object AppendOnlyMap {
  // Upper bound on the number of slots (2 ^ 29). The backing array holds two entries
  // (key and value) per slot.
  val MAXIMUM_CAPACITY = (1 << 29)
}



 * An append-only map that keeps track of its estimated size in bytes.
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker
  override def update(key: K, value: V): Unit = {
    super.update(key, value)

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)

  override protected def growTable(): Unit = {


WritablePartitionedPairCollection是一個通用介面,為儲存key/value鍵值對的size-tracking collection提供以下功能:

    1、每個鍵值對都關聯一個partition;
    2、支援一個記憶體效率高的sorted iterator;
    3、支援WritablePartitionedIterator,可直接將內容以位元組形式寫出。

實現該介面的size-tracking collection有兩個:PartitionedAppendOnlyMap(SizeTrackingAppendOnlyMap的子類)和PartitionedPairBuffer。

 * A common interface for size-tracking collections of key-value pairs that
 *  - Have an associated partition for each key-value pair.
 *  - Support a memory-efficient sorted iterator
 *  - Support a WritablePartitionedIterator for writing the contents directly as bytes.
private[spark] trait WritablePartitionedPairCollection[K, V] {
   * Insert a key-value pair with a partition into the collection
  def insert(partition: Int, key: K, value: V): Unit

   * Iterate through the data in order of partition ID and then the given comparator. This may
   * destroy the underlying collection.
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)]

   * Iterate through the data and write out the elements instead of returning them. Records are
   * returned in order of their partition ID and then the given comparator.
   * This may destroy the underlying collection.
  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
    : WritablePartitionedIterator = {
    val it = partitionedDestructiveSortedIterator(keyComparator)
    //實現WritablePartitionedIterator trait,並建立一個例項物件
    new WritablePartitionedIterator {
      private[this] var cur = if (it.hasNext) it.next() else null
      def writeNext(writer: DiskBlockObjectWriter): Unit = {
        writer.write(cur._1._2, cur._2)
        cur = if (it.hasNext) it.next() else null

      def hasNext(): Boolean = cur != null

      def nextPartition(): Int = cur._1._1

// Companion object: its members play the role of static methods of the same-named trait.
private[spark] object WritablePartitionedPairCollection {
  /**
   * A comparator for (Int, K) pairs that orders them by only their partition ID.
   */
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      // Integer.compare cannot overflow, unlike `a._1 - b._1`.
      Integer.compare(a._1, b._1)
    }
  }

  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   *
   * @param keyComparator ordering applied to keys within the same partition
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val partitionDiff = Integer.compare(a._1, b._1)
        if (partitionDiff != 0) {
          partitionDiff
        } else {
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }
}

 * Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
 * has an associated partition.
private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: DiskBlockObjectWriter): Unit

  def hasNext(): Boolean

  def nextPartition(): Int


PartitionedAppendOnlyMap是WritablePartitionedPairCollection的實現,它是map的包裝器,該map的key值是一個(partition ID, K)元組。

如果說在父類AppendOnlyMap中,元素的儲存格式為key0, value0, key1, value1, key2, value2....

那麼在PartitionedAppendOnlyMap中,由於key是(partitionId, k)元組,

元素的順序是(partitionId, k)0, value0, (partitionId, k)1, value1, (partitionId, k)2, value2....

圖片引自: here

所以,它的迭代器迭代的元素型別也是((Int, K), V)

 * Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples
 * of (partition ID, K)
   WritablePartitionPairCollection的實現,它是map的包裝器,該map的key值是一個(partition ID, K)元組
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
//該map繼承了WritablePartitionPairCollection trait
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  def insert(partition: Int, key: K, value: V): Unit = {
    update((partition, key), value)



PartitionedPairBuffer底層有一個可擴容的陣列結構。在該陣列中同時儲存key和value值,從而很方便地使用KVArraySortDataFormat進行排序。更明確的說,元素的順序是key0, value0, key1, value1, key2, value2....



  • 插入:insert方法實現。元素會被插入陣列的末端。在該陣列中同時儲存元素的key和value值,元素的key值是一個(partition ID, K)元組。
  • 擴容:growArray方法實現。確定新陣列的capacity,然後將原陣列的內容複製到新陣列。
  • 迭代:partitionedDestructiveSortedIterator方法實現。


| | PartitionedAppendOnlyMap | PartitionedPairBuffer |
| --- | --- | --- |
| 插入 | 使用二次探測法確定元素在陣列中的位置,找到為空的位置才插入(若遇到相同的key則覆蓋其value值) | 直接插入到陣列末端 |
| 擴容 | 建立新容量的陣列,需對原陣列的所有元素進行重雜湊,並用二次探測法確定在新陣列中的下標位置 | 建立新容量的陣列,然後呼叫System.arraycopy方法直接將原陣列內容複製到新陣列 |
| 迭代 | 迭代對底層陣列是破壞性的 | 迭代對底層陣列實際上不是破壞性的 |
| 修改 | 支援。二次探測法確定key在陣列中的下標位置,並修改它的value值 | 不支援 |
| 查詢 | 支援。二次探測法確定key在陣列中的下標位置,並返回它的value值 | 不支援 |
| 用途 | ExternalSorter需要map端的聚合時 | ExternalSorter不需要map端的聚合時 |
 * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
 * of its estimated size in bytes.
 * The buffer can support up to 1073741819 elements.
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker
  import PartitionedPairBuffer._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1

  /** Double the size of the array because we've reached capacity */
  private def growArray(): Unit = {
    if (capacity >= MAXIMUM_CAPACITY) {
      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
    val newCapacity =
      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
      } else {
        capacity * 2
    val newArray = new Array[AnyRef](2 * newCapacity)
    System.arraycopy(data, 0, newArray, 0, 2 * capacity)
    data = newArray
    capacity = newCapacity

  /** Iterate through the data in a given order. For this class this is not really destructive. */
  override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)

  private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {
    var pos = 0

    override def hasNext: Boolean = pos < curSize

    override def next(): ((Int, K), V) = {
      if (!hasNext) {
        throw new NoSuchElementException
      val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])
      pos += 1

private object PartitionedPairBuffer {