Key MapReduce Concepts Explained Through Examples ----------- Custom MapReduce Data Types (1): Implementing the Writable Interface
阿新 · Posted 2018-12-20
Implementing the Writable Interface
The code below defines a custom MapReduce data type, WordCountWritable, and then uses it in a WordCount job as the reducer's output key. A custom type must implement the Writable interface's write and readFields methods so that Hadoop can serialize and deserialize it between tasks.
WordCountWritable
package edu.qianfeng.mr.day01;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Custom data type for word count results.
 * @author lyd
 */
public class WordCountWritable implements Writable {

    public String word;
    public int counter;

    public WordCountWritable() {
    }

    public WordCountWritable(String word, int counter) {
        this.word = word;
        this.counter = counter;
    }

    /**
     * Serialize the fields to the output stream.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(counter);
    }

    /**
     * Deserialize the fields from the input stream.
     * Fields must be read in the same order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.counter = in.readInt();
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }

    @Override
    public String toString() {
        return word + ":" + counter;
    }
}
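Note that Writable alone suffices only for value types and for reduce output keys, as in this example. If a custom type needs to travel through the shuffle as a map output key, it also has to be sortable: it should implement WritableComparable and define compareTo, along with consistent hashCode/equals so the default HashPartitioner routes equal keys to the same reducer. Below is a minimal sketch of such a variant; the class name WordKey is hypothetical, and its two fields mirror WordCountWritable:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key variant: comparable, so it can be sorted during the shuffle.
public class WordKey implements WritableComparable<WordKey> {

    private String word;
    private int counter;

    public WordKey() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeInt(counter);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        counter = in.readInt();
    }

    // Sort by word first, then by counter; this ordering drives the shuffle sort.
    @Override
    public int compareTo(WordKey o) {
        int cmp = word.compareTo(o.word);
        return cmp != 0 ? cmp : Integer.compare(counter, o.counter);
    }

    // hashCode decides which partition (reducer) a key goes to,
    // so equal keys must produce equal hashes.
    @Override
    public int hashCode() {
        return word.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof WordKey)) {
            return false;
        }
        WordKey other = (WordKey) obj;
        return word.equals(other.word) && counter == other.counter;
    }
}

Either way, the no-argument constructor is required: Hadoop creates instances of the type by reflection and then calls readFields to populate them.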
WordCount
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import edu.qianfeng.mr.day01.WordCountWritable;

public class WordCount {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        Text word = new Text();
        IntWritable one = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the line of input
            String line = value.toString();
            // Split the line into words, e.g. [hello,qianfeng,hi,qianfeng] [hello,1603] [hi,hadoop,hi,spark]
            String[] words = line.split(" ");
            // Emit (word, 1) for each word
            for (String s : words) {
                word.set(s);
                context.write(word, one);
            }
        }
    }

    /**
     * Custom reducer class.
     * @author lyd
     */
    public static class MyReducer extends Reducer<Text, IntWritable, WordCountWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            // Define a counter
            int counter = 0;
            // Sum all the counts for this key
            for (IntWritable i : value) {
                counter += i.get();
            }
            // Wrap the result in the custom data type
            WordCountWritable wc = new WordCountWritable(key.toString(), counter);
            // Final output of the reduce phase
            context.write(wc, NullWritable.get());
        }
    }

    /**
     * Main entry point of the job.
     * @param args input path and output path
     */
    public static void main(String[] args) {
        try {
            // Get the configuration object
            Configuration conf = new Configuration();
            // Create the job (Job.getInstance replaces the deprecated Job constructor)
            Job job = Job.getInstance(conf, "wordcount");
            // Set the main class for the job
            job.setJarByClass(WordCount.class);

            // Configure the map phase
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // Configure the reduce phase
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(WordCountWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job, wait for completion, and print progress
            int isok = job.waitForCompletion(true) ? 0 : 1;
            // Exit with the job's status code
            System.exit(isok);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
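Before submitting to a cluster, it can be worth checking that write and readFields round-trip correctly. Below is a minimal local check, assuming the WordCountWritable class above; DataOutputBuffer and DataInputBuffer are Hadoop's in-memory stream helpers, and the test class name is made up for illustration:

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

import edu.qianfeng.mr.day01.WordCountWritable;

public class WordCountWritableTest {

    public static void main(String[] args) throws Exception {
        WordCountWritable original = new WordCountWritable("hello", 3);

        // Serialize into an in-memory buffer
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // Deserialize the same bytes into a fresh instance
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        WordCountWritable copy = new WordCountWritable();
        copy.readFields(in);

        // Both lines should print "hello:3"
        System.out.println(original);
        System.out.println(copy);
    }
}

If both lines print hello:3, the two fields serialize and deserialize in a consistent order. Once the classes are packaged into a jar (the name below is a placeholder), the job itself would typically be launched with something like hadoop jar wc.jar WordCount <input_path> <output_path>.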