Spark Scala 幾個常用的示例
阿新 • 發佈:2018-12-25
SparkWordCount 類原始碼 (standalone 模式)

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/** Counts word occurrences in the input text file and saves the result
  * to a timestamped output directory.
  *
  * Usage: SparkWordCount <inputFile>
  */
object SparkWordCount {
  // Prefix for the output directory; a timestamp is appended per run so
  // repeated runs do not collide on an existing output path.
  def FILE_NAME: String = "word_count_results_"

  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:SparkWordCount FileName")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(args(0))
    // Classic word count: split on spaces, emit (word, 1), sum per key.
    val wordCounts = textFile
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    wordCounts.saveAsTextFile(FILE_NAME + System.currentTimeMillis())
    println("Word Count program running results are successfully saved.")
  }
}

--------
./spark-submit \
 --class com.ibm.spark.exercise.basic.SparkWordCount \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/*.txt

求平均值

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Computes the average age from a data file.
  * NOTE(review): assumes each line is whitespace-separated with the age
  * as the second field — confirm against the sample data file.
  *
  * Usage: AvgAgeCalculator <dataFile>
  */
object AvgAgeCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:AvgAgeCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    val count = dataFile.count()
    // Second whitespace-separated field is the age.
    val ageData = dataFile.map(line => line.split(" ")(1))
    // FIX: sum on the RDD itself instead of the original
    // collect().reduce(...), which shipped every age to the driver first —
    // that defeats distributed aggregation and can OOM the driver on
    // large inputs. age.trim.toInt is the idiomatic Scala equivalent of
    // Integer.parseInt(String.valueOf(age)).
    val totalAge = ageData.map(age => age.trim.toInt).reduce((a, b) => a + b)
    println("Total Age:" + totalAge + ";Number of People:" + count)
    val avgAge: Double = totalAge.toDouble / count.toDouble
    println("Average Age is " + avgAge)
  }
}

--------------------------
./spark-submit \
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt

求男性/女性 最高 最低身高
-----------------------

// FIX: the original example omitted these imports (the other examples in
// this article all have them); added for consistency so the snippet compiles.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Reports count, minimum and maximum height per gender.
  * NOTE(review): assumes lines look like "<id> <M|F> <height>" — confirm
  * against the sample data file.
  *
  * Usage: PeopleInfoCalculator <dataFile>
  */
object PeopleInfoCalculator {
  def main(args: Array[String]) {
    if (args.length < 1) {
      println("Usage:PeopleInfoCalculator datafile")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
    val sc = new SparkContext(conf)
    val dataFile = sc.textFile(args(0), 5)
    // NOTE(review): contains("M")/contains("F") assumes the gender letter
    // appears nowhere else on the line — kept as-is to preserve behavior.
    val maleData = dataFile.filter(line => line.contains("M")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val femaleData = dataFile.filter(line => line.contains("F")).map(
      line => (line.split(" ")(1) + " " + line.split(" ")(2)))
    val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
    val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
    // FIX: use RDD.min()/RDD.max() instead of the original four full
    // sortBy(...).first() passes — same values, no sort shuffle per statistic.
    val lowestMale = maleHeightData.min()
    val lowestFemale = femaleHeightData.min()
    val highestMale = maleHeightData.max()
    val highestFemale = femaleHeightData.max()
    // FIX: corrected misspelled output "Peole" -> "People".
    println("Number of Male People:" + maleData.count())
    println("Number of Female People:" + femaleData.count())
    println("Lowest Male:" + lowestMale)
    println("Lowest Female:" + lowestFemale)
    println("Highest Male:" + highestMale)
    println("Highest Female:" + highestFemale)
  }
}

./spark-submit \
 --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 3g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt

每行資料出現的次數最高的
=============

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/** Prints the K most frequent lines of the input file, case-insensitively,
  * as (line, count) pairs in descending count order.
  *
  * Usage: TopKSearchKeyWords <keyWordsFile> <K>
  */
object TopKSearchKeyWords {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage:TopKSearchKeyWords KeyWordsFile K")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
    val sc = new SparkContext(conf)
    val srcData = sc.textFile(args(0))
    // Count occurrences of each (lower-cased) whole line.
    val countedData = srcData
      .map(line => (line.toLowerCase(), 1))
      .reduceByKey((a, b) => a + b)
    // Swap to (count, line) so sortByKey orders by frequency, descending.
    val sortedData = countedData.map { case (k, v) => (v, k) }.sortByKey(false)
    // Take the top K and swap back to (line, count) for printing.
    val topKData = sortedData.take(args(1).toInt).map { case (v, k) => (k, v) }
    topKData.foreach(println)
  }
}

./spark-submit \
 --class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
 --master spark://hadoop036166:7077 \
 --num-executors 3 \
 --driver-memory 6g \
 --executor-memory 2g \
 --executor-cores 2 \
 /home/fams/sparkexercise.jar \
 hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt