求該成績表當中出現相同分數的分數,還有次數,以及該分數的人數
阿新 • • 發佈:2018-12-03
/** * 求該成績表當中出現相同分數的分數,還有次數,以及該分數的人數 * @author Administrator * */ public class HomeWork03 { //key---科目+分數 Text value----人名 Text static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{ Text mk=new Text(); Text mv=new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { //獲取每一行內容 進行切分 String[] datas = value.toString().split(","); if(datas.length==3){ //key value封裝 mk.set(datas[0]+"\t"+datas[2]); mv.set(datas[1]); context.write(mk, mv); } } } //相同科目 相同分數的 人數 都是誰 key:相同科目 相同分數的 value:人數 都是誰 static class MyReducer extends Reducer<Text, Text, Text, Text>{ Text rv=new Text(); @Override //key==computer 85 values<huangzitao,liujialing,huangxiaoming> protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { //迴圈遍歷values 注意:values只能迴圈遍歷一次 指標的操作 //計數變數 int count=0; StringBuffer sb=new StringBuffer(); for(Text v:values){ count++; sb.append(v.toString()).append(","); } //傳送 //huangzitao,liujialing,huangxiaoming, rv.set(count+"\t"+sb.substring(0, sb.length()-1)); context.write(key, rv); } } //main public static void main(String[] args) { System.setProperty("HADOOP_USER_NAME", "hadoop"); //將mapper reducer類進行一個封裝 封裝為一個任務----job(作業) //載入配置檔案 Configuration conf=new Configuration(); //啟動一個Job 建立一個job物件 try { Job job=Job.getInstance(conf); //設定這個job //設定整個job的主函式入口 job.setJarByClass(HomeWork03.class); //設定job的mappper的類 job.setMapperClass(MyMapper.class); //設定job的reducer的類 job.setReducerClass(MyReducer.class); //設定map輸出key value的型別 //指定了泛型 這裡為什麼還要設定一次 泛型的作用範圍 編譯的時候生效 執行的時候泛型會自動擦除 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //設定reduce的輸出的k v型別 以下方法設定的是mr的最終輸出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //指定需要統計的檔案的輸入路徑 FileInputFormat 檔案輸入類 Path inpath=new Path("hdfs://master:9000/stuout_01"); FileInputFormat.addInputPath(job, inpath); //指定輸出目錄 輸出路徑不能存在的 否則會報錯 預設輸出是覆蓋式的輸出 如果輸出目錄存在 有可能造成原始資料的丟失 Path outpath=new Path("hdfs://master:9000/stuout_03"); FileOutputFormat.setOutputPath(job, outpath); //提交job 執行這一句的時候 job才會提交 上面做的一系列的工作 都是設定job //job.submit(); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } } }