MapReduce-join連接
join連接
MapReduce能夠執行大型數據集間的連接(join)操作。連接操作的具體實現技術取決於數據集的規模及分區方式
連接操作如果由mapper執行,則稱為“map端連接”;如果由reducer執行,則稱為“reduce端連接”。
Map端連接
在兩個大規模輸入數據集之間的map端連接會在數據到達map函數之前就執行連接操作。為達到該目的,各map的輸入數據必須先分區並且以特定方式排序。各個輸入數據集被劃分成相同數量的分區,並且均按相同的鍵(連接鍵)排序。同一鍵的所有記錄均會放在同一分區之中。
Map端連接操作可以連接多個作業的輸出,只要這些作業的reducer數量相同、鍵相同並且輸出文件是不可切分的(例如,小於一個HDFS塊,或gzip壓縮)。
Reduce端連接
由於reduce端連接並不要求輸入數據集符合特定結構,因而reduce端連接比map端連接更為常用。但是,由於兩個數據集均需經過MapReduce的shuffle過程,所以reduce端連接的效率往往要低一些。基本思路是mapper為各個記錄標記源,並且使用連接鍵作為map輸出鍵,使鍵相同的記錄放在同一reducer中。
需要使用以下技術
1.多輸入
數據集的輸入源往往有多種格式,因此可以使用MultipleInputs類來方便地解析和標註各個源。
2.輔助排序
reducer將從兩個源中選出鍵相同的記錄且並不介意這些記錄是否已排好序。此外,為了更好地執行連接操作,先將某一個源的數據傳輸到reducer會非常重要。
舉個例子
現有氣象站文件及氣象數據文件,需要將兩個文件進行關聯
氣象站文件內容如下
00001,北京 00002,天津 00003,山東
氣象數據文件內容如下
00001,20180101,15 00001,20180102,16 00002,20180101,25 00002,20180102,26 00003,20180101,35 00003,20180102,36
要求:輸出氣象站ID 氣象站名稱及氣象數據
代碼如下
1.JoinRecordWithStationName類
package com.zhen.mapreduce.join;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reduce-side join of weather records with station metadata.
 *
 * <p>Two mappers tag each record's key with its source ("0" = station
 * metadata, "1" = weather record). Records are partitioned and grouped by
 * station id only ({@link KeyPartitioner}, {@link TextPair.FirstComparator}),
 * while the full {@link TextPair} sort order guarantees the station-metadata
 * record (tag "0") reaches the reducer before the weather records (tag "1").
 *
 * @author FengZhen
 * @date 2018-09-16
 */
public class JoinRecordWithStationName extends Configured implements Tool {

    /**
     * Mapper that tags station-metadata records for the reduce-side join.
     * Input lines look like: {@code 00001,北京}.
     * Emits key = (stationId, "0") so the metadata sorts first per station.
     */
    static class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {

