Spark RDD之Partition
RDD概述
RDD是一個抽象類,主要包含五個部分:
- partitions列表
- 計算每一個split的函式
- 依賴rdd的列表(dependencies)
- 鍵值對rdd的partitioner
- 計算每個split的首選位置列表
其中最後兩個部分是可選的,以上五個部分對應著五個方法:
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
protected def getPartitions: Array[Partition]
protected def getDependencies: Seq[Dependency[_]] = deps
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
@transient val partitioner: Option[Partitioner] = None
Partition
一份待處理的原始資料會被按照相應的邏輯(例如jdbc和hdfs的split邏輯)切分成n份,每份資料對應到RDD中的一個Partition,Partition的數量決定了task的數量,影響著程式的並行度。Partition的原始碼如下:
trait Partition extends Serializable {
def index: Int
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
Partition和RDD是伴生的,即每一種RDD都有其對應的Partition實現,所以,分析Partition主要是分析其子類,我們關注兩個常用的子類,JdbcPartition和HadoopPartition。
JdbcPartition
JdbcPartition類包含於JdbcRDD.scala檔案,它繼承了Partition類,多加了兩個長整型屬性lower和upper。
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
override def index: Int = idx
}
JdbcPartition被定義為一個半私有類,只有父包和子包可以訪問,對外暴露的介面是jdbcRDD,該RDD定義如下:
class JdbcRDD[T: ClassTag](
sc: SparkContext,
getConnection: () => Connection,
sql: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
extends RDD[T](sc, Nil) with Logging
其中有兩個重要方法getPartitions
和compute
:
override def getPartitions: Array[Partition] = {
val length = BigInt(1) + upperBound - lowerBound
(0 until numPartitions).map { i =>
val start = lowerBound + ((i * length) / numPartitions)
val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
new JdbcPartition(i, start.toLong, end.toLong)
}.toArray
}
getPartitions
主要做的操作是將資料根據傳入的numPartitions引數進行分片並封裝成多個JdbcPartition類返回。
override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
{
context.addTaskCompletionListener{ context => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setLong(1, part.lower)
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
}
compute
將傳入的Partition強行轉化為JdbcPartition,連線資料庫並且對sql語句進行預處理,將JdbcPartition中的lower和upper作為資料庫中id的上下界傳入,並執行查詢語句。也就是說,每次呼叫compute時,不會對所有資料進行操作,而是隻對資料的一部分(也就是一個Partition)進行操作。
HadoopPartition
HadoopPartition與Partition相比,增加了一個Int整型和一個InputSplit型別(Hadoop中的資料分片,可以理解為另一類Partition)的資料,重寫了hashCode和equals方法,並增加了一個獲取Hadoop環境引數列表的方法getPipeEnvVars()
:
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
def getPipeEnvVars(): Map[String, String] = {
val envVars: Map[String, String] = if (inputSplit.value.isInstanceOf[FileSplit]) {
val is: FileSplit = inputSplit.value.asInstanceOf[FileSplit]
Map("map_input_file" -> is.getPath().toString(),
"mapreduce_map_input_file" -> is.getPath().toString())
} else {
Map()
}
envVars
}
}
與JdbcPartition相類似,HadoopPartition也被定義為一個半私有類,只有父包和子包可以訪問,對外暴露的介面是HadoopRDD,其中的獲取分片和計算的方法如下:
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
SparkHadoopUtil.get.addCredentials(jobConf)
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
getPartitions
只是將Hadoop中的資料分片通過一系列處理之後,遍歷得到Spark中的Partition,並且設定了hash值的偏移量i。
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
private val split = theSplit.asInstanceOf[HadoopPartition]
private var reader: RecordReader[K, V] = null
private val inputFormat = getInputFormat(jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
}
(key, value)
}
}
new InterruptibleIterator[(K, V)](context, iter)
}
getPartitions
將傳入的Partition強行轉化為HadoopPartition,呼叫Hadoop的API將InputSplit轉化為InputFormat,獲取到InputFormat之後通過配置Reader讀取inputFormat中的資料並返回一個迭代器。
partition的數量如果在初始化SparkContext時沒有指定,則預設讀取spark.default.parallelism
中的配置,也可以通過傳參指定例如上述的JdbcPartition。同時Transformation也會影響partition的數目,例如union則是兩個rdd的partition相加,filter、map則是繼承父RDD的partition數,intersection是取兩者最大。
Partition數量的影響:
Partition數量太少:資源不能充分利用,例如local模式下,有16core,但是Partition數量僅為8的話,有一半的core沒利用到。 Partition數量太多:資源利用沒問題,但是導致task過多,task的序列化和傳輸的時間開銷增大。