Spark Grouped Secondary Sort
Posted by 阿新 on 2019-02-07
package com.ibeifeng.spark.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try
object TopN {
  def main(args: Array[String]): Unit = {
    val hdfs = "hdfs://192.168.1.102:8020"
    // set up the Spark configuration
    // (a bare hostname like "dataNode1" is not a valid master URL; use
    // "yarn", "local[*]", or a full "spark://host:7077" address; the
    // original's "mapreduce.framework.name" is a Hadoop MapReduce
    // property with no effect on a Spark job, so it is dropped)
    val conf = new SparkConf()
      .setMaster("yarn")
      .setAppName("Secondary-Sort")
      .set("spark.rdd.compress", "true")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // legacy (pre-Spark-2.x) tuning knobs, kept from the original
      .set("spark.storage.memoryFraction", "0.5")
      .set("spark.akka.frameSize", "100")
      .set("spark.default.parallelism", "1")
    val sc = SparkContext.getOrCreate(conf)
    // build the input RDD with textFile
    // (hdfs already carries the "hdfs://" scheme, so don't prepend it again)
    val fileRDD: RDD[String] = sc.textFile(s"$hdfs/Data/emp.data")
    val wordRDD: RDD[(String, List[Int])] = fileRDD
      .map(line => {
        val arr = line.split(" ")
        // guard against malformed rows and stray whitespace;
        // rows whose value column is not a number fall back to 0
        (arr(0).trim, Try(arr(1).trim.toInt).getOrElse(0))
      })
      .groupByKey()      // collect each key's values
      .sortByKey(true)   // primary sort: keys ascending
      .map(x => (x._1, x._2.toList.sortWith(_ > _))) // secondary sort: values descending
    // write the result back to HDFS
    wordRDD.saveAsTextFile(s"$hdfs/interviewData/resultData")
    sc.stop()
  }
}
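
One caveat: groupByKey materializes every value of a key in memory at once, so a heavily skewed key can overwhelm an executor. The classic scalable secondary sort instead moves the value into a composite key and lets the shuffle machinery do the sorting, via repartitionAndSortWithinPartitions. Below is a minimal, self-contained sketch of that approach; SecondarySortSketch, KeyPartitioner, the sample pairs, and local[*] are illustrative choices, not part of the original post.

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object SecondarySortSketch {
  // Route each composite (key, value) by the natural key only, so all
  // records for one key land in the same partition.
  class KeyPartitioner(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = key match {
      case (k: String, _) => math.abs(k.hashCode % numPartitions)
    }
  }

  // Order composite keys: natural key ascending, value descending.
  implicit val kvOrdering: Ordering[(String, Int)] =
    new Ordering[(String, Int)] {
      def compare(a: (String, Int), b: (String, Int)): Int = {
        val byKey = a._1.compareTo(b._1)
        if (byKey != 0) byKey else b._2.compareTo(a._2)
      }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Secondary-Sort-Sketch").setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    // hypothetical sample pairs standing in for the parsed emp.data rows
    val pairs = sc.parallelize(Seq(("a", 3), ("b", 1), ("a", 7), ("b", 5)))
    val sorted = pairs
      .map { case (k, v) => ((k, v), null) } // push the value into the key
      .repartitionAndSortWithinPartitions(new KeyPartitioner(2))
      .map { case ((k, v), _) => (k, v) }    // drop the placeholder
    sorted.collect().foreach(println)        // each key's values come back descending
    sc.stop()
  }
}

The pairs produced by the map in the listing above could be fed straight into this pipeline. Unlike sortByKey, this gives key order only within each partition, but each key's values arrive already sorted without a whole group ever being held in memory.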