        private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TextPair, Text>.Context context)
                throws IOException, InterruptedException {
            // Malformed lines are skipped silently (parse returns false).
            if (parser.parse(value.toString())) {
                context.write(new TextPair(parser.getStationId(), "0"),
                        new Text(parser.getStationName()));
            }
        }
    }

    /**
     * Mapper that tags weather records for the reduce-side join.
     * Input lines look like: {@code 00001,20180101,15}.
     * Emits key = (stationId, "1") with the raw line as the value.
     */
    static class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {

        private NcdcRecordParser parser = new NcdcRecordParser();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, TextPair, Text>.Context context)
                throws IOException, InterruptedException {
            parser.parse(value.toString());
            context.write(new TextPair(parser.getStationId(), "1"), value);
        }
    }

    /**
     * The reducer receives the station-metadata record first (tag "0" sorts
     * before tag "1" within a group), extracts the station name from it, and
     * prepends that name to every subsequent weather record for the station.
     *
     * <p>NOTE(review): this assumes every station id present in the weather
     * data also has a metadata record; otherwise the first weather record
     * would be misread as the station name — confirm against the input data.
     */
    static class JoinReducer extends Reducer<TextPair, Text, Text, Text> {

        @Override
        protected void reduce(TextPair key, Iterable<Text> values,
                Reducer<TextPair, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Iterator<Text> iterator = values.iterator();
            // First value in the group is the station name (tag "0").
            Text stationName = new Text(iterator.next());
            while (iterator.hasNext()) {
                Text record = iterator.next();
                Text outValue = new Text(stationName.toString() + "\t" + record.toString());
                context.write(key.getFirst(), outValue);
            }
        }
    }

    /**
     * Partitions on the station id only, so both tagged variants of a key
     * land on the same reducer. The mask keeps the hash non-negative.
     */
    static class KeyPartitioner extends Partitioner<TextPair, Text> {

        @Override
        public int getPartition(TextPair key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("JoinRecordWithStationName");
        job.setJarByClass(JoinRecordWithStationName.class);

        Path ncdcInputPath = new Path(args[0]);
        Path stationInputPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);

        // Each input gets its own tagging mapper.
        MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class,
                JoinRecordMapper.class);
        MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class,
                JoinStationMapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Partition and group on the station id; sort on the full TextPair so
        // the metadata record (tag "0") is delivered first within each group.
        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        // Use command-line paths when supplied (record, station, output);
        // otherwise fall back to the original hard-coded demo paths.
        String[] params = (args != null && args.length == 3)
                ? args
                : new String[] {
                        "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/record",
                        "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/station",
                        "hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/output" };
        int exitCode;
        try {
            exitCode = ToolRunner.run(new JoinRecordWithStationName(), params);
        } catch (Exception e) {
            e.printStackTrace();
            // Bug fix: previously the process exited 0 even when the job
            // threw, hiding failures from calling scripts.
            exitCode = 1;
        }
        System.exit(exitCode);
    }
}
2.NcdcRecordParser類
package com.zhen.mapreduce.join;

import java.io.Serializable;

/**
 * Parses a weather-record line of the form {@code stationId,date,temperature},
 * e.g. {@code 00001,20180101,15}.
 *
 * <p>Bug fixes over the original: {@link #parse(String)} now resets all fields
 * first, so a malformed line can no longer leak the previous record's values
 * to callers, and a non-numeric date/temperature no longer throws an uncaught
 * {@link NumberFormatException} (which would kill the map task) — the record
 * is simply reported invalid via {@link #isValidTemperature()}.
 *
 * @author FengZhen
 * @date 2018-09-09
 */
public class NcdcRecordParser implements Serializable {

    private static final long serialVersionUID = 1L;

    // Station id (first comma-separated field); null until a line parses.
    private String stationId;
    // Observation date as its numeric form, e.g. 20180101 (second field).
    private long timeStamp;
    // Temperature (third field); null marks an invalid/unparsed record.
    private Integer temperature;

    /**
     * Parses one input line, replacing any state from a previous call.
     * Lines with fewer than three comma-separated fields, or with a
     * non-numeric date or temperature, leave the parser in the "invalid"
     * state ({@code isValidTemperature() == false}).
     *
     * @param value raw input line
     */
    public void parse(String value) {
        // Reset so stale values from the previous record cannot leak through
        // when this line turns out to be malformed.
        stationId = null;
        timeStamp = 0L;
        temperature = null;

        String[] values = value.split(",");
        if (values.length >= 3) {
            try {
                stationId = values[0];
                timeStamp = Long.parseLong(values[1]);
                temperature = Integer.valueOf(values[2]);
            } catch (NumberFormatException e) {
                // Non-numeric date/temperature: mark the record invalid
                // instead of propagating and failing the whole task.
                temperature = null;
            }
        }
    }

    /**
     * @return true when the last parsed line yielded a temperature
     */
    public boolean isValidTemperature() {
        return null != temperature;
    }

    public String getStationId() {
        return stationId;
    }

    public void setStationId(String stationId) {
        this.stationId = stationId;
    }

