MapReduce: Filling In Missing Ranks in Test Data
阿新 • Published: 2018-11-09
package xxx.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import xxx.common.ConfigurationHadoop;
/* Test data: name, subject, score, rank
 * 張4 數學 96
 * 張5 數學 95 5
 * 張1 數學 100 1
 * 張3 數學 98
 * 張2 數學 99 2
 * The missing ranks need to be filled in, and the rows emitted in rank order.
 */
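// Expected result after filling (derived by hand from the nearest-neighbour
// rule used in the reducer below): 張1=1, 張2=2, 張3=3, 張4=4, 張5=5.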
public class FillAndSortTest {
public static class FillAndSortMapper extends
Mapper<LongWritable, Text, KeyWritable, ValueWritable> {
private KeyWritable keyWritable;
private ValueWritable valueWritable;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// InputSplit split = context.getInputSplit();
// String text = split.toString();
super.setup(context);
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
System.out.println(value.toString());
String line = value.toString();
// Input columns: name, subject, score, and an optional rank.
String[] col = line.split("\t");
keyWritable = new KeyWritable(col[1], Integer.parseInt(col[2]));
if (col.length == 3) {
// No rank column: 0 marks the rank as missing.
valueWritable = new ValueWritable(col[0], col[1], Integer.parseInt(col[2]), 0);
} else {
valueWritable = new ValueWritable(col[0], col[1], Integer.parseInt(col[2]), Integer.parseInt(col[3]));
}
context.write(keyWritable, valueWritable);
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
public static class FillAndSortReducer extends
Reducer<KeyWritable, ValueWritable, NullWritable, Text> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
}
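// Fill strategy used in this test: every score that arrived without a rank
// borrows the rank of the closest ranked score, shifted by -1 when our
// score is higher and +1 when it is lower.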
public void reduce(KeyWritable key, Iterable<ValueWritable> values,
Context context) throws IOException, InterruptedException {
Iterator<ValueWritable> it = values.iterator();
// LinkedHashMultimap<Integer, String> noRank = LinkedHashMultimap.create();
// LinkedHashMultimap<Integer, Integer> haveRank = LinkedHashMultimap.create();
HashMap<Integer, String> noRank = new HashMap<Integer, String>();
HashMap<Integer, Integer> haveRank = new HashMap<Integer, Integer>();
while (it.hasNext()) {
ValueWritable value = it.next();
System.out.println(value.toString());
if (value.getRank() == 0) {
noRank.put(value.getScore(), value.getName() + "\t" + value.getSubject());
} else {
haveRank.put(value.getScore(), value.getRank());
}
}
Set<Integer> scores = noRank.keySet();
Set<Integer> scores2 = haveRank.keySet();
// The matching logic is rough and only meant to illustrate the flow.
for (Integer score : scores) {
int minDiff = Integer.MAX_VALUE;
int newRank = 0;
for (Integer score2 : scores2) {
int diff = Math.abs(score2 - score);
if (diff < minDiff) {
minDiff = diff;
newRank = haveRank.get(score2) + (score > score2 ? -1 : 1);
}
}
System.out.println(noRank.get(score) + "\t" + score + "\t" + newRank);
// Sorting the filled rows by rank before output is omitted.
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
super.cleanup(context);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = ConfigurationHadoop.getConfigurationHadoop();
// Input and output paths are hardcoded for this test run.
String[] otherArgs = new String[] { "/user/hdfs/a/test",
"/user/hdfs/b" };
Job job = Job.getInstance(conf, "FillAndSortTest");
job.setJarByClass(FillAndSortTest.class);
/*
 * Disable speculative execution. With speculative execution, if Hadoop
 * notices that some tasks are running slowly, it starts a duplicate of
 * the same task on another node and keeps whichever finishes first.
 * Here, though, the reduce task writes its values to an HDFS file with a
 * fixed name, so running two or more copies of the same reduce task at
 * once would make the writes collide; the feature is therefore turned off.
 */
job.setSpeculativeExecution(false);
job.setMapSpeculativeExecution(false);
job.setReduceSpeculativeExecution(false);
job.setMapperClass(FillAndSortMapper.class);
job.setGroupingComparatorClass(KeyComparator.class);
job.setReducerClass(FillAndSortReducer.class);
job.setMapOutputKeyClass(KeyWritable.class);
job.setMapOutputValueClass(ValueWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// CombineFileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileSystem hdfs = FileSystem.get(conf);
// Remove any stale output directory so the job does not fail at submit time.
if (hdfs.exists(new Path(otherArgs[1]))) {
hdfs.delete(new Path(otherArgs[1]), true);
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class KeyWritable implements WritableComparable<KeyWritable>{
private String subject;
private Integer score;
public KeyWritable() {
}
public KeyWritable(String subject, Integer score) {
// super();
this.subject = subject;
this.score = score;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(subject);
out.writeInt(score);
}
@Override
public void readFields(DataInput in) throws IOException {
subject = in.readUTF();
score = in.readInt();
}
@Override
public int compareTo(KeyWritable o) {
// Order by subject, then by score in descending numeric order;
// comparing toString() text would sort multi-digit scores
// lexicographically (e.g. 96 ahead of 100).
int cmp = subject.compareTo(o.subject);
return cmp != 0 ? cmp : o.score.compareTo(score);
}
@Override
public String toString() {
return "KeyWritable [subject=" + subject + ", score=" + score + "]";
}
public String constructGroup() {
return subject;
}
public Integer getScore() {
return score;
}
public String getSubject() {
return subject;
}
}
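// Grouping comparator for the secondary-sort pattern: map output keys stay
// sorted by (subject, score), but all keys sharing a subject are grouped
// into a single reduce() call, so the reducer sees every score for one
// subject together.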
public static class KeyComparator extends WritableComparator {
public KeyComparator() {
super(KeyWritable.class,true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable a, WritableComparable b) {
KeyWritable key1 = (KeyWritable)a;
KeyWritable key2 = (KeyWritable)b;
return key1.constructGroup().compareTo(key2.constructGroup());
}
}
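// Note: KeyWritable does not override hashCode(), so with more than one
// reducer the default HashPartitioner could scatter keys of the same
// subject across reducers; this test relies on running a single reducer.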
public static class ValueWritable implements Writable {
private String name;
private String subject;
private Integer score;
private Integer rank;
public ValueWritable() {
}
public ValueWritable(String name, String subject, Integer score,
Integer rank) {
this.name = name;
this.subject = subject;
this.score = score;
this.rank = rank;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeUTF(subject);
out.writeInt(score);
out.writeInt(rank);
}
@Override
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
subject = in.readUTF();
score = in.readInt();
rank = in.readInt();
}
@Override
public String toString() {
return "ValueWritable [name=" + name + ", subject=" + subject
+ ", score=" + score + ", rank=" + rank + "]";
}
public String getName() {
return name;
}
public String getSubject() {
return subject;
}
public Integer getScore() {
return score;
}
public Integer getRank() {
return rank;
}
}
}
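To sanity-check the fill rule without a cluster, here is a minimal local sketch of the same nearest-neighbour logic over the sample rows above. FillRankSketch is a hypothetical stand-alone class, with plain HashMaps standing in for the reducer's value iterator; it is a toy under those assumptions, not the job itself.
import java.util.HashMap;
import java.util.Map;
public class FillRankSketch {
public static void main(String[] args) {
// Ranked scores from the sample data: score -> rank.
Map<Integer, Integer> haveRank = new HashMap<Integer, Integer>();
haveRank.put(100, 1);
haveRank.put(99, 2);
haveRank.put(95, 5);
// Unranked scores: score -> "name<TAB>subject".
Map<Integer, String> noRank = new HashMap<Integer, String>();
noRank.put(98, "張3\t數學");
noRank.put(96, "張4\t數學");
for (Map.Entry<Integer, String> row : noRank.entrySet()) {
int score = row.getKey();
int minDiff = Integer.MAX_VALUE;
int newRank = 0;
for (Map.Entry<Integer, Integer> ranked : haveRank.entrySet()) {
int diff = Math.abs(ranked.getKey() - score);
if (diff < minDiff) {
minDiff = diff;
// Borrow the nearest neighbour's rank, shifted by one.
newRank = ranked.getValue() + (score > ranked.getKey() ? -1 : 1);
}
}
System.out.println(row.getValue() + "\t" + score + "\t" + newRank);
}
// Prints 張3 with rank 3 and 張4 with rank 4, matching the expectation.
}
}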