1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_分割槽器

第五章_Spark核心程式設計_Rdd_分割槽器

1. 什麼是Rdd的分割槽器?

  *      key-value型別的Rdd在Shuffle時,會根據key的特質進行分割槽
  *      分割槽器就是 Partitioner的一個實現類
  *           通過指定 numPartitions 確定分割槽個數
  *                  getPartition(key: Any) 確定分割槽規則

2. Spark 分割槽器的種類?

  *     HashPartitioner(預設分割槽器)
  *         分割槽規則 : hash(key) % numPartitions
  *         缺點 : 當 大量key相同時,會導致分割槽不均,導致資料傾斜
  
* * RangePartitioner * 沒研究明白,待補充 * * 自定義分割槽器 * 實現 Partitioner介面,自定義

3. note

  *   1.Key-Value型別RDD才有分割槽器,非Key-Value型別RDD的分割槽值是None
  *   2.每個RDD的分割槽編號範圍為0~numPartitions-1,其決定資料所在分割槽

4. Spark分割槽器和MapReduce分割槽器的區別(自定義分割槽器)?

  *     MapReduce :
  *         def getPartition(key
: Nothing, value: Nothing, numPartitions: Int): Int
* 可以根據 key,value 進行分割槽 * Spark : * def getPartition(key: Any): Int * 只能根據 key 進行分割槽

5. 程式碼示例

  //自定義分割槽器
  //必須是 org.apache.spark.Partitioner的實現類
  //需求 : 將key按進入分割槽器的序號分割槽(將key打散)
  class custPartitioner(partitions: Int) extends
Partitioner { override def numPartitions: Int = partitions var index = 0 override def getPartition(key: Any): Int = { index += 1 index % numPartitions } } /*HashPartitioner*/ object HashPartitionerTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "x"), (1, "x"), (1, "x"), (1, "x"), (1, "x"), (2, "x"), (2, "x")), 2) //使用 HashPartitioner 分割槽器 //@partitions 指定分割槽個數 val rdd1: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(3)) //使用 RangePartitioner 分割槽器 val rdd2: RDD[(Int, String)] = rdd.partitionBy(new RangePartitioner(3, rdd)) //使用 自定義分割槽器 val rdd3: RDD[(Int, String)] = rdd.partitionBy(new custPartitioner(3)) // 使用 自定義分割槽器 // 使用匿名內部類 // 根據 key的奇偶性分割槽 private val rdd4: RDD[(Int, String)] = rdd.partitionBy( new Partitioner { override def numPartitions: Int = 3 //指定分割槽個數 override def getPartition(key: Any): Int = { if (key.asInstanceOf[Int].abs % 2 == 0) { 0 } else { 1 } } } ) //rdd1.saveAsTextFile("Spark_319/src/output/01") //rdd2.saveAsTextFile("Spark_319/src/output/02") rdd3.saveAsTextFile("Spark_319/src/output/03") sc.stop() }
-- HashPartitioner
part-00000
part-00001
    (1,x)
    (1,x)
    (1,x)
    (1,x)
    (1,x)
part-00002
    (2,x)
    (2,x)

-- custPartitioner
part-00000
    (1,x)
    (2,x)
part-00001
    (1,x)
    (1,x)
    (2,x)
part-00002
    (1,x)
    (1,x)