Reduce Join in MapReduce
阿新 · Published 2018-12-15
1. Introduction
The main idea of Reduce Join is as follows:
In the map phase, the map function reads both input files, File1 and File2. To tell the two sources apart, it attaches a tag to every key/value pair, e.g. tag=0 for records from File1 and tag=1 for records from File2. In other words, the map phase's entire job is to tag records by source file. In the reduce phase, the reduce function receives, for each key, the list of values gathered from both File1 and File2, and joins the File1 records with the File2 records for that key (a Cartesian product). The actual join therefore happens in the reduce phase: if a key has one record in File1 and two in File2, the reducer emits 1 × 2 = 2 joined records for it.
In this example, assume we have the following two data files:
The file storing customer information, customers.csv:
1,stephaie leung,555-555-5555
2,edward kim,123-456-7890
3,jose madriz,281-330-8004
4,david storkk,408-55-0000
The file storing order information, orders.csv:
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
The required final output is:
1,stephaie leung,555-555-5555,B,88.25,20-May-2008
2,edward kim,123-456-7890,C,32.00,30-Nov-2007
3,jose madriz,281-330-8004,A,12.95,02-Jun-2008
3,jose madriz,281-330-8004,D,25.02,22-Jan-2009
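Note that customer 3 appears twice. To see why, consider what the reducer receives for key 3 after the shuffle (an illustrative sketch, using the tags from the code below):

key = 3
values = [ ("customer", "jose madriz,281-330-8004"),
           ("order",    "A,12.95,02-Jun-2008"),
           ("order",    "D,25.02,22-Jan-2009") ]

One customer record crossed with two order records yields the two joined rows.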
2. The Code
Custom data type, used to tag records from the different files:
package mapreduce.reducejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataJoinWritable implements Writable {

    // tag marking the record source: "customer" or "order"
    private String tag;

    // the record payload (the remaining CSV fields)
    private String data;

    public DataJoinWritable() {
    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data);
    }

    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataJoinWritable other = (DataJoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return tag + "," + data;
    }
}
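As a quick local sanity check of the Writable contract (not part of the original post; the class name DataJoinWritableRoundTrip is made up here), the serialization can be exercised with Hadoop's in-memory DataOutputBuffer/DataInputBuffer:

package mapreduce.reducejoin;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class DataJoinWritableRoundTrip {

    public static void main(String[] args) throws Exception {
        DataJoinWritable original =
                new DataJoinWritable("customer", "stephaie leung,555-555-5555");

        // serialize into an in-memory buffer
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        // deserialize into a fresh instance
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        DataJoinWritable copy = new DataJoinWritable();
        copy.readFields(in);

        // both lines should print: customer,stephaie leung,555-555-5555
        System.out.println(original);
        System.out.println(copy);
    }
}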
The MapReduce code:
package mapreduce.reducejoin;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinMapReduce extends Configured implements Tool {

    // step 1: Mapper
    public static class DataJoinMapper extends
            Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

        // map output key: the customer id shared by both files
        private LongWritable mapOutputKey = new LongWritable();

        // map output value: the tagged record
        private DataJoinWritable mapOutputValue = new DataJoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String lineValue = value.toString();

            // split the CSV line
            String[] vals = lineValue.split(",");

            int length = vals.length;

            // customers.csv has 3 fields, orders.csv has 4; skip anything else
            if ((3 != length) && (4 != length)) {
                return;
            }

            // the customer id is the join key in both files
            Long cid = Long.valueOf(vals[0]);

            // second field: the customer name or the order item
            String name = vals[1];

            // customer record: cid, name, phone
            if (3 == length) {
                String phone = vals[2];

                mapOutputKey.set(cid);
                mapOutputValue.set("customer", name + "," + phone);
            }

            // order record: cid, item, price, date
            if (4 == length) {
                String price = vals[2];
                String date = vals[3];

                mapOutputKey.set(cid);
                mapOutputValue.set("order", name + "," + price + "," + date);
            }

            // output
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: Reducer
    public static class DataJoinReducer extends
            Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        protected void reduce(LongWritable key,
                Iterable<DataJoinWritable> values, Context context)
                throws IOException, InterruptedException {
            String customerInfo = null;
            List<String> orderList = new ArrayList<String>();

            // separate the tagged values into the customer record and its orders
            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }

            // join: emit one row per order, prefixed with the customer info
            for (String order : orderList) {

                // set output value
                outputValue.set(key.get() + "," + customerInfo + "," + order);

                // output
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

    /**
     * Execute the command with the given arguments.
     *
     * @param args command specific arguments.
     * @return exit code.
     * @throws Exception
     */
    // step 3: Driver
    public int run(String[] args) throws Exception {

        Configuration configuration = this.getConf();

        // set job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(DataJoinMapReduce.class);

        // input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // Mapper
        job.setMapperClass(DataJoinMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // Reducer
        job.setReducerClass(DataJoinReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // submit job -> YARN
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {

        Configuration configuration = new Configuration();

        // hard-coded HDFS input/output paths (override any command-line args)
        args = new String[] {
                "hdfs://beifeng01:8020/user/beifeng01/mapreduce/input/reducejoin",
                "hdfs://beifeng01:8020/user/beifeng01/mapreduce/output" };

        // run job
        int status = ToolRunner.run(configuration, new DataJoinMapReduce(),
                args);

        // exit program
        System.exit(status);
    }
}
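One caveat: FileOutputFormat refuses to run if the output directory already exists, so a second run of this driver fails with FileAlreadyExistsException. A minimal sketch of a cleanup step, assuming you want to overwrite previous output (the helper class OutputDirCleaner is hypothetical, not from the post):

package mapreduce.reducejoin;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputDirCleaner {

    // delete outPath recursively if it exists, so the job can write there
    public static void deleteIfExists(Configuration conf, Path outPath)
            throws IOException {
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true); // true = recursive
        }
    }
}

A call like OutputDirCleaner.deleteIfExists(configuration, outPath); could be placed in run() right after outPath is created.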
Running the job and inspecting the results
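The post does not show the submission command itself; assuming the classes were packaged into a jar named reducejoin.jar (a hypothetical name), the job could be launched with something like:

[beifeng01@beifeng01 hadoop-2.5.0-cdh5.3.6]$ bin/hadoop jar reducejoin.jar mapreduce.reducejoin.DataJoinMapReduce

Since main() hard-codes the input and output paths, no further arguments are needed.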
[beifeng01@beifeng01 hadoop-2.5.0-cdh5.3.6]$ bin/hdfs dfs -text /user/beifeng01/mapreduce/output/p*
1,stephaie leung,555-555-5555,B,88.25,20-May-2008
2,edward kim,123-456-7890,C,32.00,30-Nov-2007
3,jose madriz,281-330-8004,D,25.02,22-Jan-2009
3,jose madriz,281-330-8004,A,12.95,02-Jun-2008