
Spark RDD API Basic Operations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object RddTest {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd test")
    val sc = new SparkContext(conf)
    //mapTest(sc)
    //val file = sc.textFile("/user-logs-large.txt")
    //file.flatMap(_.split("\\s")).foreach(println)
    //distinctTest(sc)
    // filterTest(sc)
    // keyByTest(sc)
    // sortByTest(sc)
    // rePartitionTest(sc)
    groupBy(sc)
    sc.stop()
  }

  // flatMap / map / mapPartitions
  def mapTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt")
    val mapResult = file.map(x => {
      val info = x.split("\\s")
      (info(0), info(1))
    })
    //mapResult.take(10).foreach(println)

    // per-partition transformation
    val mapPartitionResult = file.mapPartitions(x => {
      var info = new Array[String](3)
      for (line <- x) yield {
        info = line.split("\\s")
        (info(0), info(1))
      }
    })
    //mapPartitionResult.take(10).foreach(println)

    // turn one new_tweet record into two login records
    val flatMapResult = file.flatMap(x => {
      val info = x.split("\\s")
      info(0) match {
        case "new_tweet" => for (i <- 1 to 2) yield s"${info(0)} login ${info(2)}"
        case _ => Array(x)
      }
    })
    //flatMapResult.take(10).foreach(println)
  }

  // distinct: deduplicate
  def distinctTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt", 3)
    val userRdd = file.map(_.split("\\t")(0)).distinct()
    userRdd.foreach(println)
  }

  // filter
  def filterTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt", 3)
    val loginFilter = file.filter(_.split("\\s")(1) == "login")
    loginFilter.foreach(println)
    println(loginFilter.count())
  }

  // keyBy: the key is user-defined, the value is the original record x
  def keyByTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt", 3)
    val userActionType = file.keyBy(x => {
      val info = x.split("\\s")
      s"${info(0)}-----${info(1)}"
    })
    userActionType.foreach(println)
  }

  // sortBy
  def sortByTest(sc: SparkContext) = {
    val file = sc.textFile("/spark/part-00001")
    val sortByResult = file.sortBy(x => x.split("\\s+")(1).toInt)
    sortByResult.foreach(println)
  }

  // topN
  def topNTest(sc: SparkContext) = {
    val list = List(1, 2, 5, 11, 545, 22, 12, 55)
    val rdd = sc.parallelize(list, 2)
    val takeOrdered = rdd.takeOrdered(3)
    takeOrdered.foreach(println) // ascending by default
    val topN = rdd.top(3)
    topN.foreach(println) // descending by default
  }

  // repartition
  def rePartitionTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt")
    val result = file.repartition(5)
    file.foreachPartition(x => {
      println(s"fileRdd partition, records in this partition: ${x.size} ")
    })

    // repartition: wide dependency
    result.foreachPartition(x => {
      var sum = 0
      x.foreach(x => sum += 1)
      println(s"resultRdd partition, records in this partition: $sum ")
    })

    // coalesce: narrow dependency
    val coalResult = file.coalesce(3)
    coalResult.foreachPartition(x => {
      println(s"coalResultRdd: ${x.size}")
    })
  }

  // groupBy
  def groupBy(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt")
    val groupBy = file.groupBy(x => x.split("\\s")(0))
    groupBy.foreachPartition(x => {
      println(s"GroupByRdd partition, records in this partition: ${x.size} ")
    })
    groupBy.foreach(x => {
      println(s"one GroupByRdd record, key: ${x._1}, records in its value collection: ${x._2.size}")
    })

    // count each user's login times
    groupBy.foreach(x => {
      var sum = 0
      x._2.foreach(line => {
        line.split("\\s")(1) match {
          case "login" => sum += 1
          case _ =>
        }
      })
      println(s"user:${x._1},logintimes:$sum")
    })
  }

  def aggSumTest(sc: SparkContext) = {
    val list = List(3, 5, 8, 9, 12, 55)
    val rdd = sc.parallelize(list, 3)

    // reduce: compute the sum
    val reduceSum = rdd.reduce(_ + _)

    // fold: compute the sum with a zero value
    val foldResult = rdd.fold(0)(_ + _)

    // aggregate: concatenate the elements into one string
    val aggregateResult = rdd.aggregate("")((c, v) => {
      c match {
        case "" => v.toString
        case _ => s"$c,$v"
      }
    }, (c1, c2) => {
      c1 match {
        case "" => c2
        case _ => s"$c1,$c2"
      }
    })

    println(s"reduceResult: $reduceSum")
    println(s"foldResult: $foldResult")
    println(s"aggregateResult: $aggregateResult")
  }

  // persist
  def persistTest(sc: SparkContext) = {
    val file = sc.textFile("/user-logs-large.txt")
    file.cache() // cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
    file.persist(StorageLevel.MEMORY_ONLY)

    // count the number of distinct users
    file.map(x => x.split("\\s")(0)).distinct().count
    // count the number of distinct IPs
    file.map(x => x.split("\\s")(2)).distinct().count
  }
}
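
The functions above read /user-logs-large.txt, whose exact contents are not shown in this post; from the way the lines are split, each record appears to follow a "user action ip" layout. As a minimal, runnable sketch of the same ideas without HDFS, the snippet below feeds a few made-up log lines through map, filter and groupBy on a local SparkContext. The sample data and the RddQuickCheck object name are illustrative assumptions, not part of the original code.

import org.apache.spark.{SparkConf, SparkContext}

object RddQuickCheck {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd quick check")
    val sc = new SparkContext(conf)

    // assumed sample records in the "user action ip" layout
    val sampleLogs = Seq(
      "tom login 10.0.0.1",
      "tom logout 10.0.0.1",
      "jerry login 10.0.0.2",
      "jerry login 10.0.0.3"
    )
    val file = sc.parallelize(sampleLogs, 2)

    // map: (user, action) pairs, as in mapTest
    file.map { line =>
      val info = line.split("\\s")
      (info(0), info(1))
    }.foreach(println)

    // filter + count: total number of login records, as in filterTest
    println(file.filter(_.split("\\s")(1) == "login").count())

    // groupBy user, then count logins per user, as in groupBy(sc)
    file.groupBy(_.split("\\s")(0)).foreach { case (user, lines) =>
      val logins = lines.count(_.split("\\s")(1) == "login")
      println(s"user:$user,logintimes:$logins")
    }

    sc.stop()
  }
}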