Big Data Notes (9): Advanced Features of MapReduce (B)
阿新 · Published: 2018-03-04
2. Sorting
Sorting objects
Employee data: Employee.java ----> emitted as key2
Requirement: sort by department and salary in ascending order
Employee.java
package mr.object;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Input record format: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Employee implements WritableComparable<Employee> {
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

//    @Override
//    public int compareTo(Employee o) {
//        // Single-column sort rule: order employees by salary
//        if (this.sal >= o.getSal()) {
//            return 1;
//        } else {
//            return -1;
//        }
//    }

    @Override
    public int compareTo(Employee o) {
        // Two-column sort rule: department first
        if (this.deptno > o.getDeptno()) {
            return 1;
        } else if (this.deptno < o.getDeptno()) {
            return -1;
        }
        // then salary
        if (this.sal >= o.getSal()) {
            return 1;
        } else {
            return -1;
        }
    }

    @Override
    public String toString() {
        return "[" + this.empno + "\t" + this.ename + "\t" + this.sal + "\t" + this.deptno + "]";
    }

    @Override
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
    public int getMgr() { return mgr; }
    public void setMgr(int mgr) { this.mgr = mgr; }
    public String getHiredate() { return hiredate; }
    public void setHiredate(String hiredate) { this.hiredate = hiredate; }
    public int getSal() { return sal; }
    public void setSal(int sal) { this.sal = sal; }
    public int getComm() { return comm; }
    public void setComm(int comm) { this.comm = comm; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
}
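A quick illustration of the two-column rule (the sample values here are made up): (deptno=10, sal=5000) sorts before (deptno=20, sal=800), even though its salary is higher, because the department comparison decides first; salary only orders records within the same department. Also note that this compareTo never returns 0, so every record remains a distinct key in the sorted output.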
EmployeeSortMapper.java
package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// The Employee object serves as key2
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();

        // Split the line
        String[] words = data.split(",");

        // Build the employee object
        Employee e = new Employee();
        // Employee number
        e.setEmpno(Integer.parseInt(words[0]));
        // Employee name
        e.setEname(words[1]);
        // Job
        e.setJob(words[2]);
        // Manager number: note that some employees have no manager
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // no manager
            e.setMgr(0);
        }
        // Hire date
        e.setHiredate(words[4]);
        // Salary
        e.setSal(Integer.parseInt(words[5]));
        // Commission
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            // no commission
            e.setComm(0);
        }
        // Department
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit key2
        context.write(e, NullWritable.get());
    }
}
EmployeeSortMain.java
package mr.object;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmployeeSortMain {

    public static void main(String[] args) throws Exception {
        // job = map + reduce
        Job job = Job.getInstance(new Configuration());

        // Job entry point
        job.setJarByClass(EmployeeSortMain.class);

        job.setMapperClass(EmployeeSortMapper.class);
        job.setMapOutputKeyClass(Employee.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(EmployeeSortReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
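The driver above references EmployeeSortReducer, which is not listed in these notes. A minimal sketch consistent with the driver's output types (LongWritable key, Employee value) could look like the following; using the employee number as key4 is an assumption, not the original code:

package mr.object;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: Employee keys arrive already sorted by compareTo,
// so it simply writes each one out, keyed by the employee number (an assumption).
public class EmployeeSortReducer extends Reducer<Employee, NullWritable, LongWritable, Employee> {

    @Override
    protected void reduce(Employee k3, Iterable<NullWritable> v3, Context context)
            throws IOException, InterruptedException {
        context.write(new LongWritable(k3.getEmpno()), k3);
    }
}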
Result: (output screenshot omitted)
3. Partitioning (Partition)
A partition is built from the Map output (k2, v2).
By default, a MapReduce job has only one partition (and therefore only one output file); see the sketch after these notes for how the default partitioner assigns keys.
Purpose: improve query efficiency.
Partitions are created according to different conditions.
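When no custom partitioner is set, Hadoop's built-in HashPartitioner assigns the partition from the key's hash code, essentially as in the simplified sketch below (the class name here is just for illustration):

import org.apache.hadoop.mapreduce.Partitioner;

// Simplified view of the default HashPartitioner: keys with the same
// hash code modulo the number of reduce tasks land in the same partition.
// With a single reduce task, everything goes to partition 0.
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}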
Requirement: partition employees by department number, so that employees with the same department number end up in the same partition.
EmpPartionMapper.java
package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2: department number   v2: employee object
public class EmpPartionMapper extends Mapper<LongWritable, Text, LongWritable, Employee> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Input record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
        String data = value1.toString();
        String[] words = data.split(",");

        Employee e = new Employee();
        e.setEmpno(Integer.parseInt(words[0]));
        e.setEname(words[1]);
        e.setJob(words[2]);
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // no manager
            e.setMgr(0);
        }
        e.setHiredate(words[4]);
        e.setSal(Integer.parseInt(words[5]));
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            // no commission
            e.setComm(0);
        }
        e.setDeptno(Integer.parseInt(words[7]));

        // Emit: k2 is the department number, v2 is the employee object
        context.write(new LongWritable(e.getDeptno()), e);
    }
}
EmpPartionReducer.java
package demo.partion;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

// Write employees of the same department to HDFS.  k4: department number   v4: employee object
public class EmpPartionReducer extends Reducer<LongWritable, Employee, LongWritable, Employee> {

    @Override
    protected void reduce(LongWritable k3, Iterable<Employee> v3, Context context)
            throws IOException, InterruptedException {
        for (Employee e : v3) {
            context.write(k3, e);
        }
    }
}
MyEmployeePartitioner.java
package demo.partion;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Partitioning rule: partitions are built from the Map output (k2, v2)
public class MyEmployeePartitioner extends Partitioner<LongWritable, Employee> {

    /*
     * numParts: the number of partitions
     */
    @Override
    public int getPartition(LongWritable k2, Employee v2, int numParts) {
        // Partitioning rule
        int deptno = v2.getDeptno();
        if (deptno == 10) {
            // goes to partition 1
            return 1 % numParts;
        } else if (deptno == 20) {
            // goes to partition 2
            return 2 % numParts;
        } else {
            // goes to partition 0 (3 % 3 == 0 when there are three partitions)
            return 3 % numParts;
        }
    }
}
EmpPartitionMain.java
package demo.partion;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpPartitionMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpPartitionMain.class);

        job.setMapperClass(EmpPartionMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Employee.class);

        // Specify the partitioning rule
        job.setPartitionerClass(MyEmployeePartitioner.class);
        // Specify the number of partitions (= number of reduce tasks)
        job.setNumReduceTasks(3);

        job.setReducerClass(EmpPartionReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Employee.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
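One way to submit the job (the jar name and HDFS paths below are placeholders, not from the original notes):

hadoop jar mr-demo.jar demo.partion.EmpPartitionMain /input/emp.csv /output/emp_partition

With setNumReduceTasks(3), the output directory contains three files: part-r-00000 (partition 0, all other departments), part-r-00001 (partition 1, department 10), and part-r-00002 (partition 2, department 20).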
Result: three partitions are created (output screenshots omitted).
Partition 1:
Partition 2:
Partition 0:
4. Combining (Combiner)
1. A MapReduce job does not have to use a Combiner.
2. A Combiner is a special kind of Reducer that runs first on the Mapper side. It reduces the amount of Map output and thereby improves efficiency.
3. Caveats:
(1) In some cases a Combiner cannot be used -----> e.g. computing an average (see the worked example after this list).
(2) Whether or not a Combiner is introduced, the result of the original program logic must not change. (MapReduce programming example: building an inverted index.)
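Why averaging cannot simply be pre-aggregated, as a small illustration: suppose the values 1, 2, 3, 4, 5 are split across two mappers as (1, 2) and (3, 4, 5). The true average is 15 / 5 = 3, but averaging the per-mapper averages gives (1.5 + 4) / 2 = 2.75. Summing, by contrast, can safely be done in stages, which is why the word-count job below can reuse its Reducer as the Combiner.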
WordCountMapper.java
package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Read one line of data, e.g.: I love beijing
        String data = v1.toString();

        // Split into words
        String[] words = data.split(" ");

        // Emit k2: the word   v2: a count of one
        for (String w : words) {
            context.write(new Text(w), new LongWritable(1));
        }
    }
}
WordCountReducer.java
package demo.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text k3, Iterable<LongWritable> v3, Context context)
            throws IOException, InterruptedException {
        long total = 0;
        for (LongWritable l : v3) {
            total = total + l.get();
        }
        // Emit k4 and v4
        context.write(k3, new LongWritable(total));
    }
}
WordCountMain.java: with the Combiner added
package demo.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WordCountMain.class);

        // Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);           // k2
        job.setMapOutputValueClass(LongWritable.class); // v2

        // Combiner: the Reducer class is reused as the Combiner
        job.setCombinerClass(WordCountReducer.class);

        // Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job
        job.waitForCompletion(true);
    }
}
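As with the other drivers, the job can be submitted along these lines (jar name and paths are placeholders):

hadoop jar mr-demo.jar demo.combiner.WordCountMain /input/data.txt /output/wc

The final word counts are the same whether or not setCombinerClass is called; only the volume of intermediate data shuffled from the Mappers to the Reducer shrinks.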