1. 程式人生 > >Spark 核心 RDD 剖析(上)

Spark 核心 RDD 剖析(上)

本文將通過描述 Spark RDD 的五大核心要素來描述 RDD,若希望更全面瞭解 RDD 的知識,請移步 RDD 論文:RDD:基於記憶體的叢集計算容錯抽象

Spark 的五大核心要素包括:

  • partition
  • partitioner
  • compute func
  • dependency
  • preferredLocation

下面一一來介紹

(一): partition

partition 個數怎麼定

RDD 由若干個 partition 組成,共有三種生成方式:

  • 從 Scala 集合中建立,通過呼叫 SparkContext#makeRDDSparkContext#parallelize
  • 載入外部資料來建立 RDD,例如從 HDFS 檔案、mysql 資料庫讀取資料等
  • 由其他 RDD 執行 transform 操作轉換而來

那麼,在使用上述方法生成 RDD 的時候,會為 RDD 生成多少個 partition 呢?一般來說,載入 Scala 集合或外部資料來建立 RDD 時,是可以指定 partition 個數的,若指定了具體值,那麼 partition 的個數就等於該值,比如:

val rdd1 = sc.makeRDD( scalaSeqData, 3 )    //< 指定 partition 數為3
val rdd2 = sc.textFile( hdfsFilePath, 10 )  //< 指定 partition 數為10

若沒有指定具體的 partition 數時的 partition 數為多少呢?

  • 對於從 Scala 集合中轉換而來的 RDD:預設的 partition 數為 defaultParallelism,該值在不同的部署模式下不同:
    • Local 模式:本機 cpu cores 的數量
    • Mesos 模式:8
    • Yarn:max(2, 所有 executors 的 cpu cores 個數總和)
  • 對於從外部資料載入而來的 RDD:預設的 partition 數為 min(defaultParallelism, 2)
  • 對於執行轉換操作而得到的 RDD:視具體操作而定,如 map 得到的 RDD 的 partition 數與 父 RDD 相同;union 得到的 RDD 的 partition 數為父 RDDs 的 partition 數之和...

partition 的定義

我們常說,partition 是 RDD 的資料單位,代表了一個分割槽的資料。但這裡千萬不要搞錯了,partition 是邏輯概念,是代表了一個分片的資料,而不是包含或持有一個分片的資料。

真正直接持有資料的是各個 partition 對應的迭代器,要再次注意的是,partition 對應的迭代器訪問資料時也不是把整個分割槽的資料一股腦載入持有,而是像常見的迭代器一樣一條條處理。舉個例子,我們把 HDFS 上10G 的檔案載入到 RDD 做處理時,並不會消耗10G 的空間,如果沒有 shuffle 操作(shuffle 操作會持有較多資料在記憶體),那麼這個操作的記憶體消耗是非常小的,因為在每個 task 中都是一條條處理處理的,在某一時刻只會持有一條資料。這也是初學者常有的理解誤區,一定要注意 Spark 是基於記憶體的計算,但不會傻到什麼時候都把所有資料全放到記憶體。

讓我們來看看 Partition 的定義幫助理解:

trait Partition extends Serializable {
  def index: Int

  override def hashCode(): Int = index
}

在 trait Partition 中僅包含返回其索引的 index 方法。很多具體的 RDD 也會有自己實現的 partition,比如:

KafkaRDDPartition 提供了獲取 partition 所包含的 kafka msg 條數的方法

class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String,
  val port: Int
) extends Partition {
  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}

UnionRDD 的 partition 類 UnionPartition 提供了獲取依賴的父 partition 及獲取優先位置的方法

private[spark] class UnionPartition[T: ClassTag](
    idx: Int,
    @transient private val rdd: RDD[T],
    val parentRddIndex: Int,
    @transient private val parentRddPartitionIndex: Int)
  extends Partition {

  var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)

  def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)

  override val index: Int = idx
}

partition 與 iterator 方法

RDD 的 def iterator(split: Partition, context: TaskContext): Iterator[T] 方法用來獲取 split 指定的 Partition 對應的資料的迭代器,有了這個迭代器就能一條一條取出資料來按 compute chain 來執行一個個transform 操作。iterator 的實現如下:

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

def 前加了 final 說明該函式是不能被子類重寫的,其先判斷 RDD 的 storageLevel 是否為 NONE,若不是,則嘗試從快取中讀取,讀取不到則通過計算來獲取該 Partition 對應的資料的迭代器;若是,嘗試從 checkpoint 中獲取 Partition 對應資料的迭代器,若 checkpoint 不存在則通過計算來獲取。

剛剛介紹瞭如果從 cache 或者 checkpoint 無法獲得 Partition 對應的資料的迭代器,則需要通過計算來獲取,這將會呼叫到 def compute(split: Partition, context: TaskContext): Iterator[T] 方法,各個 RDD 最大的不同也體現在該方法中。後文會詳細介紹該方法

(二): partitioner

partitioner 即分割槽器,說白了就是決定 RDD 的每一條訊息應該分到哪個分割槽。但只有 k, v 型別的 RDD 才能有 partitioner(當然,非 key, value 型別的 RDD 的 partitioner 為 None),非 key, value 型別的 RDD 的 partition 為 None。

partitioner 為 None 的 RDD 的 partition 的資料要麼對應資料來源的某一段資料,要麼來自對父 RDDs 的 partitions 的處理結果。

我們先來看看 Partitioner 的定義及註釋說明:

abstract class Partitioner extends Serializable {
  //< 返回 partition 數量
  def numPartitions: Int
  //< 返回 key 應該屬於哪個 partition
  def getPartition(key: Any): Int
}

Partitioner 共有兩種實現,分別是 HashPartitioner 和 RangePartitioner

HashPartitioner

先來看 HashPartitioner 的實現(省去部分程式碼):

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  ...
}

// x 對 mod 求於,若結果為正,則返回該結果;若結果為負,返回結果加上 mod
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

numPartitions 直接返回主建構函式中傳入的 partitions 引數,之前在有本書裡看到說 Partitioner 不僅決定了一條 record 應該屬於哪個 partition,還決定了 partition 的數量,其實這句話的後半段的有誤的,Partitioner 並不能決定一個 RDD 的 partition 數,Partitioner 方法返回的 partition 數是直接返回外部傳入的值。

getPartition 方法也不復雜,主要做了:

  1. 為引數 key 計算一個 hash 值
  2. 若該雜湊值對 partition 個數取與結果為正,則該結果即該 key 歸屬的 partition index;否則,以該結果加上 partition 個數為 partition index

從上面的分析來看,當 key, value 型別的 RDD 的 key 的 hash 值分佈不均勻時,會導致各個 partition 的資料量不均勻,極端情況下一個 partition 會持有整個 RDD 的資料而其他 partition 則不包含任何資料,這顯然不是我們希望看到的,這時就需要 RangePartitioner 出馬了。

RangePartitioner

上文也提到了,HashPartitioner 可能會導致各個 partition 資料量相差很大的情況。這時,初衷為使各個 partition 資料分佈儘量均勻的 RangePartitioner 便有了用武之地。

RangePartitioner 將一個範圍內的資料對映到 partition,這樣兩個 partition 之間要麼是一個 partition 的資料都比另外一個大,或者小。RangePartitioner採用水塘抽樣演算法,比 HashPartitioner 耗時,具體可見:Spark分割槽器HashPartitioner和RangePartitioner程式碼詳解



文/牛肉圓粉不加蔥(簡書作者)
原文連結:http://www.jianshu.com/p/54b3a4e786d9
著作權歸作者所有,轉載請聯絡作者獲得授權,並標註“簡書作者”。