Spark RDD exercises (computing student exam scores)
阿新 • Published: 2018-12-20
Given the following data (fields: class ID, name, age, sex, subject, score; one record per line, fields separated by spaces):
12 張三 25 男 chinese 50
12 張三 25 男 math 60
12 張三 25 男 english 70
12 李四 20 男 chinese 50
12 李四 20 男 math 50
12 李四 20 男 english 50
12 王芳 19 女 chinese 70
12 王芳 19 女 math 70
12 王芳 19 女 english 70
13 張大三 25 男 chinese 60
13 張大三 25 男 math 60
13 張大三 25 男 english 70
13 李大四 20 男 chinese 50
13 李大四 20 男 math 60
13 李大四 20 男 english 50
13 王小芳 19 女 chinese 70
13 王小芳 19 女 math 80
13 王小芳 19 女 english 70
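Both solutions below split each line again for every question and pick fields by position (line(4), x._6 and so on). A minimal sketch, assuming the file holds exactly the records above, of parsing each line once into a typed record instead. It uses plain Scala collections in place of an RDD so it runs without Spark; `ScoreData`, `Score` and `parse` are hypothetical names, not part of either solution.

```scala
// Sketch: parse each space-separated record once into a case class instead of
// splitting and indexing the raw line in every query. Plain Scala collections
// stand in for an RDD; ScoreData, Score and parse are hypothetical names.
object ScoreData {
  // One exam result; field order follows the file: classID name age sex subject score.
  final case class Score(classId: Int, name: String, age: Int,
                         sex: String, subject: String, score: Int)

  // Parses one line such as "12 張三 25 男 chinese 50".
  def parse(line: String): Score = {
    val f = line.trim.split("\\s+")
    Score(f(0).toInt, f(1), f(2).toInt, f(3), f(4), f(5).toInt)
  }

  // The 18 sample records.
  val raw: Seq[String] = Seq(
    "12 張三 25 男 chinese 50", "12 張三 25 男 math 60", "12 張三 25 男 english 70",
    "12 李四 20 男 chinese 50", "12 李四 20 男 math 50", "12 李四 20 男 english 50",
    "12 王芳 19 女 chinese 70", "12 王芳 19 女 math 70", "12 王芳 19 女 english 70",
    "13 張大三 25 男 chinese 60", "13 張大三 25 男 math 60", "13 張大三 25 男 english 70",
    "13 李大四 20 男 chinese 50", "13 李大四 20 男 math 60", "13 李大四 20 男 english 50",
    "13 王小芳 19 女 chinese 70", "13 王小芳 19 女 math 80", "13 王小芳 19 女 english 70"
  )

  val scores: Seq[Score] = raw.map(parse)

  // A student is identified by (class, name); each student has three rows.
  val studentCount: Int = scores.map(s => (s.classId, s.name)).distinct.size

  def main(args: Array[String]): Unit =
    println(s"${scores.size} records, $studentCount students") // prints 18 records, 6 students
}
```

With Spark, the same function would presumably be applied as sc.textFile("file:///jar/score").map(ScoreData.parse), after which a filter reads _.subject == "chinese" rather than _.split(",")(0) == "chinese".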
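Requirements:
1. How many people took the exam?
1.1 How many people under 20 took the exam?
1.2 How many people aged exactly 20 took the exam?
1.3 How many people over 20 took the exam?
2. How many boys took the exam?
2.1 How many girls took the exam?
3. How many people in class 12 took the exam?
3.1 How many people in class 13 took the exam?
4. What is the average score in Chinese?
4.1 What is the average score in math?
4.2 What is the average score in English?
5. What is each student's average score?
6. What is the average score of class 12?
6.1 What is the average total score of the boys in class 12?
6.2 What is the average total score of the girls in class 12?
6.3 Compute the same figures for class 13.
7. What is the highest Chinese score in the whole school?
7.1 What is the lowest Chinese score in class 12?
7.2 What is the highest math score in class 13?
8. How many girls in class 12 have a total score above 150?
9. What is the average score of the students whose total score is above 150, whose math score is at least 70, and who are at least 20 years old?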
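Questions 1 to 3 count students, not rows: each student appears in three rows, one per subject, so the rows have to be collapsed to one entry per student before counting. Both solutions below do this, Method 1 with distinct and Method 2 with groupBy on the name. A minimal sketch of the same idea, using plain Scala collections in place of an RDD and assuming that class plus name identifies a student; the object name `StudentCounts` is hypothetical.

```scala
// Sketch: answer questions 1 to 3 by counting distinct students.
// Plain Scala collections stand in for an RDD. Note that RDD.count takes no
// predicate, so on an RDD each count below would be written .filter(p).count().
object StudentCounts {
  val raw: Seq[String] = Seq(
    "12 張三 25 男 chinese 50", "12 張三 25 男 math 60", "12 張三 25 男 english 70",
    "12 李四 20 男 chinese 50", "12 李四 20 男 math 50", "12 李四 20 男 english 50",
    "12 王芳 19 女 chinese 70", "12 王芳 19 女 math 70", "12 王芳 19 女 english 70",
    "13 張大三 25 男 chinese 60", "13 張大三 25 男 math 60", "13 張大三 25 男 english 70",
    "13 李大四 20 男 chinese 50", "13 李大四 20 男 math 60", "13 李大四 20 男 english 50",
    "13 王小芳 19 女 chinese 70", "13 王小芳 19 女 math 80", "13 王小芳 19 女 english 70"
  )

  // (classId, name, age, sex) per row; distinct collapses each student's
  // three subject rows into a single tuple.
  val students: Seq[(Int, String, Int, String)] = raw.map { line =>
    val f = line.split(" ")
    (f(0).toInt, f(1), f(2).toInt, f(3))
  }.distinct

  val total: Int     = students.size                // question 1
  val under20: Int   = students.count(_._3 < 20)    // 1.1
  val exactly20: Int = students.count(_._3 == 20)   // 1.2
  val over20: Int    = students.count(_._3 > 20)    // 1.3
  val boys: Int      = students.count(_._4 == "男") // 2
  val girls: Int     = students.count(_._4 == "女") // 2.1
  val class12: Int   = students.count(_._1 == 12)   // 3
  val class13: Int   = students.count(_._1 == 13)   // 3.1

  def main(args: Array[String]): Unit =
    println(Seq(total, under20, exactly20, over20, boys, girls, class12, class13).mkString(" ")) // prints 6 2 2 2 4 2 3 3
}
```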
Below are two people's solutions. Some of the answers overlap; the point is to compare the approaches.
Method 1 (statements for spark-shell, where sc is predefined):
Requirements and their solutions:
1. How many people took the exam?
val file = sc.textFile("file:///jar/score")
val name = file.map(x => {val line = x.split(" "); line(0) + "," + line(1)})
val numPeo = name.distinct.count()
1.1 How many people under 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" "); line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt < 20).count()
1.2 How many people aged exactly 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" "); line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt == 20).count()
1.3 How many people over 20 took the exam?
val file = sc.textFile("file:///jar/score")
val age = file.map(x => {val line = x.split(" "); line(0) + "," + line(1) + "," + line(2)})
val numPeo = age.distinct.filter(_.split(",")(2).toInt > 20).count()
2. How many boys took the exam?
val file = sc.textFile("file:///jar/score")
val sex = file.map(x => {val line = x.split(" "); line(0) + "," + line(1) + "," + line(3)})
val numPeo = sex.distinct.filter(_.split(",")(2) == "男").count()
2.1 How many girls took the exam?
val file = sc.textFile("file:///jar/score")
val sex = file.map(x => {val line = x.split(" "); line(0) + "," + line(1) + "," + line(3)})
val numPeo = sex.distinct.filter(_.split(",")(2) == "女").count()
3. How many people in class 12 took the exam?
val file = sc.textFile("file:///jar/score")
val classNum = file.map(x => {val line = x.split(" "); line(0) + "," + line(1)})
val numPeo = classNum.distinct.filter(_.split(",")(0).toInt == 12).count()
sc.makeRDD(Array(numPeo)).saveAsTextFile("file:///jar/result/class12numPeo")
3.1 How many people in class 13 took the exam?
val file = sc.textFile("file:///jar/score")
val classNum = file.map(x => {val line = x.split(" "); line(0) + "," + line(1)})
val numPeo = classNum.distinct.filter(_.split(",")(0).toInt == 13).count()
sc.makeRDD(Array(numPeo)).saveAsTextFile("file:///jar/result/class13numPeo")
4. What is the average score in Chinese?
val chineseLine = file.map(x => {val line = x.split(" "); line(4) + "," + line(5)})
val chineseGennal = chineseLine.filter(_.split(",")(0) == "chinese")
val chineseLength = chineseGennal.count.toInt //6
val chineseSum = chineseGennal.map(_.split(",")(1).toInt).reduce(_ + _) //350
val chineseAvg = chineseSum / chineseLength //58 (integer division; 350 / 6 = 58.33)
sc.makeRDD(Array(chineseAvg)).saveAsTextFile("file:///jar/result/chineseAvg")
4.1 What is the average score in math?
val mathLine = file.map(x => {val line = x.split(" "); line(4) + "," + line(5)})
val mathGennal = mathLine.filter(_.split(",")(0) == "math")
val mathLength = mathGennal.count.toInt
val mathSum = mathGennal.map(_.split(",")(1).toInt).reduce(_ + _)
val mathAvg = mathSum / mathLength
sc.makeRDD(Array(mathAvg)).saveAsTextFile("file:///jar/result/mathAvg")
4.2 What is the average score in English?
val englishLine = file.map(x => {val line = x.split(" "); line(4) + "," + line(5)})
val englishGennal = englishLine.filter(_.split(",")(0) == "english")
val englishLength = englishGennal.count.toInt
val englishSum = englishGennal.map(_.split(",")(1).toInt).reduce(_ + _)
val englishAvg = englishSum / englishLength
sc.makeRDD(Array(englishAvg)).saveAsTextFile("file:///jar/result/englishAvg")
5. What is each student's average score?
val scoreLine = file.map(x => {val line = x.split(" "); (line(0) + "," + line(1), line(5).toInt)})
scoreLine.map(a => (a._1, (a._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(y => (y._1, y._2._1 / y._2._2)).saveAsTextFile("file:///jar/result/perScore")
6. What is the average score of class 12?
val classScore12 = file.map(x => {val line = x.split(" "); (line(0), line(5).toInt)}).filter(a => a._1 == "12")
classScore12.map(a => (a._1, (a._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(y => (y._1, y._2._1 / y._2._2)).saveAsTextFile("file:///jar/result/perClass12") //(12,60)
6.1 What is the average total score of the boys in class 12?
val BoyclassScore12 = file.map(x => {val line = x.split(" "); line(0) + "," + line(3) + "," + line(5)}).filter(_.split(",")(0) == "12").filter(_.split(",")(1) == "男")
val BoyclassScore12Num = BoyclassScore12.count //6
val BoyclassScore12Sum = BoyclassScore12.map(y => y.split(",")(2).toInt).reduce(_ + _) //330
val BoyperClass12 = BoyclassScore12Sum / BoyclassScore12Num //55, the average per exam row; dividing by the 2 boys instead gives the average total, 165
6.2 What is the average total score of the girls in class 12?
val GirlclassScore12 = file.map(x => {val line = x.split(" "); line(0) + "," + line(3) + "," + line(5)}).filter(_.split(",")(0) == "12").filter(_.split(",")(1) == "女")
val GirlclassScore12Num = GirlclassScore12.count //3
val GirlclassScore12Sum = GirlclassScore12.map(y => y.split(",")(2).toInt).reduce(_ + _) //210
val GirlperClass12 = GirlclassScore12Sum / GirlclassScore12Num //70
6.3.0 What is the average score of class 13?
val classScore13 = file.map(x => {val line = x.split(" "); (line(0), line(5).toInt)}).filter(a => a._1 == "13")
val perClass13 = classScore13.map(a => (a._1, (a._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).map(y => (y._1, y._2._1 / y._2._2)) //(13,63)
6.3.1 What is the average total score of the boys in class 13?
val BoyclassScore13 = file.map(x => {val line = x.split(" "); line(0) + "," + line(3) + "," + line(5)}).filter(_.split(",")(0) == "13").filter(_.split(",")(1) == "男")
val BoyclassScore13Num = BoyclassScore13.count //6
val BoyclassScore13Sum = BoyclassScore13.map(y => y.split(",")(2).toInt).reduce(_ + _) //350
val BoyperClass13 = BoyclassScore13Sum / BoyclassScore13Num //58
6.3.2 What is the average total score of the girls in class 13?
val GirlclassScore13 = file.map(x => {val line = x.split(" "); line(0) + "," + line(3) + "," + line(5)}).filter(_.split(",")(0) == "13").filter(_.split(",")(1) == "女")
val GirlclassScore13Num = GirlclassScore13.count //3
val GirlclassScore13Sum = GirlclassScore13.map(y => y.split(",")(2).toInt).reduce(_ + _) //220
val GirlperClass13 = GirlclassScore13Sum / GirlclassScore13Num //73
7. What is the highest Chinese score in the whole school?
val chineseLine = file.map(x => {val line = x.split(" "); line(4) + "," + line(5)})
val chineseMax = chineseLine.distinct.filter(_.split(",")(0) == "chinese").max
7.1 What is the lowest Chinese score in class 12?
val chineseLine12 = file.map(x => {val line = x.split(" "); line(0) + "," + line(4) + "," + line(5)})
val chineseMin12 = chineseLine12.distinct.filter(_.split(",")(0).toInt == 12).filter(_.split(",")(1) == "chinese").min
sc.makeRDD(Array(chineseMin12)).saveAsTextFile("file:///jar/result/chineseMin12")
Alternatively, keep the score as an Int (run this instead of the line above, because saveAsTextFile fails if the output directory already exists):
val chineseScore12 = file.map(x => {val line = x.split(" "); (line(0), line(4), line(5).toInt)})
sc.makeRDD(Array(chineseScore12.filter(a => a._1 == "12" && a._2 == "chinese").map(_._3).min)).saveAsTextFile("file:///jar/result/chineseMin12")
7.2 What is the highest math score in class 13?
val mathLine13 = file.map(x => {val line = x.split(" "); line(0) + "," + line(4) + "," + line(5)})
val mathMax13 = mathLine13.distinct.filter(_.split(",")(0).toInt == 13).filter(_.split(",")(1) == "math").max //mathMax13: String = 13,math,80
Alternatively, with the score kept as an Int:
val mathLine13 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(4), line(5).toInt)})
val mathMax13 = mathLine13.filter(a => a._1.split(",")(1).equals("math") && a._1.split(",")(0).equals("13")).max //mathMax13: (String, Int) = (13,math,80)
8. How many girls in class 12 have a total score above 150?
val sumScore12Line = file.map(x => {val line = x.split(" "); (line(0) + "," + line(1) + "," + line(3), line(5).toInt)})
val sumScore12Dayu150 = sumScore12Line.reduceByKey(_ + _).filter(a => a._2 > 150 && a._1.split(",")(0).equals("12") && a._1.split(",")(2).equals("女")).count //1
9. What is the average score of the students whose total score is above 150, whose math score is at least 70, and who are at least 19 years old? (The requirement list says 20, but then no student qualifies: the only two students with a math score of at least 70 are both 19.)
val complex1 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(1) + "," + line(3), line(5).toInt)})
val complex2 = file.map(x => {val line = x.split(" "); (line(0) + "," + line(1) + "," + line(3) + "," + line(4) + "," + line(2), line(5).toInt)})
//Keep the students whose total score is above 150 and compute their average score
val com1 = complex1.map(a => (a._1, (a._2, 1))).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).filter(a => a._2._1 > 150).map(t => (t._1, t._2._1 / t._2._2))
//Keep the students whose math score is at least 70 and who are at least 19 years old
val com2 = complex2.filter(a => {val line = a._1.split(","); line(3).equals("math") && line(4).toInt >= 19 && a._2 >= 70}).map(a => {val line2 = a._1.split(","); (line2(0) + "," + line2(1) + "," + line2(2), a._2)})
com1.join(com2).map(a => (a._1, a._2._1)).collect()
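Method 1 answers question 7 by calling max/min on strings such as "chinese,70", which compares text rather than numbers. That gives the right answer here only because every score has two digits; a score of 100 would sort below 70. A minimal sketch, with plain Scala collections standing in for the RDD, of keeping the score as an Int before taking the extreme. The object name `SubjectExtremes` is hypothetical, and the 100-point value in the last comparison is a made-up example, not part of the data set.

```scala
// Sketch: questions 7 to 7.2 with the score kept as an Int, so max/min compare
// numbers. Plain Scala collections stand in for an RDD; names are hypothetical.
object SubjectExtremes {
  val raw: Seq[String] = Seq(
    "12 張三 25 男 chinese 50", "12 張三 25 男 math 60", "12 張三 25 男 english 70",
    "12 李四 20 男 chinese 50", "12 李四 20 男 math 50", "12 李四 20 男 english 50",
    "12 王芳 19 女 chinese 70", "12 王芳 19 女 math 70", "12 王芳 19 女 english 70",
    "13 張大三 25 男 chinese 60", "13 張大三 25 男 math 60", "13 張大三 25 男 english 70",
    "13 李大四 20 男 chinese 50", "13 李大四 20 男 math 60", "13 李大四 20 男 english 50",
    "13 王小芳 19 女 chinese 70", "13 王小芳 19 女 math 80", "13 王小芳 19 女 english 70"
  )

  // (classId, subject, score) per row.
  val rows: Seq[(Int, String, Int)] = raw.map { line =>
    val f = line.split(" ")
    (f(0).toInt, f(4), f(5).toInt)
  }

  // 7: highest Chinese score in the whole school.
  val chineseMax: Int = rows.filter(_._2 == "chinese").map(_._3).max
  // 7.1: lowest Chinese score in class 12.
  val chineseMin12: Int = rows.filter(r => r._1 == 12 && r._2 == "chinese").map(_._3).min
  // 7.2: highest math score in class 13.
  val mathMax13: Int = rows.filter(r => r._1 == 13 && r._2 == "math").map(_._3).max

  // The pitfall: comparing "subject,score" strings is lexicographic, so with a
  // hypothetical score of 100 the string max is still "chinese,70" ('7' > '1').
  val sample: Seq[String] = Seq("chinese,70", "chinese,100")
  val stringMax: String = sample.max
  val numericMax: Int = sample.map(_.split(",")(1).toInt).max

  def main(args: Array[String]): Unit =
    println(s"$chineseMax $chineseMin12 $mathMax13 $stringMax $numericMax") // prints 70 50 80 chinese,70 100
}
```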
Method 2:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
object SparkT2 {
case class Person(classID:Int,name:String,age:Int, sex:String,keMu:String, score:Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local")
//Create the SparkContext, through which jobs are submitted
//map(x => Person(x(0).toInt, x(1), x(2).toInt))
val sc = new SparkContext(conf)
val rdd1: RDD[Array[String]] = sc.textFile("D:\\spark/ksdata.txt").map(_.split(" "))
val rdd2: RDD[(String, String, String, String, String, String)] = rdd1.map(x => (x(0),x(1),x(2),x(3),x(4),x(5)))
//T1
/*
1. How many people took the exam?
1.1 How many people under 20 took the exam?
1.2 How many people aged exactly 20 took the exam?
1.3 How many people over 20 took the exam?
*/
val rdd3: Long = rdd2.groupBy(_._2).count()
println(rdd3)
//T1.1
val rdd4 = rdd2.filter(_._3.toInt<20).groupBy(_._2).count()
println(rdd4)
//T1.2
val rdd5 = rdd2.filter(_._3.toInt == 20).groupBy(_._2).count()
println(rdd5)
//T1.3
val rdd6: RDD[(String, String, String, String, String, String)] = rdd2.filter(_._3.toInt>20)
rdd6.foreach(println)
val res6 = rdd6.groupBy(_._2).count()
println(res6)
/*
2. How many boys took the exam?
2.1 How many girls took the exam?
*/
val rdd7 = rdd2.filter(_._4.equals("男")).groupBy(_._2).count()
println(rdd7)
val rdd8 = rdd2.filter(_._4.equals("女")).groupBy(_._2).count()
println(rdd8)
/*
3. How many people in class 12 took the exam?
3.1 How many people in class 13 took the exam?
*/
val rdd9 = rdd2.filter(_._1.toInt == 12).groupBy(_._2).count()
println(rdd9)
val rdd10 = rdd2.filter(_._1.toInt == 13).groupBy(_._2).count()
println(rdd10)
/*
4. What is the average score in Chinese?
4.1 What is the average score in math?
4.2 What is the average score in English?
*/
//T4: Chinese (sum / count; Int / Long is integer division)
val rdd9_2: Long = rdd2.filter(_._5.equals("chinese")).count()
val rdd9_1: Int = rdd2.filter(_._5.equals("chinese")).map(x => x._6.toInt).reduce(_+_)
val rdd9_avg = rdd9_1/rdd9_2
println(rdd9_avg)
//T4.2: English
val rdd10_2: Long = rdd2.filter(_._5.equals("english")).count()
val rdd10_1: Int = rdd2.filter(_._5.equals("english")).map(x => x._6.toInt).reduce(_+_)
val rdd10_avg = rdd10_1/rdd10_2
println(rdd10_avg)
//T4.1: math
val rdd11_2: Long = rdd2.filter(_._5.equals("math")).count()
val rdd11_1: Int = rdd2.filter(_._5.equals("math")).map(x => x._6.toInt).reduce(_+_)
val rdd11 = rdd11_1/rdd11_2
println(rdd11)
// 5. What is each student's average score?
val rdd12_1: RDD[(String, Iterable[(String, String, String, String, String, String)])] = rdd2.groupBy(_._2)
//Number of subjects (3); each student has one score per subject
val rdd12_2 = rdd2.groupBy(_._5).count()
println(rdd12_2)
val rdd12: RDD[(String, Long)] = rdd12_1.mapValues(x => x.map(s => s._6.toInt).reduce(_+_)/rdd12_2)
rdd12.foreach(println)
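/*
6. What is the average score of class 12?
6.1 What is the average total score of the boys in class 12?
6.2 What is the average total score of the girls in class 12?
6.3 Compute the same figures for class 13.
*/
//T6: sum of class 12's scores / (number of students * number of subjects)
val rdd13_1: Long = rdd2.filter(_._1.toInt==12).groupBy(_._2).count()
val rdd13_2 = rdd2.groupBy(_._5).count()
val rdd13_3: RDD[(String, Long)] = rdd2.filter(_._1.toInt==12).groupBy(_._1).mapValues(x => x.map(s => s._6.toInt).reduce(_+_)/(rdd13_1*rdd13_2))
rdd13_3.foreach(println)
//T6.1: total score of the boys in class 12 / number of boys
val r_man_1 = rdd2.filter(_._1.toInt==12).filter(_._4.equals("男")).groupBy(_._2).count()
val r_man_2: Long = rdd2.filter(_._1.toInt==12).filter(_._4.equals("男")).map(s => s._6.toInt).reduce(_+_)/r_man_1
println(r_man_2)
//6.2 and the class 13 figures follow the same pattern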
/*
7. What is the highest Chinese score in the whole school?
7.1 What is the lowest Chinese score in class 12?
7.2 What is the highest math score in class 13?
*/
//T7: top(1) on (score, name) pairs returns the highest score
val rddt7: Array[(Int, String)] = rdd2.filter(_._5.equals("chinese")).map(x => (x._6.toInt,x._2)).top(1)
rddt7.foreach(println)
//T7.1: takeOrdered(1) returns the lowest
val rddt7_1 = rdd2.filter(_._1.toInt==12).filter(_._5.equals("chinese")).map(x => (x._6.toInt,x._2)).takeOrdered(1)
rddt7_1.foreach(println)
//T7.2
val r_math = rdd2.filter(_._1.toInt==13).filter(_._5.equals("math")).map(x => (x._6.toInt,x._2)).top(1)
r_math.foreach(println)
// 8. How many girls in class 12 have a total score above 150?
val rddt8: Long = rdd2.filter(x => x._1.toInt==12 && x._4.equals("女")).groupBy(_._2).mapValues(x => x.map(_._6.toInt).reduce(_+_)).filter(_._2>150).count()
println(rddt8)
//9. What is the average score of the students whose total score is above 150, whose math score is at least 70 and who are at least 19 years old?
//(The requirement list says 20; with that threshold no student qualifies.)
//Tuple fields: class, subject, score, sex, name, age
val quan = rdd1.map(a => (a(0),a(4),a(5).toInt,a(3),a(1),a(2).toInt))
//Students with math >= 70 and age >= 19, as (name, 1)
val mathsore1 = quan.filter(_._2.equals("math")).filter(_._3>=70).filter(_._6>=19).map(x => (x._5, 1))
println(mathsore1.collect().toBuffer)
//ArrayBuffer((王芳,1), (王小芳,1))
//Every score as (name, score)
val quan1 = quan.map(x => (x._5, x._3))
//Join on name: each score of a qualifying student becomes (name, (score, 1))
val joined = quan1.join(mathsore1)
println(joined.collect().toBuffer)
//ArrayBuffer((王芳,(70,1)), (王芳,(70,1)), (王芳,(70,1)), (王小芳,(70,1)), (王小芳,(80,1)), (王小芳,(70,1)))
//Sum (score, count) per student, keep totals above 150, then divide as Double
val avgScores = joined.reduceByKey((a,b) => (a._1+b._1,a._2+b._2)).filter(_._2._1>150).map(x => {
val avg = x._2._1.toDouble / x._2._2
(x._1, "%.2f".format(avg))
})
avgScores.foreach(println)
//(王芳,70.00)
//(王小芳,73.33)
}
}
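Both methods divide an integer sum by an integer count, so fractions are dropped: the Chinese average comes out as 58 although 350 / 6 is 58.33. A minimal sketch of the (sum, count) pattern that Method 1 applies with reduceByKey in question 5, written with plain Scala collections in place of an RDD and finishing with a floating-point division. `Averages` and `averageBy` are hypothetical names, and keying students by name assumes names are unique, which holds for this data. On an RDD, the grouping and folding step would correspond to mapValues(s => (s, 1)).reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2)).

```scala
// Sketch: averages per subject (question 4) and per student (question 5) using
// (sum, count) accumulation and Double division. Plain Scala collections stand
// in for an RDD; all names are hypothetical.
object Averages {
  val raw: Seq[String] = Seq(
    "12 張三 25 男 chinese 50", "12 張三 25 男 math 60", "12 張三 25 男 english 70",
    "12 李四 20 男 chinese 50", "12 李四 20 男 math 50", "12 李四 20 男 english 50",
    "12 王芳 19 女 chinese 70", "12 王芳 19 女 math 70", "12 王芳 19 女 english 70",
    "13 張大三 25 男 chinese 60", "13 張大三 25 男 math 60", "13 張大三 25 男 english 70",
    "13 李大四 20 男 chinese 50", "13 李大四 20 男 math 60", "13 李大四 20 男 english 50",
    "13 王小芳 19 女 chinese 70", "13 王小芳 19 女 math 80", "13 王小芳 19 女 english 70"
  )

  // (name, subject, score) per row.
  val rows: Seq[(String, String, Int)] = raw.map { line =>
    val f = line.split(" ")
    (f(1), f(4), f(5).toInt)
  }

  // Groups by key, folds each group into (sum, count) like reduceByKey on
  // (score, 1) pairs, then divides as Double so the fractional part is kept.
  def averageBy[K](pairs: Seq[(K, Int)]): Map[K, Double] =
    pairs.groupBy(_._1).map { case (key, group) =>
      val (sum, count) = group.foldLeft((0, 0))((acc, p) => (acc._1 + p._2, acc._2 + 1))
      (key, sum.toDouble / count)
    }

  // Question 4: average per subject.
  val bySubject: Map[String, Double] = averageBy(rows.map(r => (r._2, r._3)))
  // Question 5: average per student.
  val byStudent: Map[String, Double] = averageBy(rows.map(r => (r._1, r._3)))

  def main(args: Array[String]): Unit = {
    for ((subject, avg) <- bySubject.toSeq.sortBy(_._1)) println(f"$subject%s $avg%.2f")
    for ((name, avg) <- byStudent.toSeq.sortBy(_._1)) println(f"$name%s $avg%.2f")
  }
}
```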
For question 9, Method 2's join-based approach is the simpler of the two.
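The join idea can also be sketched without Spark: build the set of students who meet the math and age conditions, keep every score row whose name is in that set (which is what an inner join on name followed by dropping the right-hand value keeps), then total and average per student. Plain Scala collections have no join method, so the sketch filters by the key set instead. Assumptions: names are unique (true for this data), the age threshold is 19 as in both solutions, the average is computed as a Double so 王小芳 comes out as 73.33 rather than a truncated 73, and the object name `Question9` is hypothetical.

```scala
// Sketch: question 9 as "filter by key set" (the effect of Method 2's
// quan1.join(mathsore1)), using plain Scala collections instead of RDDs.
object Question9 {
  val raw: Seq[String] = Seq(
    "12 張三 25 男 chinese 50", "12 張三 25 男 math 60", "12 張三 25 男 english 70",
    "12 李四 20 男 chinese 50", "12 李四 20 男 math 50", "12 李四 20 男 english 50",
    "12 王芳 19 女 chinese 70", "12 王芳 19 女 math 70", "12 王芳 19 女 english 70",
    "13 張大三 25 男 chinese 60", "13 張大三 25 男 math 60", "13 張大三 25 男 english 70",
    "13 李大四 20 男 chinese 50", "13 李大四 20 男 math 60", "13 李大四 20 男 english 50",
    "13 王小芳 19 女 chinese 70", "13 王小芳 19 女 math 80", "13 王小芳 19 女 english 70"
  )

  // (name, age, subject, score) per row.
  val rows: Seq[(String, Int, String, Int)] = raw.map { line =>
    val f = line.split(" ")
    (f(1), f(2).toInt, f(4), f(5).toInt)
  }

  // Right-hand side of the join: names with math >= 70 and age >= 19.
  val qualifying: Set[String] =
    rows.filter(r => r._3 == "math" && r._4 >= 70 && r._2 >= 19).map(_._1).toSet

  // Left-hand side restricted to qualifying names: every (name, score) row of
  // a qualifying student survives, as in an inner join on name.
  val joined: Seq[(String, Int)] =
    rows.map(r => (r._1, r._4)).filter { case (name, _) => qualifying.contains(name) }

  // Per student (total, count), keep totals above 150, then average as a Double.
  val result: Map[String, Double] = joined
    .groupBy(_._1)
    .map { case (name, pairs) => (name, (pairs.map(_._2).sum, pairs.size)) }
    .filter { case (_, (total, _)) => total > 150 }
    .map { case (name, (total, n)) => (name, total.toDouble / n) }

  def main(args: Array[String]): Unit =
    result.toSeq.sortBy(_._1).foreach { case (name, avg) => println(f"$name%s $avg%.2f") }
}
```

Changing the age condition to r._2 >= 20, as the requirement list states, would leave `qualifying` empty and so produce no result.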