MapReduce-自定義Key-二次排序
阿新 • • 發佈:2019-02-17
這個例項緊接上一個TopK的例項最後留下的一個問題的解決以及對新的一個技術點的說明,如何自定義輸入輸出的資料型別,這裡也大概引出mapreduce中二次排序的大致思想,但不著重說明二次排序,只是大致說明自定義輸入型別的基本步驟,因為做剛接觸二次排序的時候當時陷入一個思想上的誤區,為了把這個過程記錄下來,所以會在下一篇部落格中著重說明二次排序,為了說明問題我把他說成是“三次排序”可參見《MapReduce-三次排序-曾經想不通的二次排序》。
自定義Key的基本步驟:
所有自定義的key應該實現介面WritableComparable,因為是可序列的並且可比較的。並重載方法
//反序列化,從流中的二進位制轉換成自定義Key
public void readFields(DataInput in) throws IOException
//序列化,將自定義Key轉化成使用流傳送的二進位制
public void write(DataOutput out)
//key的比較,用於map階段和reduce階段的排序 以及用於reduce階段的grouping分組
public int compareTo(IntPair o)
另外新定義的類應該重寫的兩個方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
public boolean equals(Object right)
根據以上步驟下面是實現程式碼:
自定義Key的基本步驟:
所有自定義的key應該實現介面WritableComparable,因為是可序列的並且可比較的。並重載方法
//反序列化,從流中的二進位制轉換成自定義Key
public void readFields(DataInput in) throws IOException
//序列化,將自定義Key轉化成使用流傳送的二進位制
public void write(DataOutput out)
//key的比較,用於map階段和reduce階段的排序 以及用於reduce階段的grouping分組
public int compareTo(IntPair o)
另外新定義的類應該重寫的兩個方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
public boolean equals(Object right)
根據以上步驟下面是實現程式碼:
自定義Key:
map階段:import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class SecondSortClass implements WritableComparable<SecondSortClass> { /** * 自定義型別的中包含的變數,本例中的變數都是用於排序的變數 * 後序的事例中我們還將定義一些其它功能的變數 */ private int first; private String second; public SecondSortClass() {} public SecondSortClass(int first, String second) { this.first = first; this.second = second; } /** * 反序列化,從流中的二進位制轉換成自定義Key */ @Override public void readFields(DataInput input) throws IOException { this.first = input.readInt(); this.second = input.readUTF(); } /** * 序列化,將自定義Key轉化成使用流傳送的二進位制 */ @Override public void write(DataOutput output) throws IOException { output.writeInt(first); output.writeUTF(second); } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + first; result = prime * result + ((second == null) ? 0 : second.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; SecondSortClass other = (SecondSortClass) obj; if (first != other.first) return false; if (second == null) { if (other.second != null) return false; } else if (!second.equals(other.second)) return false; return true; } /** * 用於map階段和reduce階段的排序 以及用於reduce階段的grouping分組 */ @Override public int compareTo(SecondSortClass o) { if(this.first != o.getFirst()) { return -(this.first - o.getFirst()); } else if( !this.second.equals(o.getSecond())) { return -this.second.compareTo(o.getSecond()); } return 0; } public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public String getSecond() { return second; } public void setSecond(String second) { this.second = second; } }
reduce階段:import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SecondMapper extends Mapper<LongWritable, Text, SecondSortClass, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString().trim(); if(line.length() > 0) { String[] arr = line.split(","); if(arr.length == 3) { context.write(new SecondSortClass(Integer.valueOf(arr[2]),arr[1]), new Text(arr[1] + "," + arr[2])); } } } }
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondReducer extends Reducer<SecondSortClass, Text, NullWritable, Text> {
int len;
/**
* Map任務啟動的時候呼叫
*/
@Override
protected void setup( Context context)
throws IOException, InterruptedException {
/**
* 通過context獲取任務啟動時傳入的TopK的K值
*/
len = context.getConfiguration().getInt("K", 10);
}
@Override
protected void reduce(SecondSortClass key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// for(Text val: values) {
// if(len <= 0) {
// break;
// }
// context.write(null, val);
// len --;
// }
if(len > 0) {
context.write(null, values.iterator().next());
len --;
}
}
}
啟動函式:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JobMain {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
/**
* 把傳入引數放入Configuration中,map或reduce中可以通過
* 獲取Configuration來獲取傳入的引數,這是hadoop傳入引數的
* 方式之一
*/
configuration.set("K", args[2]);
Job job = new Job(configuration, "third-sort-job");
job.setJarByClass(JobMain.class);
job.setMapperClass(SecondMapper.class);
job.setMapOutputKeyClass(SecondSortClass.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(SecondReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputDir = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(outputDir)) {
fs.delete(outputDir, true);
}
FileOutputFormat.setOutputPath(job, outputDir);
System.exit(job.waitForCompletion(true)? 0: 1);
}
}
執行命令:
./hadoop jar mr.jar com.seven.mapreduce.test1.JobMain /input/two /output/two14 3
執行資料:
uid,name,cost
1,mr1,3234
2,mr2,123
3,mr3,9877
4,mr4,348
5,mr5,12345
6,mr6,6646
7,mr7,98
8,mr8,12345
執行結果: