Spark 2.3 source code analysis: in-memory collections
AppendOnlyMap
Overview
An implementation of a hash table that only supports appending: keys are never removed, but the value for each key may be changed.
The table stores its data with quadratic probing, an open-addressing scheme, so internally it needs nothing more than a single array.
Its size is always a power of 2, and it can hold at most 0.7 * 2^29 = 375,809,638 elements.
For memory locality, keys and values are stored in the same array; more precisely, the order of elements is key0, value0, key1, value1, key2, value2, ...
AppendOnlyMap allows null as a key. The null key is tracked separately (haveNullValue / nullValue), because null entries in the data array are what mark empty slots.
Apart from deletion, it offers all the operations expected of a map: insert, update, grow, lookup, and iterate:
- Insert: implemented by update. Sets the value for a key.
- Update: implemented by changeValue. Modifies (or merges into) the value for a key.
- Grow: implemented by growTable. Doubles the table's capacity and re-hashes every element.
- Lookup: implemented by apply. Returns the value for a given key.
- Iterate: implemented by destructiveSortedIterator. Returns an iterator over the map in the order defined by a given comparator. It sorts the data without using additional memory, but it destroys the validity of the map: the underlying array must not be used afterwards.
In its ability to return an iterator ordered by a comparator, AppendOnlyMap resembles a SortedMap.
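Before reading the source, a minimal usage sketch of the operations listed above (the keys and values are invented for the example, and since AppendOnlyMap is private[spark] the snippet assumes it is compiled under the org.apache.spark.util.collection package):

// Hypothetical word-count-style usage of the append-only hash table
val map = new AppendOnlyMap[String, Int](initialCapacity = 64)

map.update("spark", 1)                                            // insert: set the value for a new key
map.update("spark", 2)                                            // overwrite the value of an existing key
map.changeValue("flink", (had, old) => if (had) old + 1 else 1)   // create-or-merge update
println(map("spark"))                                             // lookup via apply -> 2

// Destructive sorted iteration: after this call the map must no longer be used
map.destructiveSortedIterator(Ordering[String]).foreach {
  case (k, v) => println(s"$k -> $v")
}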
/**
 * :: DeveloperApi ::
 * A simple open hash table optimized for the append-only use case, where keys
 * are never removed, but the value for each key may be changed.
 *
 * This implementation uses quadratic probing with a power-of-2 hash table
 * size, which is guaranteed to explore all spaces for each key (see
 * http://en.wikipedia.org/wiki/Quadratic_probing).
 *
 * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
 *
 * TODO: Cache the hash values of each key? java.util.HashMap does that.
 */
@DeveloperApi
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
  extends Iterable[(K, V)] with Serializable {

  import AppendOnlyMap._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  private val LOAD_FACTOR = 0.7

  private var capacity = nextPowerOf2(initialCapacity)
  private var mask = capacity - 1
  private var curSize = 0
  private var growThreshold = (LOAD_FACTOR * capacity).toInt

  // Holds keys and values in the same array for memory locality; specifically, the order of
  // elements is key0, value0, key1, value1, key2, value2, etc.
  private var data = new Array[AnyRef](2 * capacity)

  // Treat the null key differently so we can use nulls in "data" to represent empty items.
  private var haveNullValue = false
  private var nullValue: V = null.asInstanceOf[V]

  // Triggered by destructiveSortedIterator; the underlying data array may no longer be used
  private var destroyed = false
  private val destructionMessage = "Map state is invalid from destructive sorting!"

  /** Get the value for a given key */
  def apply(key: K): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      return nullValue
    }
    // Hash the key and AND it with the mask to get the slot pos where the element belongs
    // (the key lives at index 2 * pos and the value at 2 * pos + 1)
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      // Keys sit at even indices 0, 2, 4, ... of data; if the array held only keys the
      // indices would be 0, 1, 2, 3, ..., so 2 * pos is the key's index in data and
      // 2 * pos + 1 is the index of the corresponding value
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        // Found the key: return its value
        return data(2 * pos + 1).asInstanceOf[V]
      } else if (curKey.eq(null)) {
        return null.asInstanceOf[V]
      } else {
        // The key in this slot is not the one we are looking for, so keep probing
        // (open addressing with quadratic probing).
        // Quadratic probing is a variation of linear probing: instead of stepping by 1 on
        // every collision, the step here is the probe count i, so successive probes move
        // 1, 2, 3, ... slots further each time.
        // pos + delta advances pos by delta slots; & mask wraps the result so it stays
        // inside the array bounds.
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V]
  }

  /** Set the value for a key */
  def update(key: K, value: V): Unit = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = value
      haveNullValue = true
      return
    }
    // Hash the key and AND it with the mask to get the starting slot
    var pos = rehash(key.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (curKey.eq(null)) {
        data(2 * pos) = k
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        incrementSize()  // Since we added a new key
        return
      } else if (k.eq(curKey) || k.equals(curKey)) {
        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
        return
      } else {
        // Keep probing (quadratic probing, open addressing)
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
  }

  /**
   * Set the value for key to updateFunc(hadValue, oldValue), where oldValue will be the old value
   * for key, if any, or null otherwise. Returns the newly updated value.
   */
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    assert(!destroyed, destructionMessage)
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      // When driven by an Aggregator, updateFunc runs the aggregator's mergeValue function if a
      // value already exists for the key, and its createCombiner function if it does not.
      if (curKey.eq(null)) {
        // No existing value: updateFunc plays the role of the aggregator's createCombiner
        val newValue = updateFunc(false, null.asInstanceOf[V])
        // Store the key at data(2 * pos)
        data(2 * pos) = k
        // Store the value at data(2 * pos + 1)
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else if (k.eq(curKey) || k.equals(curKey)) {
        // Existing value: updateFunc plays the role of the aggregator's mergeValue,
        // merging the old value into a new one
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else {
        // Keep probing (quadratic probing, open addressing)
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }

  /** Iterator method from Iterable */
  override def iterator: Iterator[(K, V)] = {
    assert(!destroyed, destructionMessage)
    new Iterator[(K, V)] {
      var pos = -1

      /** Get the next value we should return from next(), or null if we're finished iterating */
      // Called from next() below to produce the next key-value pair
      def nextValue(): (K, V) = {
        if (pos == -1) { // Treat position -1 as looking at the null value
          if (haveNullValue) {
            return (null.asInstanceOf[K], nullValue)
          }
          pos += 1
        }
        while (pos < capacity) {
          if (!data(2 * pos).eq(null)) {
            // Return the (k, v) pair stored at this slot
            return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])
          }
          pos += 1
        }
        null
      }

      override def hasNext: Boolean = nextValue() != null

      override def next(): (K, V) = {
        val value = nextValue()
        if (value == null) {
          throw new NoSuchElementException("End of iterator")
        }
        pos += 1
        value
      }
    }
  }

  override def size: Int = curSize

  /** Increase table size by 1, rehashing if necessary */
  private def incrementSize() {
    curSize += 1
    if (curSize > growThreshold) {
      growTable()
    }
  }

  /**
   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
   */
  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

  /** Double the table's size and re-hash everything */
  protected def growTable() {
    // capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
    // The new capacity is twice the old one
    val newCapacity = capacity * 2
    require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
    // Allocate the new array; keys and values are stored together, so its size is 2 * newCapacity
    val newData = new Array[AnyRef](2 * newCapacity)
    // The new mask
    val newMask = newCapacity - 1
    // Insert all our old values into the new array. Note that because our old keys are
    // unique, there's no need to check for equality here when we insert.
    var oldPos = 0
    while (oldPos < capacity) {
      if (!data(2 * oldPos).eq(null)) {
        // Read the key and value from the old data array
        val key = data(2 * oldPos)
        val value = data(2 * oldPos + 1)
        // Re-hash the key to find its slot in the new data array
        var newPos = rehash(key.hashCode) & newMask
        var i = 1
        var keepGoing = true
        while (keepGoing) {
          val curKey = newData(2 * newPos)
          if (curKey.eq(null)) {
            // Store the key and value at the slot found in the new array
            newData(2 * newPos) = key
            newData(2 * newPos + 1) = value
            keepGoing = false
          } else {
            // Keep probing (quadratic probing, open addressing)
            val delta = i
            newPos = (newPos + delta) & newMask
            i += 1
          }
        }
      }
      oldPos += 1
    }
    data = newData
    capacity = newCapacity
    mask = newMask
    growThreshold = (LOAD_FACTOR * newCapacity).toInt
  }

  // Returns n itself if n is already a power of 2 (e.g. nextPowerOf2(64) == 64),
  // otherwise the next power of 2 above n (e.g. nextPowerOf2(65) == 128)
  private def nextPowerOf2(n: Int): Int = {
    val highBit = Integer.highestOneBit(n)
    if (highBit == n) n else highBit << 1
  }

  /**
   * Return an iterator of the map in sorted order. This provides a way to sort the map without
   * using additional memory, at the expense of destroying the validity of the map.
   */
  def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
    destroyed = true
    // Pack KV pairs into the front of the underlying array
    // (all non-null key-value pairs are compacted to the front of the array)
    var keyIndex, newIndex = 0
    while (keyIndex < capacity) {
      if (data(2 * keyIndex) != null) {
        data(2 * newIndex) = data(2 * keyIndex)
        data(2 * newIndex + 1) = data(2 * keyIndex + 1)
        newIndex += 1
      }
      keyIndex += 1
    }
    assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

    // Sort the compacted pairs in place on the data array
    new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

    // Return an iterator over the sorted pairs
    new Iterator[(K, V)] {
      var i = 0
      var nullValueReady = haveNullValue
      def hasNext: Boolean = (i < newIndex || nullValueReady)
      def next(): (K, V) = {
        if (nullValueReady) {
          nullValueReady = false
          (null.asInstanceOf[K], nullValue)
        } else {
          val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
          i += 1
          item
        }
      }
    }
  }

  /**
   * Return whether the next insert will cause the map to grow
   */
  def atGrowThreshold: Boolean = curSize == growThreshold
}

// Companion object
private object AppendOnlyMap {
  val MAXIMUM_CAPACITY = (1 << 29)
}
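The comments inside changeValue above refer to the two functions of an Aggregator: when the key has no value yet, updateFunc is expected to behave like createCombiner, and when a value exists it behaves like mergeValue. A sketch of that map-side-aggregation pattern, using hand-written combine functions rather than Spark's actual Aggregator class:

// Sketch: per-key sum aggregation driven through changeValue.
// createCombiner / mergeValue mirror the roles described in the comments above,
// but they are local functions made up for this example, not Spark's Aggregator API.
val counts = new AppendOnlyMap[String, Long]()

def createCombiner(v: Long): Long = v
def mergeValue(acc: Long, v: Long): Long = acc + v

val records = Seq("a" -> 1L, "b" -> 2L, "a" -> 3L)
records.foreach { case (k, v) =>
  counts.changeValue(k, (hadValue, oldValue) =>
    if (hadValue) mergeValue(oldValue, v) else createCombiner(v))
}
// counts("a") == 4L, counts("b") == 2L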
SizeTrackingAppendOnlyMap
A subclass of AppendOnlyMap that also keeps track of its estimated size in bytes (via the SizeTracker trait).
/**
* An append-only map that keeps track of its estimated size in bytes.
*/
private[spark] class SizeTrackingAppendOnlyMap[K, V]
  extends AppendOnlyMap[K, V] with SizeTracker
{
  override def update(key: K, value: V): Unit = {
    super.update(key, value)
    super.afterUpdate()
  }

  override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val newValue = super.changeValue(key, updateFunc)
    super.afterUpdate()
    newValue
  }

  override protected def growTable(): Unit = {
    super.growTable()
    resetSamples()
  }
}
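The reason for tracking the size is that callers such as ExternalAppendOnlyMap and ExternalSorter check the estimate after every insertion to decide whether the collection should be spilled to disk. A minimal sketch of that idea, assuming SizeTracker's estimateSize() method and an arbitrary, made-up threshold:

// Sketch: use the byte-size estimate to decide when to spill
val map = new SizeTrackingAppendOnlyMap[String, Array[Byte]]()
val myMemoryThreshold = 5L * 1024 * 1024   // 5 MB, arbitrary value for the example

def insertRecord(key: String, value: Array[Byte]): Boolean = {
  map.update(key, value)                   // afterUpdate() is invoked internally to sample the size
  map.estimateSize() > myMemoryThreshold   // caller decides whether to spill based on the estimate
}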
WritablePartitionedPairCollection
A common interface for size-tracking collections of key-value pairs that provides the following capabilities:
1. Each key-value pair has an associated partition;
2. A memory-efficient sorted iterator is supported;
3. A WritablePartitionedIterator interface is supported for writing the contents out directly as bytes.
In practice, the size-tracking collections in question are PartitionedAppendOnlyMap (which extends SizeTrackingAppendOnlyMap) and PartitionedPairBuffer.
/**
* A common interface for size-tracking collections of key-value pairs that
*
* - Have an associated partition for each key-value pair.
* - Support a memory-efficient sorted iterator
* - Support a WritablePartitionedIterator for writing the contents directly as bytes.
*/
private[spark] trait WritablePartitionedPairCollection[K, V] {
  /**
   * Insert a key-value pair with a partition into the collection
   */
  def insert(partition: Int, key: K, value: V): Unit

  /**
   * Iterate through the data in order of partition ID and then the given comparator. This may
   * destroy the underlying collection.
   */
  // Abstract method, implemented by subclasses: iterates the data ordered first by partition ID
  // and then by the given comparator. This may destroy the underlying collection.
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)]

  /**
   * Iterate through the data and write out the elements instead of returning them. Records are
   * returned in order of their partition ID and then the given comparator.
   * This may destroy the underlying collection.
   *
   * Calls partitionedDestructiveSortedIterator to sort the elements of the collection with the
   * comparator, then wraps the sorted iterator so its elements can be written out to a disk file.
   */
  def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
    : WritablePartitionedIterator = {
    // The abstract partitionedDestructiveSortedIterator(), implemented by subclasses, is invoked
    // here -- an instance of the template method pattern.
    val it = partitionedDestructiveSortedIterator(keyComparator)
    // Create an anonymous implementation of the WritablePartitionedIterator trait
    new WritablePartitionedIterator {
      // The element the iterator is currently positioned on
      private[this] var cur = if (it.hasNext) it.next() else null

      // Write the current element with the DiskBlockObjectWriter, then advance
      def writeNext(writer: DiskBlockObjectWriter): Unit = {
        writer.write(cur._1._2, cur._2)
        cur = if (it.hasNext) it.next() else null
      }

      def hasNext(): Boolean = cur != null

      def nextPartition(): Int = cur._1._1
    }
  }
}
// Companion object
// Fields and methods defined in a companion object correspond to the static members of the
// trait/class of the same name
private[spark] object WritablePartitionedPairCollection {
  /**
   * A comparator for (Int, K) pairs that orders them by only their partition ID.
   */
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int = {
      a._1 - b._1
    }
  }

  /**
   * A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
   *
   * In effect it decorates the given keyComparator into a partitionKeyComparator.
   */
  def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        // Compare partition IDs first; if they differ, that result can be returned directly
        val partitionDiff = a._1 - b._1
        if (partitionDiff != 0) {
          partitionDiff
        } else {
          // If the partition IDs are equal, fall back to comparing the keys
          keyComparator.compare(a._2, b._2)
        }
      }
    }
  }
}
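To make the two orderings concrete, a small self-contained sketch (the comparators are re-implemented locally as Scala Orderings, since the real companion object is private[spark]; the sample pairs are invented):

// Stand-ins with the same semantics as partitionComparator / partitionKeyComparator
val byPartition: Ordering[(Int, String)] = Ordering.by(_._1)

def partitionKeyOrdering[K](keyOrd: Ordering[K]): Ordering[(Int, K)] =
  new Ordering[(Int, K)] {
    def compare(a: (Int, K), b: (Int, K)): Int = {
      val partitionDiff = a._1 - b._1
      if (partitionDiff != 0) partitionDiff else keyOrd.compare(a._2, b._2)
    }
  }

val pairs = Seq((1, "b"), (0, "z"), (1, "a"), (0, "c"))

pairs.sorted(byPartition)
// List((0,z), (0,c), (1,b), (1,a)) -- grouped by partition ID; within a partition the
// original order is preserved because the sort is stable
pairs.sorted(partitionKeyOrdering(Ordering.String))
// List((0,c), (0,z), (1,a), (1,b)) -- grouped by partition ID, then ordered by key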
/**
 * Iterator that writes elements to a DiskBlockObjectWriter instead of returning them. Each element
 * has an associated partition.
 */
private[spark] trait WritablePartitionedIterator {
  def writeNext(writer: DiskBlockObjectWriter): Unit

  def hasNext(): Boolean

  def nextPartition(): Int
}
PartitionedAppendOnlyMap
An implementation of WritablePartitionedPairCollection that wraps a map whose keys are (partition ID, K) tuples.
Whereas the parent AppendOnlyMap stores elements as key0, value0, key1, value1, key2, value2, ..., in PartitionedAppendOnlyMap the layout can be written more concretely as
(partitionId, k)0, value0, (partitionId, k)1, value1, (partitionId, k)2, value2, ...
Accordingly, the element type its iterator produces is ((Int, K), V).
/**
 * Implementation of WritablePartitionedPairCollection that wraps a map in which the keys are tuples
 * of (partition ID, K)
 */
private[spark] class PartitionedAppendOnlyMap[K, V]
  extends SizeTrackingAppendOnlyMap[(Int, K), V] with WritablePartitionedPairCollection[K, V] {

  // Iterate the data ordered first by partition ID, then by the given comparator
  def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    // This class mixes in the WritablePartitionedPairCollection trait, so it can use the
    // partitionKeyComparator and partitionComparator methods of the companion object with
    // the same name.
    //
    // Option.map(f: (A) => B): if the Option is not None, applies f to the wrapped value and
    // returns the result (wrapped in Some); if the Option is None, it returns None.
    // Here: if keyComparator is present, decorate it into a partitionKeyComparator.
    //
    // getOrElse: if keyComparator is absent, fall back to partitionComparator.
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    // Delegate to AppendOnlyMap.destructiveSortedIterator to perform the actual sort
    destructiveSortedIterator(comparator)
  }

  def insert(partition: Int, key: K, value: V): Unit = {
    // Delegate to AppendOnlyMap.update, using the (partition, key) tuple as the key
    update((partition, key), value)
  }
}
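Putting the pieces together, a minimal usage sketch (with the same private[spark] caveat as before): records are inserted with their partition ID, and the destructive iterator hands them back grouped by partition, and ordered by key within a partition when a key comparator is supplied:

val map = new PartitionedAppendOnlyMap[String, Int]()

map.insert(partition = 1, key = "b", value = 10)
map.insert(partition = 0, key = "z", value = 20)
map.insert(partition = 1, key = "a", value = 30)

// None           -> ordered by partition ID only (partitionComparator)
// Some(ordering) -> ordered by partition ID, then by key (partitionKeyComparator)
map.partitionedDestructiveSortedIterator(Some(Ordering.String)).foreach {
  case ((partitionId, key), value) => println(s"partition=$partitionId key=$key value=$value")
}
// Partition 0 is printed first, then partition 1 with key "a" before "b"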
PartitionedPairBuffer
An append-only buffer of key-value pairs, where each pair carries a partition ID.
The buffer is backed by a growable array. Keys and values are stored in the same array so that the data can be sorted conveniently with KVArraySortDataFormat; more precisely, the order of elements is key0, value0, key1, value1, key2, value2, ...
The buffer can hold at most 1,073,741,819 elements.
Compared with PartitionedAppendOnlyMap, the buffer provides only insert, grow, and iterate:
- Insert: implemented by insert. The element is appended at the end of the array; the key stored alongside the value is a (partition ID, K) tuple.
- Grow: implemented by growArray. Determines the new capacity, then copies the contents of the old array into the new one.
- Iterate: implemented by partitionedDestructiveSortedIterator.
PartitionedAppendOnlyMap and PartitionedPairBuffer compare as follows:
| | PartitionedAppendOnlyMap | PartitionedPairBuffer |
| --- | --- | --- |
| Insert | Uses quadratic probing to find the element's slot in the array; the element is only written once an empty slot is found | Appends directly at the end of the array |
| Grow | Allocates an array with the new capacity, re-hashes every element of the old array, and uses quadratic probing to find each element's slot in the new array | Allocates an array with the new capacity, then copies the old contents over directly with System.arraycopy |
| Iterate | Iteration is destructive to the underlying array | Iteration is not actually destructive to the underlying array |
| Update | Supported: quadratic probing locates the key's slot in the array and its value is modified | Not supported |
| Lookup | Supported: quadratic probing locates the key's slot in the array and its value is returned | Not supported |
| Use case | Used by ExternalSorter when map-side aggregation is needed | Used by ExternalSorter when map-side aggregation is not needed |
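For comparison, a corresponding sketch for PartitionedPairBuffer (same private[spark] caveat). There is no lookup or in-place update; duplicate keys simply become separate records, which is why ExternalSorter chooses this collection when no map-side aggregation is required:

val buffer = new PartitionedPairBuffer[String, Int]()

buffer.insert(0, "z", 1)
buffer.insert(1, "a", 2)
buffer.insert(0, "z", 3)   // appended as another record, not merged

buffer.partitionedDestructiveSortedIterator(None).foreach {
  case ((partitionId, key), value) => println(s"partition=$partitionId key=$key value=$value")
}
// Partition 0 records come out before partition 1; the two "z" records stay separate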
/**
* Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
* of its estimated size in bytes.
*
* The buffer can support up to 1073741819 elements.
*/
private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
  extends WritablePartitionedPairCollection[K, V] with SizeTracker
{
  import PartitionedPairBuffer._

  require(initialCapacity <= MAXIMUM_CAPACITY,
    s"Can't make capacity bigger than ${MAXIMUM_CAPACITY} elements")
  require(initialCapacity >= 1, "Invalid initial capacity")

  // Basic growable array data structure. We use a single array of AnyRef to hold both the keys
  // and the values, so that we can sort them efficiently with KVArraySortDataFormat.
  private var capacity = initialCapacity
  private var curSize = 0
  private var data = new Array[AnyRef](2 * initialCapacity)

  /** Add an element into the buffer */
  def insert(partition: Int, key: K, value: V): Unit = {
    if (curSize == capacity) {
      growArray()
    }
    data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
    data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
    curSize += 1
    afterUpdate()
  }

  /** Double the size of the array because we've reached capacity */
  private def growArray(): Unit = {
    if (capacity >= MAXIMUM_CAPACITY) {
      throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
    }
    val newCapacity =
      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
        MAXIMUM_CAPACITY
      } else {
        capacity * 2
      }
    val newArray = new Array[AnyRef](2 * newCapacity)
    System.arraycopy(data, 0, newArray, 0, 2 * capacity)
    data = newArray
    capacity = newCapacity
    resetSamples()
  }

  /** Iterate through the data in a given order. For this class this is not really destructive. */
  override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
    : Iterator[((Int, K), V)] = {
    val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
    new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
    iterator
  }

  private def iterator(): Iterator[((Int, K), V)] = new Iterator[((Int, K), V)] {
    var pos = 0

    override def hasNext: Boolean = pos < curSize

    override def next(): ((Int, K), V) = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val pair = (data(2 * pos).asInstanceOf[(Int, K)], data(2 * pos + 1).asInstanceOf[V])
      pos += 1
      pair
    }
  }
}
// Companion object
private object PartitionedPairBuffer {
  val MAXIMUM_CAPACITY: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 2
}