
Spark RDD exercises (computing student scores)

Given the following data, one record per line, fields separated by single spaces.

Columns: class ID, name, age, sex, subject, score

12 張三 25 男 chinese 50
12 張三 25 男 math 60
12 張三 25 男 english 70
12 李四 20 男 chinese 50
12 李四 20 男 math 50
12 李四 20 男 english 50
12 王芳 19 女 chinese 70
12 王芳 19 女 math 70
12 王芳 19 女 english 70
13 張大三 25 男 chinese 60
13 張大三 25 男 math 60
13 張大三 25 男 english 70
13 李大四 20 男 chinese 50
13 李大四 20 男 math 60
13 李大四 20 男 english 50
13 王小芳 19 女 chinese 70
13 王小芳 19 女 math 80
13 王小芳 19 女 english 70
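
The solutions read this data with `sc.textFile` and call `split(" ")` on every line, so they assume one space-separated record per line and no header row (the `toInt` calls would fail on a header). The following plain-Scala sketch needs no Spark; the object `ParseScores`, the case class `Rec`, and the string `flat` are illustrative names, not part of the original. It rebuilds the 18 lines from the flattened data by taking six tokens at a time, then parses each line the way the snippets do:

```scala
object ParseScores {
  // Illustrative record type: classID name age sex subject score
  final case class Rec(classId: String, name: String, age: Int,
                       sex: String, subject: String, score: Int)

  // The flattened data from the post, as one string
  val flat: String =
    "12 張三 25 男 chinese 50 12 張三 25 男 math 60 12 張三 25 男 english 70 " +
    "12 李四 20 男 chinese 50 12 李四 20 男 math 50 12 李四 20 男 english 50 " +
    "12 王芳 19 女 chinese 70 12 王芳 19 女 math 70 12 王芳 19 女 english 70 " +
    "13 張大三 25 男 chinese 60 13 張大三 25 男 math 60 13 張大三 25 男 english 70 " +
    "13 李大四 20 男 chinese 50 13 李大四 20 男 math 60 13 李大四 20 男 english 50 " +
    "13 王小芳 19 女 chinese 70 13 王小芳 19 女 math 80 13 王小芳 19 女 english 70"

  // Every 6 consecutive tokens form one record: this is the one-record-per-line file
  val lines: List[String] = flat.split(" ").grouped(6).map(_.mkString(" ")).toList

  // Same field positions the snippets use after x.split(" ")
  def parse(line: String): Rec = {
    val f = line.split(" ")
    Rec(f(0), f(1), f(2).toInt, f(3), f(4), f(5).toInt)
  }

  val recs: List[Rec] = lines.map(parse)

  def main(args: Array[String]): Unit = lines.foreach(println)
}
```

Running `main` prints the 18 records, one per line, which is the file layout the solutions expect.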

Requirements:

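1. How many people took the exam?
1.1 How many people under 20 took the exam?
1.2 How many people aged exactly 20 took the exam?
1.3 How many people over 20 took the exam?

2. How many boys took the exam?
2.1 How many girls took the exam?

3. How many people in class 12 took the exam?
3.1 How many people in class 13 took the exam?

4. What is the average Chinese score?
4.1 What is the average math score?
4.2 What is the average English score?

5. What is each person's average score?

6. What is the average score of class 12?
6.1 What is the average total score of the boys in class 12?
6.2 What is the average total score of the girls in class 12?
6.3 Compute the same figures for class 13.

7. What is the highest Chinese score in the whole school?
7.1 What is the lowest Chinese score in class 12?
7.2 What is the highest math score in class 13?

8. How many girls in class 12 have a total score above 150?

9. What is the average score of the students whose total is above 150, whose math score is at least 70, and who are at least 20 years old? (No student in this data meets all three conditions with age 20, so the solutions below use 19 instead.)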

Below are two people's solutions. Some parts overlap, but the point is to compare the approaches.

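Method 1:

Solutions, question by question (the snippets assume an interactive spark-shell session, where `sc` is predefined and `file` carries over from earlier snippets):
1. How many people took the exam?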
val file = sc.textFile("file:///jar/score")
val name = file.map(x => {val line = x.split(" ");line(0) + "," + line(1)})
val numPeo = name.distinct.count()


1.1 How many people under 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt<20).count()

1.2 How many people aged exactly 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt == 20).count()

1.3 How many people over 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt > 20).count()
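
Each student appears once per subject, so the snippets for question 1 call `distinct` on (class, name, age) before filtering; without it every student would be counted three times. As a sketch in plain Scala collections (the names `AgeBuckets` and `bucket` are hypothetical), the three age questions can also be answered in one pass by bucketing after `distinct`. On an RDD the analogous action would be `countByValue()`, which returns the counts to the driver as a local `Map`:

```scala
object AgeBuckets {
  // (classID, name, age) for the six students in the data
  val students = List(("12", "張三", 25), ("12", "李四", 20), ("12", "王芳", 19),
                      ("13", "張大三", 25), ("13", "李大四", 20), ("13", "王小芳", 19))

  // The score file has one record per student per subject, so each key occurs 3 times
  val records: List[(String, String, Int)] =
    for (s <- students; _ <- List("chinese", "math", "english")) yield s

  // Hypothetical helper: map an age to the bucket asked about in questions 1.1 to 1.3
  def bucket(age: Int): String =
    if (age < 20) "under 20" else if (age == 20) "exactly 20" else "over 20"

  // distinct first (one entry per student), then count per bucket in a single pass
  val counts: Map[String, Int] =
    records.distinct.groupBy(t => bucket(t._3)).map { case (k, v) => k -> v.size }

  // Without distinct, each student is counted once per subject
  val naiveUnder20: Int = records.count(_._3 < 20)
}
```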

2. How many boys took the exam?
val file = sc.textFile("file:///jar/score")
val sex = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) + "," + line(3)})
val numPeo = sex.distinct.filter(_.split(",")(2) == "男").count()

2.1 How many girls took the exam?
val file = sc.textFile("file:///jar/score")
val sex = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) + "," + line(3)})
val numPeo = sex.distinct.filter(_.split(",")(2) == "女").count()

3. How many people in class 12 took the exam?
val file = sc.textFile("file:///jar/score")
val classNum = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) })
val numPeo = classNum.distinct.filter(_.split(",")(0).toInt == 12).count()
sc.makeRDD(Array(numPeo)).saveAsTextFile("file:///jar/result/class12numPeo")

3.1 How many people in class 13 took the exam?
val file = sc.textFile("file:///jar/score")
val classNum = file.map(x => {val line = x.split(" ");line(0) + "," + line(1) })
val numPeo = classNum.distinct.filter(_.split(",")(0).toInt == 13).count()
sc.makeRDD(Array(numPeo)).saveAsTextFile("file:///jar/result/class13numPeo")

4. What is the average Chinese score?
val chineseLine = file.map(x => {val line = x.split(" "); line(4)+ "," + line(5)})
val chineseGennal = chineseLine.filter(_.split(",")(0) == "chinese")
val chineseLength = chineseGennal.count.toInt//6
val chineseSum = chineseGennal.map(_.split(",")(1).toInt).reduce(_ + _)//350
val chineseAvg = chineseSum/chineseLength//58 (integer division; the exact mean is 58.33)
sc.makeRDD(Array(chineseAvg)).saveAsTextFile("file:///jar/result/chineseAvg")

4.1 What is the average math score?
val mathLine = file.map(x => {val line = x.split(" "); line(4)+ "," + line(5)})
val mathGennal = mathLine.filter(_.split(",")(0) == "math")
val mathLength = mathGennal.count.toInt//6
val mathSum = mathGennal.map(_.split(",")(1).toInt).reduce(_ + _)//380
val mathAvg = mathSum/mathLength//63 (integer division; the exact mean is 63.33)
sc.makeRDD(Array(mathAvg)).saveAsTextFile("file:///jar/result/mathAvg")

4.2 What is the average English score?
val englishLine = file.map(x => {val line = x.split(" "); line(4)+ "," + line(5)})
val englishGennal = englishLine.filter(_.split(",")(0) == "english")
val englishLength = englishGennal.count.toInt//6
val englishSum = englishGennal.map(_.split(",")(1).toInt).reduce(_ + _)//380
val englishAvg = englishSum/englishLength//63 (integer division; the exact mean is 63.33)
sc.makeRDD(Array(englishAvg)).saveAsTextFile("file:///jar/result/englishAvg")
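
Each subject above is handled with its own filter, `count`, and `reduce`, which means several passes over the data per subject. A minimal plain-Scala sketch (the object and value names are hypothetical) computes all three averages in one pass by accumulating a (sum, count) pair per subject, and shows how the integer division used above truncates:

```scala
object SubjectAverages {
  // (subject, score) for all 18 records, i.e. line(4) and line(5) in the snippets
  val pairs: List[(String, Int)] = List(
    "chinese" -> 50, "math" -> 60, "english" -> 70, // 張三
    "chinese" -> 50, "math" -> 50, "english" -> 50, // 李四
    "chinese" -> 70, "math" -> 70, "english" -> 70, // 王芳
    "chinese" -> 60, "math" -> 60, "english" -> 70, // 張大三
    "chinese" -> 50, "math" -> 60, "english" -> 50, // 李大四
    "chinese" -> 70, "math" -> 80, "english" -> 70) // 王小芳

  // One pass: accumulate (sum, count) per subject
  val sumCount: Map[String, (Int, Int)] =
    pairs.foldLeft(Map.empty[String, (Int, Int)]) { case (acc, (subj, score)) =>
      val (s, c) = acc.getOrElse(subj, (0, 0))
      acc.updated(subj, (s + score, c + 1))
    }

  // Int / Int truncates (350 / 6 == 58); convert to Double first to keep the fraction
  val truncated: Map[String, Int] = sumCount.map { case (k, (s, c)) => k -> s / c }
  val exact: Map[String, Double] = sumCount.map { case (k, (s, c)) => k -> s.toDouble / c }
}
```

On an RDD the same shape would be `map` to `(subject, (score, 1))` followed by `reduceByKey`, with a `toDouble` before the division if the fraction matters.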

5. What is each person's average score?
val scoreLine = file.map(x => {val line = x.split(" "); (line(0)+","+line(1),line(5).toInt)})
// Pair each score with a count of 1, sum both per student, then divide (integer division)
scoreLine.map(a => (a._1,(a._2,1)))
  .reduceByKey((a,b) => (a._1+b._1,a._2+b._2))
  .map(y => (y._1,y._2._1/y._2._2))
  .saveAsTextFile("file:///jar/result/perScore")
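
Question 5 carries a (sum, count) pair through `reduceByKey` instead of averaging as it goes. That matters because `reduceByKey` may combine values within each partition before merging the partial results, so the combining function must give the same answer however the values are grouped; adding pairs is associative and commutative, while averaging partial averages is not. The sketch below is plain Scala; the two-way split of 張大三's scores (60, 60, 70) is a hypothetical partitioning, and the names are illustrative:

```scala
object PartialAverages {
  // 張大三's three scores, imagined as split across two partitions
  val partition1: List[Int] = List(60, 60)
  val partition2: List[Int] = List(70)

  // Wrong: average each partition, then average the partial averages
  val avgOfAvgs: Double =
    (partition1.sum.toDouble / partition1.size + partition2.sum.toDouble / partition2.size) / 2

  // Right: merge (sum, count) pairs, the same function the snippet passes to reduceByKey
  def combine(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a._1 + b._1, a._2 + b._2)

  val merged: (Int, Int) =
    combine((partition1.sum, partition1.size), (partition2.sum, partition2.size))
  val correct: Double = merged._1.toDouble / merged._2
}
```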

6. What is the average score of class 12?
val classScore12 = file.map(x => {val line = x.split(" "); (line(0),line(5).toInt)})
  .filter(a => a._1 == "12")
classScore12.map(a => (a._1,(a._2,1)))
  .reduceByKey((a,b) => (a._1+b._1,a._2+b._2))
  .map(y => (y._1,y._2._1/y._2._2))//12,60
  .saveAsTextFile("file:///jar/result/perClass12")

6.1 What is the average total score of the boys in class 12?
val BoyclassScore12 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(3) + "," + line(5).toInt)}).filter(_.split(",")(0) == "12").filter(_.split(",")(1)=="男")
val BoyclassScore12Num = BoyclassScore12.count//6
val BoyclassScore12Sum= BoyclassScore12.map(y => {val row = y.split(",");row(2).toInt}).reduce(_+_)//330
val BoyperClass12 = BoyclassScore12Sum/BoyclassScore12Num//55
// This averages over the 6 score records (330 / 6); dividing by the number of boys (2) instead gives the average total per boy, 165


6.2 What is the average total score of the girls in class 12?
val GirlclassScore12 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(3) + "," + line(5).toInt)}).filter(_.split(",")(0) == "12").filter(_.split(",")(1)=="女")
val GirlclassScore12Num = GirlclassScore12.count//3
val GirlclassScore12Sum= GirlclassScore12.map(y => {val row = y.split(",");row(2).toInt}).reduce(_+_)//210
val GirlperClass12 = GirlclassScore12Sum/GirlclassScore12Num//70

6.3.0 What is the average score of class 13?
val classScore13 = file.map(x => {val line = x.split(" "); (line(0),line(5).toInt)}).filter(a =>(a._1 == "13"))
val perClass13 = classScore13.map(a => (a._1,(a._2,1))).reduceByKey((a,b) => (a._1+b._1,a._2+b._2)).map(y => (y._1,y._2._1/y._2._2))//13,63

6.3.1 What is the average total score of the boys in class 13?
val BoyclassScore13 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(3) + "," + line(5).toInt)}).filter(_.split(",")(0) == "13").filter(_.split(",")(1)=="男")
val BoyclassScore13Num = BoyclassScore13.count//6
val BoyclassScore13Sum= BoyclassScore13.map(y => {val row = y.split(",");row(2).toInt}).reduce(_+_)//350
val BoyperClass13 = BoyclassScore13Sum/BoyclassScore13Num//58

6.3.2 What is the average total score of the girls in class 13?
val GirlclassScore13 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(3) + "," + line(5).toInt)}).filter(_.split(",")(0) == "13").filter(_.split(",")(1)=="女")
val GirlclassScore13Num = GirlclassScore13.count//3
val GirlclassScore13Sum= GirlclassScore13.map(y => {val row = y.split(",");row(2).toInt}).reduce(_+_)//220
val GirlperClass13 = GirlclassScore13Sum/GirlclassScore13Num//73
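
"Average total score" can be read two ways, and the two methods in this post read it differently: Method 1 divides the summed scores by the number of score records (330 / 6 = 55 for the boys of class 12), while Method 2 divides by the number of boys (330 / 2 = 165). A plain-Scala sketch of both readings on the class-12 records (the object and value names are hypothetical):

```scala
object GroupAverages {
  // (classID, name, sex, score) for the class-12 records
  val class12: List[(String, String, String, Int)] = List(
    ("12", "張三", "男", 50), ("12", "張三", "男", 60), ("12", "張三", "男", 70),
    ("12", "李四", "男", 50), ("12", "李四", "男", 50), ("12", "李四", "男", 50),
    ("12", "王芳", "女", 70), ("12", "王芳", "女", 70), ("12", "王芳", "女", 70))

  val boys = class12.filter(_._3 == "男")

  // Reading 1: average over the individual score records
  val perRecord: Double = boys.map(_._4).sum.toDouble / boys.size

  // Reading 2: each boy's total, averaged over the boys
  val totals: Map[String, Int] =
    boys.groupBy(_._2).map { case (name, rs) => name -> rs.map(_._4).sum }
  val perStudentTotal: Double = totals.values.sum.toDouble / totals.size
}
```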

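7. What is the highest Chinese score in the whole school?
val chineseLine = file.map(x => {val line = x.split(" "); line(4)+ "," + line(5)})
// max compares the "chinese,score" strings lexicographically; this works here only because every score has two digits
val chineseMax = chineseLine.distinct
  .filter(_.split(",")(0) == "chinese")
  .max//chinese,70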
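
`max` and `min` on these strings compare them character by character, because the default `Ordering[String]` is lexicographic; the result matches the numeric answer here only because every score in the data has two digits. A small sketch with a hypothetical three-digit score (not in the data) shows the difference:

```scala
object StringVsNumericMax {
  // "subject,score" strings as built in the snippet; "chinese,100" is hypothetical
  val lines: List[String] = List("chinese,50", "chinese,70", "chinese,100")

  // Lexicographic: after "chinese," the first differing characters are '5', '7', '1'
  val lexicographicMax: String = lines.max

  // Parse the score first to get the numeric maximum
  val numericMax: Int = lines.map(_.split(",")(1).toInt).max
}
```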


7.1 What is the lowest Chinese score in class 12?
val chineseLine12 = file.map(x => {val line = x.split(" "); line(0)+ "," + line(4)+ "," + line(5)})
val chineseMin12 = chineseLine12.distinct
  .filter(_.split(",")(0).toInt == 12)
  .filter(_.split(",")(1) == "chinese")
  .min//12,chinese,50

// Alternative: compare the scores as Int and save the result
val chineseScore = file.map(x => {val line = x.split(" "); (line(0), line(4), line(5).toInt)})
sc.makeRDD(Array(chineseScore.filter(a => a._1 == "12" && a._2 == "chinese")
  .map(a => a._3).min))
  .saveAsTextFile("file:///jar/result/chineseMin12")

7.2 What is the highest math score in class 13?
val mathLine13 = file.map(x => {val line = x.split(" "); line(0)+ "," + line(4)+ "," + line(5)})
val mathMax13 = mathLine13.distinct.filter(_.split(",")(0).toInt == 13).filter(_.split(",")(1) == "math").max//mathMax13: String = 13,math,80

val mathLine13 = file.map(x => {val line = x.split(" "); (line(0)+ "," + line(4),line(5).toInt)})
val mathMax13 = mathLine13.filter(a => (a._1.split(",")(1).equals("math")) && (a._1.split(",")(0).equals("13"))).max//mathMax13: (String, Int) = (13,math,80)

8. How many girls in class 12 have a total score above 150?
val sumScore12Line = file.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3),line(5).toInt)})
val sumScore12Dayu150 = sumScore12Line.reduceByKey(_+_).filter(a => (a._2>150 && a._1.split(",")(0).equals("12") && a._1.split(",")(2).equals("女"))).count
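
Question 8 sums each student's scores first and filters the totals afterwards. The filter is what makes the count meaningful: mapping the totals to `Boolean` and then counting would count every student, matched or not. A plain-Scala sketch on the class-12 records (the names are hypothetical):

```scala
object TotalsAboveThreshold {
  // ((classID, name, sex), score) for class 12, keyed like sumScore12Line
  val records: List[((String, String, String), Int)] = List(
    (("12", "張三", "男"), 50), (("12", "張三", "男"), 60), (("12", "張三", "男"), 70),
    (("12", "李四", "男"), 50), (("12", "李四", "男"), 50), (("12", "李四", "男"), 50),
    (("12", "王芳", "女"), 70), (("12", "王芳", "女"), 70), (("12", "王芳", "女"), 70))

  // Equivalent of reduceByKey(_ + _): one total per student
  val totals: Map[(String, String, String), Int] =
    records.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }

  // Counting after a filter keeps only the students above 150
  val above150: Int = totals.count(_._2 > 150)
  // Mapping to Boolean keeps every student, so the size is the number of students
  val mappedThenCounted: Int = totals.values.toList.map(_ > 150).size
  // The actual question: girls in class 12 with a total above 150
  val girlsAbove150: Int =
    totals.count { case ((_, _, sex), total) => sex == "女" && total > 150 }
}
```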

9. What is the average score of the students whose total is above 150, whose math score is at least 70, and who are at least 19 years old?
val complex1 = file.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3),line(5).toInt)})
val complex2 = file.map(x => {val line = x.split(" "); (line(0)+","+line(1)+","+line(3)+","+line(4)+","+line(2),line(5).toInt)})
// Keep the students whose total is above 150 and compute their average score
val com1 = complex1.map(a => (a._1, (a._2, 1))).reduceByKey((a,b) => (a._1+b._1,a._2+b._2)).filter(a => (a._2._1>150)).map(t => (t._1,t._2._1/t._2._2))
// Keep the students whose math score is at least 70 and who are at least 19 years old
val com2 = complex2.filter(a => {val line = a._1.split(","); line(3).equals("math") && a._2 >= 70 && line(4).toInt >= 19})
  .map(a => {val line2 = a._1.split(","); (line2(0)+","+line2(1)+","+line2(2),a._2)})

com1.join(com2).map(a =>(a._1,a._2._1)).collect.foreach(println)
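
Question 9 intersects two per-student results with `join`: one side holds the averages of the students whose total is above 150, the other the students passing the math and age conditions. `RDD.join` is an inner join on the key. The plain-Scala sketch below uses a hypothetical `join` helper as a stand-in for it (all names are illustrative) and also checks the age threshold: with age ≥ 20 no student qualifies on this data, and with age ≥ 19 the two girls 王芳 and 王小芳 do:

```scala
object Question9Join {
  // (classID, name) -> (age, subject -> score), copied from the data above
  val students: Map[(String, String), (Int, Map[String, Int])] = Map(
    (("12", "張三"), (25, Map("chinese" -> 50, "math" -> 60, "english" -> 70))),
    (("12", "李四"), (20, Map("chinese" -> 50, "math" -> 50, "english" -> 50))),
    (("12", "王芳"), (19, Map("chinese" -> 70, "math" -> 70, "english" -> 70))),
    (("13", "張大三"), (25, Map("chinese" -> 60, "math" -> 60, "english" -> 70))),
    (("13", "李大四"), (20, Map("chinese" -> 50, "math" -> 60, "english" -> 50))),
    (("13", "王小芳"), (19, Map("chinese" -> 70, "math" -> 80, "english" -> 70))))

  // Side 1 (like com1): average score of students whose total is above 150
  val avgIfTotalAbove150: Map[(String, String), Double] = students.collect {
    case (k, (_, s)) if s.values.sum > 150 => (k, s.values.sum.toDouble / s.size)
  }.toMap

  // Side 2 (like com2): math score of students with math >= 70 and age >= minAge
  def mathAndAge(minAge: Int): Map[(String, String), Int] = students.collect {
    case (k, (age, s)) if s("math") >= 70 && age >= minAge => (k, s("math"))
  }.toMap

  // Inner join on the key: keep only keys present on both sides
  def join[K, A, B](left: Map[K, A], right: Map[K, B]): Map[K, (A, B)] =
    left.keySet.intersect(right.keySet).map(k => (k, (left(k), right(k)))).toMap

  def answer(minAge: Int): Map[(String, String), Double] =
    join(avgIfTotalAbove150, mathAndAge(minAge)).map { case (k, (avg, _)) => (k, avg) }
}
```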

Method 2:



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SparkT2 {
  case class Person(classID:Int,name:String,age:Int, sex:String,keMu:String, score:Int)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local")
    // Create the SparkContext used to submit the job
    //map(x => Person(x(0).toInt, x(1), x(2).toInt))
    val sc = new SparkContext(conf)
    val rdd1: RDD[Array[String]] = sc.textFile("D:\\spark/ksdata.txt").map(_.split(" "))

    val rdd2: RDD[(String, String, String, String, String, String)] = rdd1.map(x => (x(0),x(1),x(2),x(3),x(4),x(5)))
    //T1
    /*
      1. How many people took the exam?
      1.1 How many people under 20 took the exam?
      1.2 How many people aged exactly 20 took the exam?
      1.3 How many people over 20 took the exam?
      */
    val rdd3: Long = rdd2.groupBy(_._2).count()
    println(rdd3)
    //T1.1
    val rdd4 = rdd2.filter(_._3.toInt<20).groupBy(_._2).count()
    println(rdd4)
    //T1.2
    val rdd5= rdd2.filter(_._3.toInt == 20).groupBy(_._2).count()
    println(rdd5)
    //T1.3
    val rdd6: RDD[(String, String, String, String, String, String)] = rdd2.filter(_._3.toInt>20)
    rdd6.foreach(println)
    val res6 = rdd6.groupBy(_._2).count()
    println(res6)
    /*
      2. How many boys took the exam?
      2.1 How many girls took the exam?
      */
    val rdd7 = rdd2.filter(_._4.equals("男")).groupBy(_._2).count()
    println(rdd7)
    val rdd8 = rdd2.filter(_._4.equals("女")).groupBy(_._2).count()
    println(rdd8)

    /*
      3. How many people in class 12 took the exam?
      3.1 How many people in class 13 took the exam?
    */
    val rdd9 = rdd2.filter(_._1.toInt == 12).groupBy(_._2).count()
    println(rdd9)
    val rdd10 = rdd2.filter(_._1.toInt == 13).groupBy(_._2).count()
    println(rdd10)


    /*
    4. What is the average Chinese score?
    4.1 What is the average math score?
    4.2 What is the average English score?
    */
    // Int / Long is integer division, so the averages are truncated (350 / 6 == 58)
    val rdd9_2: Long = rdd2.filter(_._5.equals("chinese")).count()
    val rdd9_1: Int = rdd2.filter(_._5.equals("chinese")).map(x => x._6.toInt).reduce(_+_)
    val chineseAvg = rdd9_1/rdd9_2
    println(chineseAvg)
    val rdd10_2: Long = rdd2.filter(_._5.equals("english")).count()
    val rdd10_1: Int = rdd2.filter(_._5.equals("english")).map(x => x._6.toInt).reduce(_+_)
    val englishAvg = rdd10_1/rdd10_2
    println(englishAvg)
    val rdd11_2: Long = rdd2.filter(_._5.equals("math")).count()
    val rdd11_1: Int = rdd2.filter(_._5.equals("math")).map(x => x._6.toInt).reduce(_+_)
    val mathAvg = rdd11_1/rdd11_2
    println(mathAvg)



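//    5. What is each person's average score?
    val rdd12_1: RDD[(String, Iterable[(String, String, String, String, String, String)])] = rdd2.groupBy(_._2)
    // Number of distinct subjects (3), i.e. the number of scores each person has
    val rdd12_2 = rdd2.groupBy(_._5).count()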
    println(rdd12_2)
    val rdd12: RDD[(String, Long)] = rdd12_1.mapValues(x => x.map(s => s._6.toInt).reduce(_+_)/rdd12_2)
    rdd12.foreach(println)



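    /*
    6. What is the average score of class 12?
    6.1 What is the average total score of the boys in class 12?
    6.2 What is the average total score of the girls in class 12?
    6.3 Compute the same figures for class 13.
    */
    //T6
    // rdd13_1: students in class 12 (3); rdd13_2: subjects (3); their product is the number of class-12 score records
    val rdd13_1: Long = rdd2.filter(_._1.toInt==12).groupBy(_._2).count()
    val rdd13_2 = rdd2.groupBy(_._5).count()
    val rdd13_3: RDD[(String, Long)] = rdd2.filter(_._1.toInt==12).groupBy(_._1).mapValues(x => x.map(s => s._6.toInt).reduce(_+_)/(rdd13_1*rdd13_2))
    rdd13_3.foreach(println)
    //T6.1: total score of the class-12 boys divided by the number of boys (average total per boy)
    val r_man_1 = rdd2.filter(_._1.toInt==12).filter(_._4.equals("男")).groupBy(_._2).count()
    val r_man_2: Long = rdd2.filter(_._1.toInt==12).filter(_._4.equals("男")).map(s => s._6.toInt).reduce(_+_)/r_man_1
    println(r_man_2)
    // The remaining cases are analogous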

    /*
    7. What is the highest Chinese score in the whole school?
    7.1 What is the lowest Chinese score in class 12?
    7.2 What is the highest math score in class 13?
    */
    //T7
    val rddt7: Array[(Int, String)] = rdd2.filter(_._5.equals("chinese")).map(x => (x._6.toInt,x._2)).top(1)
    rddt7.foreach(println)

    //T7.1
    val r_chinese12 = rdd2.filter(_._1.toInt==12).filter(_._5.equals("chinese")).map(x => (x._6.toInt,x._2)).takeOrdered(1)
    r_chinese12.foreach(println)

    //T7.2
    val r_math = rdd2.filter(_._1.toInt==13).filter(_._5.equals("math")).map(x => (x._6.toInt,x._2)).top(1)
    r_math.foreach(println)

//    8. How many girls in class 12 have a total score above 150?
    // Sum each girl's scores, keep the totals above 150, then count
    val rddt8: Long = rdd2.filter(x => x._1.toInt==12 && x._4.equals("女")).groupBy(_._2).mapValues(x => x.map(x => x._6.toInt).reduce(_+_)).filter(_._2>150).count()
    println(rddt8)
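//-----------9. What is the average score of the students whose total is above 150, whose math score is at least 70, and who are at least 19 years old? (The question list says 20, but no student qualifies at 20.)--------------

    // Fields: classID, subject, score, sex, name, age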

    val quan = rdd1.map(a => (a(0), a(4), a(5).toInt, a(3), a(1), a(2).toInt))

    // Fields: classID, subject, score, sex, name, age
    // Students whose math score is at least 70 and who are at least 19, tagged with a 1
    val mathsore1 = quan.filter(_._2.equals("math")).filter(_._3>=70).filter(_._6>=19).map(x => (x._5, 1))
    println(mathsore1.collect().toBuffer)
    //ArrayBuffer((王芳,1), (王小芳,1))
    // (name, score) for every record
    val quan1 = quan.map(x => (x._5, x._3))
    // The join keeps only the records of the students selected above
    val joined = quan1.join(mathsore1)
    println(joined.collect().toBuffer)
    //ArrayBuffer((王芳,(70,1)), (王芳,(70,1)), (王芳,(70,1)), (王小芳,(70,1)), (王小芳,(80,1)), (王小芳,(70,1)))

    // Sum scores and counts per student, keep totals above 150, then divide as Double
    val avgScores = joined.reduceByKey((a,b) => (a._1+b._1,a._2+b._2)).filter(_._2._1>150).map(x => {
      val avg = x._2._1.toDouble / x._2._2
      (x._1, "%.2f".format(avg))
    })
    avgScores.foreach(println)
    //(王芳,70.00)
    //(王小芳,73.33)
  }
}

For question 9, Method 2's join-based approach is the simpler of the two.
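
Method 2 identifies a person by name alone (`groupBy(_._2)`). That gives the right counts here because the six names in the data are unique, but two students with the same name in different classes would be merged. A small plain-Scala sketch with a hypothetical second 張三 in class 13 (the records and names are illustrative, not from the data) compares grouping on the name with grouping on (class ID, name):

```scala
object GroupByNameVsKey {
  // (classID, name), one tuple per score record; the class-13 張三 is hypothetical
  val records: List[(String, String)] = List(
    ("12", "張三"), ("12", "張三"), ("12", "李四"),
    ("13", "張三"), ("13", "張三"), ("13", "王小芳"))

  // Grouping on the name merges the two different students called 張三
  val byName: Int = records.groupBy(_._2).size
  // Grouping on (classID, name) keeps them apart
  val byClassAndName: Int = records.groupBy(identity).size
}
```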