大資料09--MapReduce習題~統計單詞數量
阿新 • • 發佈:2019-01-12
一、利用MapReduce計算單詞
WordcountDriver
** * 相當於一個yarn叢集的客戶端 * 需要在此封裝我們的mr程式的相關執行引數,指定jar包 * 最後提交給yarn */ public class WordcountDriver { public static void main(String[] args) throws Exception { System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3"); System.setProperty("HADOOP_USER_NAME", "root") ; if (args == null || args.length == 0) { return; } //該物件會預設讀取環境中的 hadoop 配置。當然,也可以通過 set 重新進行配置 Configuration conf = new Configuration(); //job 是 yarn 中任務的抽象。 Job job = Job.getInstance(conf); /*job.setJar("/home/hadoop/wc.jar");*/ //指定本程式的jar包所在的本地路徑 job.setJarByClass(WordcountDriver.class); //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper輸出資料的kv型別。需要和 Mapper 中泛型的型別保持一致 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的資料的kv型別。這裡也是 Reduce 的 key,value型別。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setNumReduceTasks(2); //指定job的輸入原始檔案所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); //改變切塊大小 FileInputFormat.setMinInputSplitSize(job, 301349250); //將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行 /*job.submit();*/ boolean res = job.waitForCompletion(true); System.exit(res?0:1); } //C:\Users\Administrator\hdfsDemo01\out\artifacts\hdfsDemo01_jar
WordcountMapper
package com.neusoft; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 該類做為一個 mapTask 使用。類聲名中所使用的四個泛型意義為別為: * * KEYIN: 預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long, * 但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable * VALUEIN: 預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text * KEYOUT: 是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text * VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入資料呼叫一次我們自定義的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 將maptask傳給我們的文字內容先轉換成String,空格將這一行切分成單詞 String[] splits = value.toString().split(" "); // 將單詞輸出為<單詞,1> for (String split:splits){ // 將單詞作為key,將次數1作為value,以便於後續的資料分發, // 可以根據單詞分發,以便於相同單詞會到相同的reduce task context.write(new Text(split),new IntWritable(1)); } } } // Mapperc處理完以後的結果<i,1><have,1><a,1><dream,1><a,1><dream,1>
WordcountReducer
package com.neusoft; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 與 Mapper 類似,繼承的同事聲名四個泛型。 * KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT型別對應 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別。此處 keyOut 表示單個單詞,valueOut 對應的是總次數 */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * Mapperc處理完以後的結果<i,1><have,1><a,1><dream,1><a,1><dream,1> * 入參key,是一組相同單詞kv對的key * 框架自動整合成(1-7):dream<list(1,1)> ====> dteam--key list(1,1)---list * reduce */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for(IntWritable value : values){ count += value.get(); } //輸出每一個單詞出現的次數 context.write(key, new IntWritable(count)); } }