
MapReduce Joins


Joins

MapReduce can perform joins between large datasets. The right implementation technique depends on the size of the datasets and how they are partitioned.
If the join is performed by the mapper, it is called a "map-side join"; if it is performed by the reducer, it is a "reduce-side join".

Map-side joins

A map-side join between two large input datasets performs the join before the data reaches the map function. For this to work, the inputs to each map must first be partitioned and sorted in a particular way: each input dataset must be divided into the same number of partitions, each sorted by the same key (the join key), and all records for a given key must reside in the same partition.
A map-side join can also join the outputs of several jobs, provided those jobs had the same number of reducers, used the same keys, and produced output files that are not splittable (for example, smaller than one HDFS block, or gzip-compressed). A driver sketch for this setup follows.
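As a minimal sketch (not from the original post), Hadoop's org.apache.hadoop.mapreduce.lib.join package can declare such a pre-partitioned, pre-sorted join via CompositeInputFormat; the paths and the choice of KeyValueTextInputFormat here are illustrative assumptions.

// Map-side join driver fragment: assumes both inputs are already partitioned
// identically and sorted on the join key. recordsPath and stationsPath are
// hypothetical Path objects.
job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
		CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class,
				recordsPath, stationsPath));
// Each map() call then receives the join key (Text) and a TupleWritable
// holding one matching value from each source.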

Reduce-side joins

Because a reduce-side join does not require the input datasets to have any particular structure, it is more commonly used than a map-side join. It tends to be less efficient, however, since both datasets must go through MapReduce's shuffle. The basic idea is that the mapper tags each record with its source and uses the join key as the map output key, so that records with the same key are brought together at the same reducer.
Two techniques are needed:

1. Multiple inputs

The input datasets typically come in different formats, so the MultipleInputs class is used to parse and tag each source conveniently (see run() in the code below).

2. Secondary sort

The reducer sees the records with the same key from both sources, but they are not guaranteed to arrive in any particular order. To perform the join, it matters that the data from one source reaches the reducer before the data from the other; the ordering sketch just below shows how the tags arrange this.
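In the example that follows, station records are tagged "0" and weather records "1", so the composite keys within one reducer group sort like this (values shown for illustration):

(00001, 0) -> 北京              <- station record, tag "0", arrives first
(00001, 1) -> 00001,20180101,15
(00001, 1) -> 00001,20180102,16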

An example

Given a weather station file and a weather records file, the two files need to be joined.

The station file contains:

00001,北京
00002,天津
00003,山東

The weather records file contains:

00001,20180101,15
00001,20180102,16
00002,20180101,25
00002,20180102,26
00003,20180101,35
00003,20180102,36

Requirement: output the station ID, the station name, and the weather record.

The code is as follows.

1. The JoinRecordWithStationName class

package com.zhen.mapreduce.join;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



/**
 * @author FengZhen
 * @date 2018-09-16
 * Reduce-side join of weather records with station names.
 */
public class JoinRecordWithStationName extends Configured implements Tool{

	/**
	 * Mapper that tags station records for the reduce-side join.
	 * @author FengZhen
	 *	00001,北京
		00002,天津
		00003,山東
	 */
	static class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text>{
		private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context)
				throws IOException, InterruptedException {
			if (parser.parse(value.toString())) {
				// tag "0": station records sort ahead of weather records ("1") for the same ID
				context.write(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName()));
			}
		}
	}
	
	/**
	 * Mapper that tags weather records for the reduce-side join.
	 * @author FengZhen
	 *	00001,20180101,15
		00001,20180102,16
		00002,20180101,25
		00002,20180102,26
		00003,20180101,35
		00003,20180102,36
	 */
	static class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
		private NcdcRecordParser parser = new NcdcRecordParser();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TextPair, Text>.Context context)
				throws IOException, InterruptedException {
			parser.parse(value.toString());
			context.write(new TextPair(parser.getStationId(), "1"), value);
		}
	}
	
	/**
	 * The reducer knows it receives the station record first, so it extracts the
	 * station name and writes it as part of every subsequent output record.
	 * @author FengZhen
	 */
	static class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
		@Override
		protected void reduce(TextPair key, Iterable<Text> values, Reducer<TextPair, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			Iterator<Text> iterator = values.iterator();
			// The first value is the station name (guaranteed by the tag ordering);
			// copy it, because Hadoop reuses the Text instance while iterating.
			Text stationName = new Text(iterator.next());
			while (iterator.hasNext()) {
				Text record = iterator.next();
				Text outValue = new Text(stationName.toString() + "\t" + record.toString());
				context.write(key.getFirst(), outValue);
			}
		}
	}
	
	/** Partitions on the first field (station ID) only, so every record for a station lands in the same reducer. */
	static class KeyPartitioner extends Partitioner<TextPair, Text>{
		@Override
		public int getPartition(TextPair key, Text value, int numPartitions) {
			return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
		}
	}
	
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf());
		job.setJobName("JoinRecordWithStationName");
		job.setJarByClass(JoinRecordWithStationName.class);

		Path ncdcInputPath = new Path(args[0]);
		Path stationInputPath = new Path(args[1]);
		Path outputPath = new Path(args[2]);
		
		MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class);
		MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		job.setPartitionerClass(KeyPartitioner.class);
		job.setGroupingComparatorClass(TextPair.FirstComparator.class);
		
		job.setMapOutputKeyClass(TextPair.class);
		
		job.setReducerClass(JoinReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args)  {
		String[] params = new String[] {
				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/record",
				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/input/station",
				"hdfs://fz/user/hdfs/MapReduce/data/join/JoinRecordWithStationName/output"};
		int exitCode = 1;
		try {
			exitCode = ToolRunner.run(new JoinRecordWithStationName(), params);
		} catch (Exception e) {
			// Keep the nonzero exit code so a failed run is reported as such.
			e.printStackTrace();
		}
		System.exit(exitCode);
	}
	
}
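Note the division of labor in run(): KeyPartitioner sends every record with the same station ID to the same reducer, FirstComparator makes those records a single reduce group, and TextPair's full compareTo still sorts within the group by the tag, which is exactly what lets JoinReducer assume the station name comes first.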

2. The NcdcRecordParser class

package com.zhen.mapreduce.join;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Parses a weather record.
 */
public class NcdcRecordParser implements Serializable{

	private static final long serialVersionUID = 1L;

	/**
	 * Station ID
	 */
	private String stationId;
	/**
	 * Timestamp (the date field)
	 */
	private long timeStamp;
	/**
	 * Temperature
	 */
	private Integer temperature;
	
	/**
	 * Parse one comma-separated line.
	 * @param value
	 */
	public void parse(String value) {
		String[] values = value.split(",");
		if (values.length >= 3) {
			stationId = values[0];
			timeStamp = Long.parseLong(values[1]);
			temperature = Integer.valueOf(values[2]);
		}
	}
	
	/**
	 * Check whether a valid temperature was parsed.
	 * @return
	 */
	public boolean isValidTemperature() {
		return null != temperature;
	}

	public String getStationId() {
		return stationId;
	}

	public void setStationId(String stationId) {
		this.stationId = stationId;
	}

	public long getTimeStamp() {
		return timeStamp;
	}

	public void setTimeStamp(long timeStamp) {
		this.timeStamp = timeStamp;
	}

	public Integer getTemperature() {
		return temperature;
	}

	public void setTemperature(Integer temperature) {
		this.temperature = temperature;
	}
	
}
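A quick usage sketch of the parser (hypothetical input, not part of the original post):

NcdcRecordParser parser = new NcdcRecordParser();
parser.parse("00001,20180101,15");
if (parser.isValidTemperature()) {
    // prints: 00001 20180101 15
    System.out.println(parser.getStationId() + " "
            + parser.getTimeStamp() + " " + parser.getTemperature());
}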

3. The NcdcStationMetadataParser class

package com.zhen.mapreduce.join;

import java.io.Serializable;

/**
 * @author FengZhen
 * @date 2018-09-09
 * Parses station metadata.
 */
public class NcdcStationMetadataParser implements Serializable{

	private static final long serialVersionUID = 1L;

	/**
	 * Station ID
	 */
	private String stationId;
	/**
	 * Station name
	 */
	private String stationName;
	
	/**
	 * Parse one comma-separated line.
	 * @param value
	 */
	public boolean parse(String value) {
		String[] values = value.split(",");
		if (values.length >= 2) {
			stationId = values[0];
			stationName = values[1];
			return true;
		}
		return false;
	}

	public String getStationId() {
		return stationId;
	}

	public void setStationId(String stationId) {
		this.stationId = stationId;
	}

	public String getStationName() {
		return stationName;
	}

	public void setStationName(String stationName) {
		this.stationName = stationName;
	}
}

4. The TextPair class

package com.zhen.mapreduce.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author FengZhen
 * @date 2018-09-16
 * A pair of Text fields, used as the composite map output key.
 */
public class TextPair implements WritableComparable<TextPair>{

	private Text first;
	private Text second;
	public TextPair() {
		set(new Text(), new Text());
	}
	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}
	public TextPair(Text first, Text second) {
		set(first, second);
	}
	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}
	
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}
	
	@Override
	public boolean equals(Object obj) {
		if (obj instanceof TextPair) {
			TextPair textPair = (TextPair) obj;
			return first.equals(textPair.first) && second.equals(textPair.second);
		}
		return false;
	}
	
	public int compareTo(TextPair o) {
		int cmp = first.compareTo(o.first);
		if (cmp != 0) {
			return cmp;
		}
		return second.compareTo(o.second);
	}
	
	public Text getFirst() {
		return first;
	}
	public void setFirst(Text first) {
		this.first = first;
	}
	public Text getSecond() {
		return second;
	}
	public void setSecond(Text second) {
		this.second = second;
	}
	@Override
	public String toString() {
		return first + "\t" + second;
	}
	
	/**
	 * Compare two Text values (ascending).
	 * @param a
	 * @param b
	 * @return
	 */
	public static int compare(Text a, Text b) {
		return a.compareTo(b);
	}
	
	static class FirstComparator extends WritableComparator{
		protected FirstComparator() {
			super(TextPair.class, true);
		}
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			TextPair ip1 = (TextPair) a;
			TextPair ip2 = (TextPair) b;
			return TextPair.compare(ip1.getFirst(), ip2.getFirst());
		}
	}
	
}
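A quick illustration of the ordering TextPair provides (a hypothetical standalone snippet, assuming it runs in the same package as TextPair):

// The station key (tag "0") sorts ahead of the weather key (tag "1") for
// the same station ID, so the reducer sees the station name first.
TextPair station = new TextPair("00001", "0");
TextPair weather = new TextPair("00001", "1");
System.out.println(station.compareTo(weather) < 0); // prints true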

Build the jar, upload it, and run it:

scp /Users/FengZhen/Desktop/Hadoop/file/JoinRecordWithStationName.jar [email protected]:/usr/local/test/mr
hadoop jar JoinRecordWithStationName.jar com.zhen.mapreduce.join.JoinRecordWithStationName
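Note that this invocation passes no arguments: main() supplies the three HDFS paths hard-coded in params, so the input and output locations must exist under hdfs://fz.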

The result is as follows:

00001	北京	00001,20180102,16
00001	北京	00001,20180101,15
00002	天津	00002,20180102,26
00002	天津	00002,20180101,25
00003	山東	00003,20180102,36
00003	山東	00003,20180101,35

 
