1. 程式人生 > >求該成績表當中出現相同分數的分數,還有次數,以及該分數的人數

求該成績表當中出現相同分數的分數,還有次數,以及該分數的人數

/**
 * 求該成績表當中出現相同分數的分數,還有次數,以及該分數的人數
 * @author Administrator
 *
 */
public class HomeWork03 {

	//key---科目+分數   Text   value----人名    Text
	static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
		Text mk=new Text();
		Text mv=new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			//獲取每一行內容  進行切分
			String[] datas = value.toString().split(",");
			if(datas.length==3){
				//key   value封裝
				mk.set(datas[0]+"\t"+datas[2]);
				mv.set(datas[1]);
				context.write(mk, mv);
			}
			
		}
	}
	//相同科目  相同分數的  人數     都是誰     key:相同科目  相同分數的    value:人數     都是誰 
	static class MyReducer extends Reducer<Text, Text, Text, Text>{
		Text rv=new Text();
		@Override
		//key==computer	85	values<huangzitao,liujialing,huangxiaoming>
		protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			//迴圈遍歷values   注意:values只能迴圈遍歷一次  指標的操作   
			//計數變數
			int count=0;
			StringBuffer sb=new StringBuffer();
			for(Text v:values){
				count++;
				sb.append(v.toString()).append(",");
			}
			//傳送
			//huangzitao,liujialing,huangxiaoming,
			rv.set(count+"\t"+sb.substring(0, sb.length()-1));
			context.write(key, rv);
		}
	}
	
	//main
	public static void main(String[] args) {
		System.setProperty("HADOOP_USER_NAME", "hadoop");
		//將mapper  reducer類進行一個封裝  封裝為一個任務----job(作業)
		//載入配置檔案
		Configuration conf=new Configuration();
		//啟動一個Job  建立一個job物件
		try {
			Job job=Job.getInstance(conf);
			//設定這個job
			//設定整個job的主函式入口
			job.setJarByClass(HomeWork03.class);
			
			//設定job的mappper的類
			job.setMapperClass(MyMapper.class);
			
			//設定job的reducer的類
			job.setReducerClass(MyReducer.class);
			
			
			//設定map輸出key   value的型別
			//指定了泛型  這裡為什麼還要設定一次   泛型的作用範圍  編譯的時候生效   執行的時候泛型會自動擦除
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Text.class);
			
			//設定reduce的輸出的k   v型別  以下方法設定的是mr的最終輸出
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			
			
			//指定需要統計的檔案的輸入路徑  FileInputFormat  檔案輸入類
			Path inpath=new Path("hdfs://master:9000/stuout_01");
			FileInputFormat.addInputPath(job, inpath);
			
			//指定輸出目錄  輸出路徑不能存在的  否則會報錯  預設輸出是覆蓋式的輸出  如果輸出目錄存在  有可能造成原始資料的丟失
			Path outpath=new Path("hdfs://master:9000/stuout_03");
			FileOutputFormat.setOutputPath(job, outpath);
			
			//提交job  執行這一句的時候 job才會提交  上面做的一系列的工作  都是設定job
			//job.submit();
			job.waitForCompletion(true);
			
		} catch (Exception e) {
			
			e.printStackTrace();
		}
	}

}