Why MapReduce needs setMapOutputKeyClass and setMapOutputValueClass
阿新 • Published: 2018-04-14
A typical MapReduce word count program looks like this:
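import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {
        // Split the input line on spaces and emit (word, 1) for each token.
        String[] words = value.toString().split(" ");
        for (int i = 0; i < words.length; i++) {
            ctx.write(new Text(words[i]), new LongWritable(1L));
        }
    }
}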
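import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WcReduer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // Reused across calls to avoid allocating a new writable per key.
    LongWritable count = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context ctx)
            throws IOException, InterruptedException {
        // Sum all the counts emitted for this word.
        Iterator<LongWritable> itr = values.iterator();
        long sum = 0L;
        while (itr.hasNext()) {
            sum = sum + itr.next().get();
        }
        count.set(sum);
        ctx.write(key, count);
    }
}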
The driver code for the job:
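import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JobClient {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(JobClient.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJobName("wordcount");
        // Note: no output key/value classes are set anywhere in this driver.
        FileInputFormat.addInputPath(job, new Path("/daxin/hadoop-mapreduce/words"));
        FileOutputFormat.setOutputPath(job, new Path("/daxin/hadoop-mapreduce/wordcount-result"));
        job.waitForCompletion(true);
    }
}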
Submitting the job fails with this error:
Error: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.LongWritable, received org.apache.hadoop.io.Text
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at com.daxin.blog.WcMapper.map(WcMapper.java:20)
    at com.daxin.blog.WcMapper.map(WcMapper.java:13)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
The stack trace points to where the error is thrown in the Hadoop source: org.apache.hadoop.mapred.MapTask.MapOutputBuffer#collect. The relevant part of that method is:
public synchronized void collect(K key, V value, final int partition) throws IOException {
    reporter.progress();
    if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
            + keyClass.getName() + ", received " + key.getClass().getName());
    }
    if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
            + valClass.getName() + ", received " + value.getClass().getName());
    }
    .....
}
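Note that the check in collect compares classes with `!=`, i.e. exact class identity rather than `instanceof`. The following is a minimal plain-Java sketch of that comparison, not Hadoop code: `FakeText`, `FakeSubText`, `FakeLongWritable`, `checkKey` and `accepts` are hypothetical names introduced here only so the example runs without Hadoop on the classpath.

```java
import java.io.IOException;

public class StrictTypeCheckSketch {
    // Hypothetical stand-ins for Hadoop writable types; these are NOT the real classes.
    static class FakeText {}
    static class FakeSubText extends FakeText {}
    static class FakeLongWritable {}

    // Same style of comparison as in collect(): exact class identity, not instanceof.
    static void checkKey(Object key, Class<?> keyClass) throws IOException {
        if (key.getClass() != keyClass) {
            throw new IOException("Type mismatch in key from map: expected "
                + keyClass.getName() + ", received " + key.getClass().getName());
        }
    }

    // Helper (assumption of this sketch): true if the check passes, false if it throws.
    static boolean accepts(Object key, Class<?> keyClass) {
        try {
            checkKey(key, keyClass);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Key class matches the expected class exactly.
        System.out.println(accepts(new FakeText(), FakeText.class));
        // The situation in this post: a Text-like key where LongWritable is expected.
        System.out.println(accepts(new FakeText(), FakeLongWritable.class));
        // A subclass instance is rejected too, because the comparison is by identity.
        System.out.println(accepts(new FakeSubText(), FakeText.class));
    }
}
```

Under this kind of check, the configured class has to be exactly the runtime class of the emitted key; a superclass of it would not be accepted.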
Here key.getClass() is clearly Text, so what remains is to find out what keyClass is. It is assigned in the source as follows:
keyClass = (Class<K>)job.getMapOutputKeyClass();
The source of getMapOutputKeyClass:
public Class<?> getMapOutputKeyClass() {
    Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
    if (retv == null) {
        retv = getOutputKeyClass();
    }
    return retv;
}
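MAP_OUTPUT_KEY_CLASS is the configuration key that holds the map output key type. Because our driver never sets it, getClass returns the default value, null, and getOutputKeyClass is called next: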
public Class<?> getOutputKeyClass() {
    return getClass(JobContext.OUTPUT_KEY_CLASS, LongWritable.class, Object.class);
}
public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
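This method reads OUTPUT_KEY_CLASS, which holds the output key type of the job as a whole. Since we did not set that either, it returns the default, LongWritable. Our map task, however, actually emits Text keys, which is why the type mismatch error above is thrown. The map output value has the same kind of problem. The fix is to set the map task's output key and value types explicitly in the driver: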
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
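The lookup order traced above (map output key class first, then the job output key class, then the LongWritable default) can be modeled in plain Java. This is a simplified sketch, not the Hadoop implementation: the configuration is a plain `Map<String, String>`, `resolveMapOutputKeyClass` is a hypothetical name, and the property string used for `MAP_OUTPUT_KEY_CLASS` is an assumption that should be checked against your Hadoop version (only `mapreduce.job.output.key.class` appears in the source quoted in this post).

```java
import java.util.HashMap;
import java.util.Map;

public class OutputClassFallbackSketch {
    // Assumed property name for the map output key class; verify against your Hadoop version.
    static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
    // Property name quoted in the post.
    static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";

    // Simplified model of the lookup: use the map-specific setting if present,
    // otherwise the job-level output key class, otherwise the LongWritable default.
    static String resolveMapOutputKeyClass(Map<String, String> conf) {
        String mapKeyClass = conf.get(MAP_OUTPUT_KEY_CLASS);
        if (mapKeyClass != null) {
            return mapKeyClass;
        }
        return conf.getOrDefault(OUTPUT_KEY_CLASS, "org.apache.hadoop.io.LongWritable");
    }

    public static void main(String[] args) {
        // Driver from the post: nothing is set, so the default is used.
        Map<String, String> unset = new HashMap<>();
        System.out.println(resolveMapOutputKeyClass(unset));

        // After the fix: the map output key class is set explicitly.
        Map<String, String> fixed = new HashMap<>();
        fixed.put(MAP_OUTPUT_KEY_CLASS, "org.apache.hadoop.io.Text");
        System.out.println(resolveMapOutputKeyClass(fixed));

        // Only the job-level output key class is set: the fallback picks it up.
        Map<String, String> jobLevelOnly = new HashMap<>();
        jobLevelOnly.put(OUTPUT_KEY_CLASS, "org.apache.hadoop.io.Text");
        System.out.println(resolveMapOutputKeyClass(jobLevelOnly));
    }
}
```

The third case suggests why many word count drivers set only the job-level output classes: when the mapper and reducer emit the same types, the fallback already yields the right class. The explicit map output setters matter most when the map output types differ from the final output types.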