Mapreduce自定義資料型別
Hadoop自帶的資料型別:
IntWritable、LongWritable、Text,以及其他 xxWritable 型別。
某些情況下:使用自定義的資料型別方便一些(類似java中的pojo)。
實現:
實現 WritableComparable 介面即可。
場景例如:
成績表:由語文,數學,英文組成。
希望按照總成績進行排名。如果總成績相同,則依次按照語文、數學、英文來排序。
一、自定義 ScoreWritable 實現 WritableComparable 介面:
package com.day07; import org.apache.hadoop.io.WritableComparable; import java.io.*; public class ScoreWritable implements WritableComparable<ScoreWritable> { int chinese; int math; int english; int sum;
public ScoreWritable() { } public ScoreWritable(int chinese, int math, int english) { this.chinese = chinese; this.math = math; this.english = english; this.sum=chinese+english+math; } @Override public String toString() { return "ScoreWritable{" + "chinese=" + chinese + ", math=" + math + ", english=" + english + ", sum=" + sum + '}'; } public int getChinese() { return chinese; } public void setChinese(int chinese) { this.chinese = chinese; } public int getMath() { return math; } public void setMath(int math) { this.math = math; } public int getEnglish() { return english; } public void setEnglish(int english) { this.english = english; } public int getSum() { return sum; } public void setSum(int sum) { this.sum = sum; } //比較 public int compareTo(ScoreWritable that) { //先比較總成績 if (this.sum>that.getSum()){ return -1; }else if(this.sum<that.getSum()){ return 1; }else{ if (this.chinese>that.getChinese()){ return -1; }else if (that.chinese<that.getChinese()){ return 1; }else { return -(this.math-that.getMath()); } } } //序列化--dataOutput(data流):可以自定義序列化物件,節省空間,hadoop用的就是這個流 public void write(DataOutput out) throws IOException { out.writeInt(chinese); out.writeInt(math); out.writeInt(english); out.writeInt(sum); } //反序列化 public void readFields(DataInput in) throws IOException { this.chinese = in.readInt(); this.math = in.readInt(); this.english = in.readInt(); this.sum = in.readInt(); } }
注意:
最好實現toString方法。
二、編寫ScoreJob類用於測試自定義的ScoreWritable
package com.day05; import com.day03.MaxSaleJob; import com.google.common.io.Resources; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class ScoreJob { public static class ScoreMapper extends Mapper<LongWritable,Text,ScoreWritable,NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //super.map(key, value, context); String[] grades = value.toString().split(","); ScoreWritable score = new ScoreWritable(Integer.parseInt(grades[0]), Integer.parseInt(grades[1]), Integer.parseInt(grades[2])); context.write(score,NullWritable.get()); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site-local.xml")); //設定一個任務 Job job = Job.getInstance(coreSiteConf, "score"); //設定job的執行類 job.setJarByClass(ScoreJob.class); //mrdemo/target/mrdemo-1.0-SNAPSHOT.jar //job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar"); //設定Map和Reduce處理類 job.setMapperClass(ScoreMapper.class); //map輸出型別 job.setMapOutputKeyClass(ScoreWritable.class); job.setMapOutputValueClass(NullWritable.class); //設定job/reduce輸出型別 /*job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);*/ //設定任務的輸入路徑 FileInputFormat.addInputPath(job, new Path("/score/")); FileSystem fileSystem = FileSystem.get(coreSiteConf); if(fileSystem.exists(new Path("/out/"))){
//刪除存在檔案,並遍歷刪除 fileSystem.delete(new Path("/out/"),true); }; FileOutputFormat.setOutputPath(job, new Path("/out/")); //執行任務 boolean flag = job.waitForCompletion(true); if(flag){ FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000")); byte[] buffer = new byte[1024]; IOUtils.readFully(open,buffer,0,open.available()); System.out.println(new String(buffer)); } } }
三、測試結果,類似於以下內容