mapreduce 的二次排序
阿新 • • 發佈:2018-04-11
大數據 hadoop 二次排序 mapreduce
- 一: 理解二次排序的功能, 使用自己理解的方式表達(包括自定義數據類型,分區,分組,排序)
- 二: 編寫實現二次排序功能, 提供源碼文件。
- 三:理解mapreduce join 的幾種 方式,編碼實現reduce join,提供源代碼,說出思路。
一: 二次排序 使用自己理解的方式表達(包括自定義數據類型,分區,分組,排序)
1.1 二次排序的功能
1. 當客戶端提交一個作業的時候,hadoop 會開啟 yarn 接收並進行數據拷貝處理,之後交由 yarn 框架上的服務 resourcemanager 接收,同時指派任務給 nodemanager,nodemanager 會調用 applicationmaster 處理任務,同時在 container 中分配好要處理任務的環境;container 是任務運行環境的抽象,封裝了CPU、內存等多維資源以及環境變量、啟動命令等任務運行相關的信息。之後輸入數據,對輸入數據進行 InputSplit 分割,然後調用 mapper 基類將數據分割成 key-value 鍵值對,之後調用 map() 方法,調用該方法後會對 key-value 對進行分割,之後經過 shuffle 過程,map 的輸出就是 reduce 端的輸入,經過 reduce 端處理後數據即可輸出到 hdfs 上面。二次排序就是首先按照第一字段排序,然後再對第一字段相同的行按照第二字段排序。 2. 在 shuffle 過程中,會對數據進行分割(split),分區(partitioner),排序(sort),合並(combine),壓縮(compress),分組(group),之後輸出到 reduce 端。
1.2 shuffle 對job 格式定義:
1) partitioner job.setPartitionerClass(FirstPartitioner.class); 2) sort job.setSortComparatorClass(cls); 3) combine job.setCombinerClass(cls); 4) compress set by configuration 5) group job.setGroupingComparatorClass(FirstGroupingComparator.class);
二: 編寫實現二次排序功能, 提供源碼文件。
2.1 二次排序格式要求
1. 利用mapreduce 默認會對key 進行排序的方法對job 進行第一次排序
2. 把key和需要排序的第二個字段進行組合
2.2 二次排序Java的代碼
SecoundarySortMapReduce.java package org.apache.hadoop.studyhadoop.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * * @author zhangyy * */ public class SecondarySortMapReduce extends Configured implements Tool{ // step 1: mapper class /** * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> */ public static class SecondarySortMapper extends // Mapper<LongWritable,Text,PairWritable,IntWritable>{ private PairWritable mapOutputKey = new PairWritable() ; private IntWritable mapOutputValue = new IntWritable() ; @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // line value String lineValue = value.toString(); // split String[] strs = lineValue.split(",") ; // invalidate if(2 != strs.length){ return ; } // set map output key and value mapOutputKey.set(strs[0], Integer.valueOf(strs[1])); mapOutputValue.set(Integer.valueOf(strs[1])); // output context.write(mapOutputKey, mapOutputValue); } } // step 2: reducer class /** * public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> */ public static class SecondarySortReducer extends // Reducer<PairWritable,IntWritable,Text,IntWritable>{ private Text outputKey = new Text() ; @Override public void reduce(PairWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // set output key outputKey.set(key.getFirst()); // iterator for(IntWritable value : values){ // output 
context.write(outputKey, value); } } } // step 3: driver public int run(String[] args) throws Exception { // 1: get configuration Configuration configuration = super.getConf() ; // 2: create job Job job = Job.getInstance(// configuration, // this.getClass().getSimpleName()// ); job.setJarByClass(this.getClass()); // 3: set job // input -> map -> reduce -> output // 3.1: input Path inPath = new Path(args[0]) ; FileInputFormat.addInputPath(job, inPath); // 3.2: mapper job.setMapperClass(SecondarySortMapper.class); job.setMapOutputKeyClass(PairWritable.class); job.setMapOutputValueClass(IntWritable.class); // ===========================Shuffle====================================== // 1) partitioner job.setPartitionerClass(FirstPartitioner.class); // 2) sort // job.setSortComparatorClass(cls); // 3) combine // job.setCombinerClass(cls); // 4) compress // set by configuration // 5) group job.setGroupingComparatorClass(FirstGroupingComparator.class); // ===========================Shuffle====================================== // 3.3: reducer job.setReducerClass(SecondarySortReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); // set reducer number job.setNumReduceTasks(2); // 3.4: output Path outPath = new Path(args[1]); FileOutputFormat.setOutputPath(job, outPath); // 4: submit job boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0 : 1 ; } public static void main(String[] args) throws Exception { args = new String[]{ "hdfs://namenode01.hadoop.com:8020/input/sort" ,// "hdfs://namenode01.hadoop.com:8020/output" }; // create configuration Configuration configuration = new Configuration(); // run job int status = ToolRunner.run(// configuration, // new SecondarySortMapReduce(), // args ) ; // exit program System.exit(status); } }
PairWritable.java
package org.apache.hadoop.studyhadoop.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {
private String first;
private int second;
public PairWritable() {
}
public PairWritable(String first, int second) {
this.set(first, second);
}
public void set(String first, int second) {
this.first = first;
this.setSecond(second);
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
public int getSecond() {
return second - Integer.MAX_VALUE;
}
public void setSecond(int second) {
this.second = second + Integer.MAX_VALUE;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}
public int compareTo(PairWritable o) {
// compare first
int comp =this.first.compareTo(o.getFirst()) ;
// eqauls
if(0 != comp){
return comp ;
}
// compare
return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond())) ;
}
}
FirstPartitioner.java
package org.apache.hadoop.studyhadoop.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Chooses the partition from the first component of the composite key only, so
 * that keys sharing the same first component land in the same partition.
 */
