MapReduce案例學習(9) 將全體員工按照總收入(工資+提成)從高到低排列,要求列出姓名及其總收入
阿新 • 發佈:2018-12-25
map階段:將Employee物件作為key,value直接設定為NullWritable。
reduce階段:reduce收到的key(Employee物件)在shuffle時已依照Employee的compareTo方法(總收入由高到低)自動排好序;遍歷輸入引數values時,key會依序對應到每一筆員工記錄,所以直接把姓名及收入等相關資訊拼接輸出即可。
package week06; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /* * Employee Object */ public class Employee implements WritableComparable<Employee> { private String empno; private String ename; private String job; private String mgr; private String hiredate; private int sal; private int comm; private String deptno; private String dname; private String loc; private boolean valid = true; public void set(Employee emp) { this.valid = emp.isValid(); this.empno = emp.getEmpno(); this.ename = emp.getEname(); this.job = emp.getJob(); this.mgr = emp.getMgr(); this.hiredate = emp.getHiredate(); this.sal = emp.getSal(); this.comm = emp.getComm(); this.deptno = emp.getDeptno(); this.dname = emp.getDname(); this.loc = emp.getLoc(); } //Emp_Test8使用這個compareTo方法 // public int compareTo(Employee bean) { // if (this.sal >= bean.getSal()) { // return -1; // } else { // return 1; // } // } //Emp_Test9使用這個compareTo方法 public int compareTo(Employee bean) { int total = this.sal + this.comm; int bean_total = bean.getSal() + bean.getComm(); if (total >= bean_total) { return -1; } else { return 1; } } public void write(DataOutput out) throws IOException { out.writeUTF(empno); out.writeUTF(ename); out.writeUTF(job); out.writeUTF(mgr); out.writeUTF(hiredate); out.writeInt(sal); out.writeInt(comm); out.writeUTF(deptno); out.writeUTF(dname); out.writeUTF(loc); out.writeBoolean(valid); } // 注意必須和write方法中的寫入順序和型別保持一致,否則會出錯。 public void readFields(DataInput in) throws IOException { this.empno = in.readUTF(); this.ename = in.readUTF(); this.job = in.readUTF(); this.mgr = in.readUTF(); this.hiredate = in.readUTF(); this.sal = in.readInt(); this.comm = in.readInt(); this.deptno = in.readUTF(); this.dname = in.readUTF(); this.loc = in.readUTF(); this.valid = in.readBoolean(); } /** * 格式化物件 */ public static Employee parser(String line) { // System.out.println(line); // "7369 SMITH CLERK 7902 1980-12-17 800 20" Employee 
emp = new Employee(); String[] arr = line.split("\t"); if (arr.length >= 8) { emp.setEmpno(arr[0]); emp.setEname(arr[1]); emp.setJob(arr[2]); emp.setMgr(arr[3]); emp.setHiredate(arr[4]); if ("".equals(arr[5]) || arr[5] == null) { emp.setSal(0); } else { try { emp.setSal(new Integer(arr[5])); } catch (Exception e) { emp.setSal(0); } } if ("".equals(arr[6]) || arr[6] == null) { emp.setComm(0); } else { try { emp.setComm(new Integer(arr[6])); } catch (Exception e) { emp.setComm(0); } } emp.setDeptno(arr[7]); if ("10".equals(emp.getDeptno())) { emp.setDname("ACCOUNTING"); emp.setLoc("NEW YORK"); } else if ("20".equals(emp.getDeptno())) { emp.setDname("RESEARCH"); emp.setLoc("DALLAS"); } else if ("30".equals(emp.getDeptno())) { emp.setDname("SALES"); emp.setLoc("CHICAGO"); } else if ("40".equals(emp.getDeptno())) { emp.setDname("OPERATIONS"); emp.setLoc("BOSTON"); } else { emp.setDname("other"); emp.setLoc("other"); } emp.setValid(true); } else { emp.setValid(false); } return emp; } public String getEmpno() { return empno; } public void setEmpno(String empno) { this.empno = empno; } public String getEname() { return ename; } public void setEname(String ename) { this.ename = ename; } public String getJob() { return job; } public void setJob(String job) { this.job = job; } public String getMgr() { return mgr; } public void setMgr(String mgr) { this.mgr = mgr; } public String getHiredate() { return hiredate; } public void setHiredate(String hiredate) { this.hiredate = hiredate; } public int getSal() { return sal; } public void setSal(int sal) { this.sal = sal; } public int getComm() { return comm; } public void setComm(int comm) { this.comm = comm; } public String getDeptno() { return deptno; } public void setDeptno(String deptno) { this.deptno = deptno; } public String getDname() { return dname; } public void setDname(String dname) { this.dname = dname; } public String getLoc() { return loc; } public void setLoc(String loc) { this.loc = loc; } public boolean isValid() { 
return valid; } public void setValid(boolean valid) { this.valid = valid; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("valid:" + this.valid); sb.append("\nempno:" + this.empno); sb.append("\nename:" + this.ename); sb.append("\njob:" + this.job); sb.append("\nmgr:" + this.mgr); sb.append("\nhiredate:" + this.hiredate); sb.append("\nsal:" + this.sal); sb.append("\ncomm:" + this.comm); sb.append("\ndeptno:" + this.deptno); sb.append("\ndname:" + this.dname); sb.append("\nloc:" + this.loc); return sb.toString(); } public static void main(String args[]) { String line = "7698 BLAKE MANAGER 7839 1981-05-01 2850 30"; System.out.println(line); Employee emp = Employee.parser(line); System.out.println(emp); } }
package week06; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //9) 將全體員工按照總收入(工資+提成)從高到低排列,要求列出姓名及其總收入 public class Emp_Test9 extends Configured implements Tool { /** * 計數器 用於計數各種異常資料 */ enum Counter { LINESKIP, } /** * MAP任務 */ public static class Map extends Mapper<LongWritable, Text, Employee, NullWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();// 每行檔案 // 輸入檔案首行,不處理 if (line.contains("empno") == true) { return; } Employee emp = Employee.parser(line); if (emp.isValid()) { context.write(emp, NullWritable.get()); } else { context.getCounter(Counter.LINESKIP).increment(1); // 出錯令計數器+1 return; } } } /** * REDUCE */ public static class Reduce extends Reducer<Employee, NullWritable, Text, Text> { @Override public void reduce(Employee key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { int total = key.getSal() + key.getComm(); context.write(new Text(key.getEname()), new Text("--工資:" + key.getSal() + "--提成:" + key.getComm() + "--總收入:" + total)); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapred.job.tracker", "192.168.1.201:9001"); String[] ioArgs = new String[] { "emp_in", "emp_out_test9" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Test < input path > < output path 
>"); System.exit(2); } Job job = new Job(conf, "week06_test_09"); // 任務名 job.setJarByClass(Emp_Test9.class); // 指定Class FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 輸入路徑 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 輸出路徑 job.setMapperClass(Map.class); // 呼叫上面Map類作為Map任務程式碼 job.setReducerClass(Reduce.class);// 呼叫上面Reduce類作為Reduce任務程式碼 job.setMapOutputKeyClass(Employee.class); // 指定map輸出的KEY的格式 job.setMapOutputValueClass(NullWritable.class);// 指定map輸出的VALUE的格式 job.setOutputKeyClass(Text.class); // 指定輸出的KEY的格式 job.setOutputValueClass(Text.class); // 指定輸出的VALUE的格式 job.waitForCompletion(true); // 輸出任務完成情況 System.out.println("任務名稱:" + job.getJobName()); System.out.println("任務成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("輸入行數:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("輸出行數:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳過的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } /** * 設定系統說明 設定MapReduce任務 */ public static void main(String[] args) throws Exception { // 記錄開始時間 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 執行任務 int res = ToolRunner.run(new Configuration(), new Emp_Test9(), args); // 輸出任務耗時 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任務開始:" + formatter.format(start)); System.out.println("任務結束:" + formatter.format(end)); System.out.println("任務耗時:" + String.valueOf(time) + " 分鐘"); System.exit(res); } }