17,reducejoin
阿新 • • 發佈:2018-12-27
需求:將對應商品表的id換成對應的名字
資料:
和上一節的order pd 一樣
Mapper:
public class JoinMapper extends Mapper<LongWritable, Text, Text,OrBeen >{ @Override protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub OrBeen or = new OrBeen(); Text k = new Text(); FileSplit inputSplit = (FileSplit)context.getInputSplit(); //獲得讀取的路徑名 String path = inputSplit.getPath().getName(); String line = value.toString(); String[] fields = line.split("\t"); if(path.equals("order.txt")) { or.setP_time(fields[0]); or.setP_id(fields[1]); or.setP_name(""); or.setP_num(Integer.parseInt(fields[2])); or.setP_flag(0); //0:訂單表 k.set(fields[1]); }else { or.setP_time(""); or.setP_id(fields[0]); or.setP_name(fields[1]); or.setP_num(0); or.setP_flag(1); k.set(fields[0]); } context.write(k, or); } }
在Map端,我們自己構造一個型別OrBeen,同時包含訂單表order.txt和商品表pd.txt的欄位,並通過flag來判斷記錄來自哪個表(0:訂單表,1:商品表)
在傳遞給Reduce時,我們以k(商品id)作為分組的key,相同商品id的記錄會被分到同一組,例如商品id為01的記錄為一組
OrBeen:
public class OrBeen implements Writable{ @Override public String toString() { return p_time + "\t" + p_name +"\t" +p_num; } private String p_time ; private String p_id ; private int p_num; private String p_name; private int p_flag; public OrBeen(String p_time, String p_id, int p_num, String p_name, int p_flag) { super(); this.p_time = p_time; this.p_id = p_id; this.p_num = p_num; this.p_name = p_name; this.p_flag = p_flag; } public OrBeen() { super(); // TODO Auto-generated constructor stub } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub p_time = in.readUTF(); p_id = in.readUTF(); p_num = in.readInt(); p_name = in.readUTF(); p_flag = in.readInt(); } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(p_time); out.writeUTF(p_id); out.writeInt(p_num); out.writeUTF(p_name); out.writeInt(p_flag); } public String getP_time() { return p_time; } public void setP_time(String p_time) { this.p_time = p_time; } public String getP_id() { return p_id; } public void setP_id(String i) { this.p_id = i; } public int getP_num() { return p_num; } public void setP_num(int p_num) { this.p_num = p_num; } public String getP_name() { return p_name; } public void setP_name(String p_name) { this.p_name = p_name; } public int getP_flag() { return p_flag; } public void setP_flag(int p_flag) { this.p_flag = p_flag; } }
Reduce:
public class JoinReducer extends Reducer<Text, OrBeen, OrBeen, NullWritable>{ @Override protected void reduce(Text key, Iterable<OrBeen> value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub //存放訂單表 ArrayList<OrBeen> or = new ArrayList<OrBeen>(); //存放商品表 OrBeen o = new OrBeen(); for(OrBeen orbeen:value) { if(orbeen.getP_flag()==0) { OrBeen A = new OrBeen(); try { //注意 BeanUtils,copyProperties只能copy同類型的,將orbeen先拷貝到A裡面,然後通過or.add新增到集合中去 BeanUtils.copyProperties(A, orbeen); } catch (IllegalAccessException | InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } or.add(A); }else { try { BeanUtils.copyProperties(o, orbeen); } catch (IllegalAccessException | InvocationTargetException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //將訂單表對應的名字互換 for(OrBeen k :or) { k.setP_name(o.getP_name()); context.write(k, NullWritable.get()); } } }
Job:
/**
 * Driver that configures and submits the reduce-side join job.
 *
 * <p>Usage: {@code JoinJobDrive [inputDir [outputDir]]}. When an argument is
 * omitted, the corresponding built-in default path is used.
 */
public class JoinJobDrive {

    /** Input directory used when no first argument is given. */
    private static final String DEFAULT_INPUT_DIR = "B:/筆記/資料素材/in";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_DIR = "B:/筆記/資料素材/out";

    /**
     * Submits the job, waits for it to finish and prints whether it succeeded.
     * The process exits with status 1 when the job fails, so that calling
     * scripts can detect the failure.
     *
     * @param args optional input directory and output directory, in that order
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputDir = args.length > 0 ? args[0] : DEFAULT_INPUT_DIR;
        String outputDir = args.length > 1 ? args[1] : DEFAULT_OUTPUT_DIR;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinJobDrive.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // Map output: product id -> record from either table.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrBeen.class);

        // Final output: the joined order record as key, no value.
        job.setOutputKeyClass(OrBeen.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        if (!b) {
            System.exit(1);
        }
    }
}
以上就是reducejoin的案例