public class FirstPartitioner extends Partitioner<PairWritable, IntWritable> {

    /**
     * @param key           composite key; only {@code getFirst()} is consulted
     * @param value         map output value (ignored)
     * @param numPartitions number of partitions to choose from
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        final int rawHash = key.getFirst().hashCode();
        // clear the sign bit so the remainder below can never be negative
        final int nonNegativeHash = rawHash & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
FirstGroupingComparator.java
package org.apache.hadoop.studyhadoop.sort;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
public class FirstGroupingComparator implements RawComparator<PairWritable> {
// object compare
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
// bytes compare
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, 0, l1 - 4, b2, 0, l2 - 4);
}
}
2.3 輸出測試
上傳數據處理:
hdfs dfs -put sort /input
運行輸出:
三:理解mapreduce join 的幾種 方式,編碼實現reduce join,提供源代碼,說出思路。
3.1 mapreduce join 有三種:
3.1.1 map 端的 join
map階段不能獲取所有需要的join字段,即:同一個key對應的字段可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的數據傳輸。
Map side join是針對以下場景進行的優化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到內存中。這樣,我們可以將小表復制多份,讓每個map task內存中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接後輸出即可。
為了支持文件的復制,Hadoop提供了一個類DistributedCache 去實現。
3.1.2 reduce 端的 join
在map階段,map函數同時讀取兩個文件File1和File2,為了區分兩種來源的key/value數據對,對每條數據打一個標簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務是對不同文件中的數據打標簽。
在reduce階段,reduce函數獲取key相同的來自File1和File2文件的value list, 然後對於同一個key,對File1和File2中的數據進行join(笛卡爾乘積)。即:reduce階段進行實際的連接操作
3.1.3 SemiJoin
SemiJoin,也叫半連接,是從分布式數據庫中借鑒過來的方法。它的產生動機是:對於reduce side join,跨機器的數據傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的數據,則可以大大節省網絡IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件一般很小,可以放到內存中。在map階段,使用DistributedCache將File3復制到各個TaskTracker上,然後將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同
3.2 編程代碼:
DataJoinMapReduce.java
DataJoinMapReduce.java
package org.apache.hadoop.studyhadoop.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author zhangyy
*
*/
/**
 * Reduce-side join of customer and order records that share a numeric id in their
 * first comma-separated field. The mapper tags each record with its origin; the
 * reducer combines the customer record with every order of the same id.
 */
public class DataJoinMapReduce extends Configured implements Tool {

    /** Tag attached to three-field (customer) records. */
    private static final String TAG_CUSTOMER = "customer";

    /** Tag attached to four-field (order) records. */
    private static final String TAG_ORDER = "order";