    public long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public Integer getTemperature() {
        return temperature;
    }

    public void setTemperature(Integer temperature) {
        this.temperature = temperature;
    }
}
3.NcdcStationMetadataParser類
package com.zhen.mapreduce.join;

import java.io.Serializable;

/**
 * Parses a station-metadata line of the form {@code stationId,stationName},
 * e.g. {@code 00001,北京}.
 *
 * @author FengZhen
 * @date 2018-09-09
 */
public class NcdcStationMetadataParser implements Serializable {

    private static final long serialVersionUID = 1L;

    // Station id (first comma-separated field).
    private String stationId;
    // Human-readable station name (second comma-separated field).
    private String stationName;

    /**
     * Splits one line on commas and captures the first two fields.
     * On a malformed line the previously parsed values are left untouched.
     *
     * @param value raw input line
     * @return true when the line had at least two comma-separated fields
     */
    public boolean parse(String value) {
        String[] fields = value.split(",");
        if (fields.length < 2) {
            return false;
        }
        stationId = fields[0];
        stationName = fields[1];
        return true;
    }

    public String getStationId() {
        return stationId;
    }

    public void setStationId(String stationId) {
        this.stationId = stationId;
    }

    public String getStationName() {
        return stationName;
    }

    public void setStationName(String stationName) {
        this.stationName = stationName;
    }
}
4.TextPair類
package com.zhen.mapreduce.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * A pair of {@link Text} fields used as a composite map-output key for the
 * reduce-side join: {@code first} is the join key (station id) and
 * {@code second} is the source tag ("0" = station metadata, "1" = weather
 * record). Natural ordering compares {@code first} then {@code second}, so
 * within one station the metadata record sorts before the weather records.
 *
 * @author FengZhen
 * @date 2018-09-16
 */
public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    /** Required no-arg constructor for Writable deserialization. */
    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    // NOTE(review): stores the references directly (no defensive copy) —
    // callers must not mutate the Text objects after passing them in.
    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    /** Serializes both fields in order (Writable contract). */
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    /** Deserializes both fields in the same order as {@link #write}. */
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        // Combine both fields; 163 is an arbitrary odd prime multiplier.
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TextPair) {
            TextPair textPair = (TextPair) obj;
            return first.equals(textPair.first) && second.equals(textPair.second);
        }
        return false;
    }

    /** Orders by {@code first}, breaking ties on {@code second}. */
    public int compareTo(TextPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(o.second);
    }

    public Text getFirst() {
        return first;
    }

    public void setFirst(Text first) {
        this.first = first;
    }

    public Text getSecond() {
        return second;
    }

    public void setSecond(Text second) {
        this.second = second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    /**
     * Compares two {@link Text} values in their natural (ascending) order.
     * (The original comment claimed "two ints, descending" — it does not.)
     *
     * @param a left operand
     * @param b right operand
     * @return negative/zero/positive per {@link Text#compareTo}
     */
    public static int compare(Text a, Text b) {
        return a.compareTo(b);
    }

    /**
     * Grouping comparator that orders by the first field only, so all tagged
     * variants of one station id form a single reduce group.
     */
    static class FirstComparator extends WritableComparator {

        protected FirstComparator() {
            // true = create instances so compare() receives deserialized keys.
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextPair ip1 = (TextPair) a;
            TextPair ip2 = (TextPair) b;
            return TextPair.compare(ip1.getFirst(), ip2.getFirst());
        }
    }
}
打jar包,上傳並執行
scp /Users/FengZhen/Desktop/Hadoop/file/JoinRecordWithStationName.jar [email protected]:/usr/local/test/mr hadoop jar JoinRecordWithStationName.jar com.zhen.mapreduce.join.JoinRecordWithStationName
結果如下
00001 北京 00001,20180102,16 00001 北京 00001,20180101,15 00002 天津 00002,20180102,26 00002 天津 00002,20180101,25 00003 山東 00003,20180102,36 00003 山東 00003,20180101,35
MapReduce-join連接