Spark(九)【RDD的分割槽和自定義Partitioner】
目錄
spark的分割槽
Spark目前支援Hash分割槽和Range分割槽,使用者也可以自定義分割槽,Hash分割槽為當前的預設分割槽,Spark中分割槽器直接決定了RDD中分割槽的個數、RDD中每條資料經過Shuffle過程屬於哪個分割槽和Reduce的個數。
注意
(1)只有Key-Value型別的RDD才有分割槽器的,非Key-Value型別的RDD,分割槽器的值是None
(2)每個RDD的分割槽ID範圍:0~numPartitions-1,決定這個值是屬於那個分割槽的。
檢視RDD的分割槽器
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
對RDD進行重新分割槽
val partitioned = pairs.partitionBy(new HashPartitioner(2)) partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at <console>:27
一. Hash分割槽
HashPartitioner分割槽的原理:對於給定的key,計算其hashCode,併除以分割槽的個數取餘,如果餘數小於0,則用餘數+分割槽的個數(否則加0),最後返回的值就是這個key所屬的分割槽ID。
聚類! key相同,hashCode相同,分配到同一個區
問題:資料傾斜,每個分割槽中資料量的不均勻
二. Ranger分割槽
將一定範圍內的數對映到某一個分割槽內,儘量保證每個分割槽中資料量的均勻,而且分割槽與分割槽之間是有序的,一個分割槽中的元素肯定都是比另一個分割槽內的元素小或者大,但是分割槽內的元素是不能保證順序的
實現過程:
①抽樣產生邊界陣列
②將元素根據邊界陣列判斷屬於哪個區
三. 自定義Partitioner
實現過程
要實現自定義的分割槽器,你需要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。
(1)numPartitions: Int:返回創建出來的分割槽數。
(2)getPartition(key: Any): Int:返回給定鍵的分割槽編號(0到numPartitions-1)。
(3)equals():Java 判斷相等性的標準方法。這個方法的實現非常重要,Spark 需要用這個方法來檢查你的分割槽器物件是否和其他分割槽器例項相同,這樣 Spark 才可以判斷兩個 RDD 的分割槽方式是否相同
使用
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。
案例
需求:有以下資料,希望年齡相同的進入同一個區。
User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23)
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
/**
* @description: TODO
* @author: HaoWu
* @create: 2020年08月03日
*/
object MyPartitionerTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
val sc = new SparkContext(conf)
val list = List(User("tom", 12), User("kobe", 12), User("mick", 22), User("jack", 23))
val result = sc.makeRDD(list).map {
case User(name, age) => (age, name)
}.partitionBy(new MyPartitioner(3))
result.saveAsTextFile("output")
}
}
/**
* User樣例類
*/
case class User(name: String, age: Int)
/**
* 自定義分割槽器
*/
class MyPartitioner(num: Int) extends Partitioner {
//設定分割槽數
override def numPartitions: Int = num
//分割槽規則
override def getPartition(key: Any): Int = {
//判斷是否為Int型別
if (!key.isInstanceOf[Int]) {
0
} else {
//Hash分割槽具有聚類的作用,相同age的會被分如同一個區
key.asInstanceOf[Int] % numPartitions
}
}
}