    /** Input path used when no input argument is supplied. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://namenode01.hadoop.com:8020/join";

    /** Output path used when no output argument is supplied. */
    private static final String DEFAULT_OUTPUT_PATH =
            "hdfs://namenode01.hadoop.com:8020/output3/";

    // step 1 : mapper
    /**
     * Keys every record by its id and tags it by field count: three fields are
     * treated as a customer record, four fields as an order record.
     *
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

        private final LongWritable mapOutputKey = new LongWritable();
        private final DataJoinWritable mapOutputValue = new DataJoinWritable();

        /**
         * @param key     input key supplied by the input format (not used here)
         * @param value   one comma-separated line
         * @param context sink for the map output
         * @throws NumberFormatException if the first field is not a valid long
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split
            String[] strs = value.toString().split(",");

            // skip lines that are neither a customer (3 fields) nor an order (4 fields)
            if ((3 != strs.length) && (4 != strs.length)) {
                return;
            }

            // the id in the first field is the join key
            mapOutputKey.set(Long.parseLong(strs[0]));

            String name = strs[1];
            if (3 == strs.length) {
                // customer: name,phone
                mapOutputValue.set(TAG_CUSTOMER, name + "," + strs[2]);
            } else {
                // order: name,price,date
                mapOutputValue.set(TAG_ORDER, name + "," + strs[2] + "," + strs[3]);
            }

            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2 : reducer
    /**
     * Emits one {@code id,customerInfo,order} line per order of the current id.
     */
    public static class WordCountReducer
            extends Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

        private final Text outputValue = new Text();

        /**
         * Collects the customer record and all order records of one id, then joins them.
         * If several customer records share the id the last one iterated is used; if there
         * is none the customer part of the output is empty. Ids without orders produce no
         * output.
         */
        @Override
        public void reduce(LongWritable key, Iterable<DataJoinWritable> values,
                Context context) throws IOException, InterruptedException {
            String customerInfo = "";
            List<String> orderList = new ArrayList<String>();

            // the data is copied out as Strings, so buffering it across iterations is safe
            for (DataJoinWritable value : values) {
                if (TAG_CUSTOMER.equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if (TAG_ORDER.equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }

            for (String order : orderList) {
                outputValue.set(key.toString() + "," + customerInfo + "," + order);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

    // step 3 : job
    /**
     * Configures and submits the job.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} the output path;
     *             a missing position falls back to the built-in default path
     * @return 0 when the job succeeds, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(DataJoinMapReduce.class);
        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT_PATH);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // ====================shuffle==========================
        // 1: partition
        // job.setPartitionerClass(cls);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        // job.setCombinerClass(cls);
        // 4: compress
        // set by configuration
        // 5 : group
        // job.setGroupingComparatorClass(cls);
        // ====================shuffle==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Command-line entry point. Arguments are forwarded unchanged; {@link #run(String[])}
     * substitutes the default paths for any that are missing.
     */
    public static void main(String[] args) throws Exception {
        // get configuration
        Configuration configuration = new Configuration();
        // configuration.set(name, value);

        // run job
        int status = ToolRunner.run(configuration, new DataJoinMapReduce(), args);

        // exit program
        System.exit(status);
    }
}
DataJoinWritable.java
package org.apache.hadoop.studyhadoop.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DataJoinWritable implements Writable {
private String tag ;
private String data ;
public DataJoinWritable() {
}
public DataJoinWritable(String tag, String data) {
this.set(tag, data);
}
public void set(String tag, String data) {
this.setTag(tag);
this.setData(data);
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((data == null) ? 0 : data.hashCode());
result = prime * result + ((tag == null) ? 0 : tag.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
DataJoinWritable other = (DataJoinWritable) obj;
if (data == null) {
if (other.data != null)
return false;
} else if (!data.equals(other.data))
return false;
if (tag == null) {
if (other.tag != null)
return false;
} else if (!tag.equals(other.tag))
return false;
return true;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(this.getTag());
out.writeUTF(this.getData());
}
public void readFields(DataInput in) throws IOException {
this.setTag(in.readUTF());
this.setData(in.readUTF());
}
@Override
public String toString() {
return tag + "," + data ;
}
}
3.3 運行代碼測試
上傳文件:
hdfs dfs -put customers.txt /join
hdfs dfs -put orders.txt /join
運行結果:
mapreduce 的二次排序