Spark secondary sort

The file to sort:

3,2
5,2
5,3
5,9
6,2
9,1
9,3
8,4

Method 1:
package spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Composite key: compare by the first column, and fall back to the second on ties.
class SecondarySortByKey(val first: Int, val second: Int) extends Ordered[SecondarySortByKey] with Serializable {
  override def compare(that: SecondarySortByKey): Int = {
    if (this.first - that.first != 0) {
      this.first - that.first
    } else {
      this.second - that.second
    }
  }
}

object SecondarySortApp {
  // Both columns ascending, driven by the Ordered key above.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val data = sc.textFile("/test/file/secondarySort.txt")
    // Pair each line with its composite key, sort by that key, then drop the key.
    val lines = data.map { line =>
      val fields = line.split(",")
      (new SecondarySortByKey(fields(0).toInt, fields(1).toInt), line)
    }
    val sorted = lines.sortByKey(true)
    sorted.map(_._2).collect().foreach(println)
  }
}
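With the sample file above, Method 1 should print the rows ordered by both columns ascending:

3,2
5,2
5,3
5,9
6,2
8,4
9,1
9,3

A hand-rolled key class is not strictly required for this ordering. As a minimal alternative sketch (not from the original post; it assumes the same input path), Scala's implicit Ordering for tuples lets RDD.sortBy perform the same secondary sort directly:

val sortedLines = sc.textFile("/test/file/secondarySort.txt")
  .sortBy { line =>
    val fields = line.split(",")
    (fields(0).toInt, fields(1).toInt) // implicit Tuple2 Ordering: both columns ascending
  }
sortedLines.collect().foreach(println)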

Method 2:

// Uses the same imports as Method 1 (org.apache.spark.rdd.RDD is needed here).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("SortByKey").setMaster("local[*]")
  val sc = new SparkContext(conf)
  sc.setLogLevel("ERROR")
  // Read into a single partition so the printed order is stable.
  val data = sc.textFile("/test/file/secondarySort.txt", 1)
  // Whole-line lexicographic sort, first column ascending (result unused below).
  val value: RDD[(String, String)] = data.coalesce(1, false).map(line => (line, line)).sortByKey(true)
  // First column ascending, second column descending: group lines by the first
  // column, sort the groups by key, then sort each group descending with List.sortWith.
  val value1: RDD[(String, List[String])] = data
    .map(line => (line.split(",")(0), line))
    .groupByKey(1)
    .sortByKey(true)
    .map(line => (line._1, line._2.toList.sortWith(_.compareTo(_) > 0)))
  // mkString("@")/split("@") flattens each group back into individual lines.
  value1.map(_._2).flatMap(_.mkString("@").split("@")).foreach(println)
}
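With the sample file, the groups should come back in ascending key order and each group's lines in descending order, so the expected output is:

3,2
5,9
5,3
5,2
6,2
8,4
9,3
9,1

Note that the first column is compared as a String here, which matches numeric order only while the keys are single digits; a key like "10" would sort before "9". Converting the key with .toInt avoids that.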

Note: beware the effect of the default partitioning. With more than one partition, foreach(println) does not print in globally sorted order, which is why the code above forces a single partition.
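A minimal sketch of that pitfall, assuming the same input file: foreach(println) runs on each partition in parallel, so with the default number of partitions the console output can interleave and no longer reflect the global sort. Collecting to the driver first (or reducing to a single partition, as above) restores the order:

val rdd = sc.textFile("/test/file/secondarySort.txt") // default partitioning
  .map(line => (line.split(",")(0), line))
  .sortByKey(true)

rdd.foreach(println)           // executes per partition: printed order not guaranteed
rdd.collect().foreach(println) // driver-side print preserves the sorted order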