
MapReduce: Filling In Missing Ranks in Test Data

package xxx.hadoop;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import xxx.common.ConfigurationHadoop;


/* Test data: name, subject, score, rank (the rank column may be missing):
 * 張4     數學      96
 * 張5     數學      95      5
 * 張1     數學      100     1
 * 張3     數學      98
 * 張2     數學      99      2
 * The task is to fill in the missing ranks and emit the rows in rank order.
 */
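/* Expected output, derived by hand from the data above (an illustration,
 * assuming the nearest-ranked-score heuristic used in the reducer below):
 * 張1     數學      100     1
 * 張2     數學      99      2
 * 張3     數學      98      3
 * 張4     數學      96      4
 * 張5     數學      95      5
 */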
public class FillAndSortTest {


	public static class FillAndSortMapper extends
			Mapper<LongWritable, Text, KeyWritable, ValueWritable> {

		private KeyWritable keyWritable;

		private ValueWritable valueWritable;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// The input split is available here if per-split debugging is needed.
			InputSplit split = context.getInputSplit();
			// String text = split.toString();
			super.setup(context);
		}

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println(value.toString());
			String line = value.toString();
			String[] col = line.split("\t");

			// Key: subject + score, so records sort by score within a subject.
			keyWritable = new KeyWritable(col[1], Integer.parseInt(col[2]));
			if (col.length == 3) {
				// The rank column is missing; use 0 as the "unranked" sentinel.
				valueWritable = new ValueWritable(col[0], col[1], Integer.parseInt(col[2]), 0);
			} else {
				valueWritable = new ValueWritable(col[0], col[1], Integer.parseInt(col[2]), Integer.parseInt(col[3]));
			}
			context.write(keyWritable, valueWritable);
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			super.cleanup(context);
		}
	}
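	/* For the sample line "張5\t數學\t95\t5", the mapper emits (a hand-derived
	 * illustration using the toString() formats defined further down):
	 *   key   = KeyWritable [subject=數學, score=95]
	 *   value = ValueWritable [name=張5, subject=數學, score=95, rank=5]
	 */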


	public static class FillAndSortReducer extends
			Reducer<KeyWritable, ValueWritable, NullWritable, Text> {

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			super.setup(context);
		}

		@Override
		public void reduce(KeyWritable key, Iterable<ValueWritable> values,
				Context context) throws IOException, InterruptedException {

			Iterator<ValueWritable> it = values.iterator();
			// LinkedHashMultimap<Integer, String> noRank = LinkedHashMultimap.create();
			// LinkedHashMultimap<Integer, Integer> haveRank = LinkedHashMultimap.create();
			HashMap<Integer, String> noRank = new HashMap<Integer, String>();
			HashMap<Integer, Integer> haveRank = new HashMap<Integer, Integer>();

			// Split the records into those with a rank and those without.
			// (Hadoop reuses the ValueWritable instance across iterations, so
			// only extracted field values are stored, never the object itself.)
			while (it.hasNext()) {
				ValueWritable value = it.next();
				System.out.println(value.toString());
				if (value.getRank() == 0) {
					noRank.put(value.getScore(), value.getName() + "\t" + value.getSubject());
				} else {
					haveRank.put(value.getScore(), value.getRank());
				}
			}

			Set<Integer> scores = noRank.keySet();
			Set<Integer> scores2 = haveRank.keySet();

			// The logic below is not rigorous (ties and rank gaps are ignored);
			// it only demonstrates the approach: find the ranked score closest
			// to each unranked one and offset that rank by +/-1.
			for (Integer score : scores) {
				Integer minScore = 0;
				Integer newRank = 0;
				for (Integer score2 : scores2) {
					if (minScore == 0) {
						minScore = Math.abs(score2 - score);
						newRank = haveRank.get(score2);
						newRank += score > score2 ? -1 : 1;
					} else if (minScore > Math.abs(score2 - score)) {
						minScore = Math.abs(score2 - score);
						newRank = haveRank.get(score2);
						newRank += score > score2 ? -1 : 1;
					}
				}
				System.out.println(noRank.get(score) + "\t" + score + "\t" + newRank);
				// Sorted output to the context is omitted here.
			}
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			super.cleanup(context);
		}
	}
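	/* A hand trace of the heuristic above on the sample data (assumption:
	 * scores are unique within a subject, as in the test set):
	 *   score 98 (unranked): ranked scores are {100:1, 99:2, 95:5}; the
	 *     nearest is 99 (rank 2), and 98 < 99, so new rank = 2 + 1 = 3.
	 *   score 96 (unranked): the nearest ranked score is 95 (rank 5), and
	 *     96 > 95, so new rank = 5 - 1 = 4.
	 */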


	public static void main(String[] args) throws Exception {
		Configuration conf = ConfigurationHadoop.getConfigurationHadoop();

		// Hard-coded test paths; the command-line arguments are ignored.
		String[] otherArgs = new String[] { "/user/hdfs/a/test",
				"/user/hdfs/b" };

		Job job = Job.getInstance(conf, "FillAndSortTest");
		job.setJarByClass(FillAndSortTest.class);


		/*
		 * Disable speculative execution. With speculative execution, when
		 * Hadoop notices that some tasks are running slowly, it launches a
		 * duplicate of the same task on another node and takes whichever
		 * copy finishes first. A reduce task here writes its values to an
		 * HDFS file with a fixed name, so running two or more copies of the
		 * same reduce task concurrently would corrupt the write; hence the
		 * feature is turned off.
		 */
		job.setSpeculativeExecution(false);
		job.setMapSpeculativeExecution(false);
		job.setReduceSpeculativeExecution(false);
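		// The same effect can be achieved through configuration properties
		// (a sketch, assuming Hadoop 2.x property names):
		// conf.setBoolean("mapreduce.map.speculative", false);
		// conf.setBoolean("mapreduce.reduce.speculative", false);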


		job.setMapperClass(FillAndSortMapper.class);
		// Group by subject only, so one reduce call sees every record of a
		// subject while the keys still sort by score (secondary sort).
		job.setGroupingComparatorClass(KeyComparator.class);
		job.setReducerClass(FillAndSortReducer.class);

		job.setMapOutputKeyClass(KeyWritable.class);
		job.setMapOutputValueClass(ValueWritable.class);

		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		// CombineFileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		// Remove any existing output directory so the job can run repeatedly.
		FileSystem hdfs = FileSystem.get(conf);
		if (hdfs.exists(new Path(otherArgs[1]))) {
			hdfs.delete(new Path(otherArgs[1]), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		// TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	public static class KeyWritable implements WritableComparable<KeyWritable> {

		private String subject;

		private Integer score;

		public KeyWritable() {
		}

		public KeyWritable(String subject, Integer score) {
			this.subject = subject;
			this.score = score;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeUTF(subject);
			out.writeInt(score);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			subject = in.readUTF();
			score = in.readInt();
		}

		@Override
		public int compareTo(KeyWritable o) {
			// Sort by subject, then by score in descending order (numeric,
			// not lexicographic), so higher scores come first within a
			// subject and the output follows rank order.
			int cmp = subject.compareTo(o.subject);
			if (cmp != 0) {
				return cmp;
			}
			return -score.compareTo(o.score);
		}

		@Override
		public String toString() {
			return "KeyWritable [subject=" + subject + ", score=" + score + "]";
		}

		public String constructGroup() {
			return subject;
		}

		public Integer getScore() {
			return score;
		}

		public String getSubject() {
			return subject;
		}
	}

	public static class KeyComparator extends WritableComparator {

		public KeyComparator() {
			// true: instantiate keys so compare() receives deserialized objects.
			super(KeyWritable.class, true);
		}

		@SuppressWarnings("rawtypes")
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			KeyWritable key1 = (KeyWritable) a;
			KeyWritable key2 = (KeyWritable) b;

			// Group solely by subject; the score is ignored for grouping.
			return key1.constructGroup().compareTo(key2.constructGroup());
		}
	}


	public static class ValueWritable implements Writable {

		private String name;

		private String subject;

		private Integer score;

		private Integer rank;

		public ValueWritable() {
		}

		public ValueWritable(String name, String subject, Integer score,
				Integer rank) {
			this.name = name;
			this.subject = subject;
			this.score = score;
			this.rank = rank;
		}

		@Override
		public void write(DataOutput out) throws IOException {
			out.writeUTF(name);
			out.writeUTF(subject);
			out.writeInt(score);
			out.writeInt(rank);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			name = in.readUTF();
			subject = in.readUTF();
			score = in.readInt();
			rank = in.readInt();
		}

		@Override
		public String toString() {
			return "ValueWritable [name=" + name + ", subject=" + subject
					+ ", score=" + score + ", rank=" + rank + "]";
		}

		public String getName() {
			return name;
		}

		public String getSubject() {
			return subject;
		}

		public Integer getScore() {
			return score;
		}

		public Integer getRank() {
			return rank;
		}
	}
}
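
To run the compiled job on a cluster, the standard hadoop jar launcher can be used (a sketch: the jar file name is an assumption, while the class name follows the package declaration above):

hadoop jar fill-and-sort-test.jar xxx.hadoop.FillAndSortTest

The results land under /user/hdfs/b, which the job deletes and recreates on each run, so it can be re-executed without manual cleanup.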