MapReduce:根據 value 值進行排序
阿新 • • 發佈:2019-02-05
目前所知有兩種方法
1. map 階段不做改變;在 reduce 階段將 map 的輸出快取起來,並重寫 cleanup 方法,在其中對快取的資料排序後再輸出。
缺點:如果資料量過大,將消耗大量記憶體。
2. 串聯執行兩個 MapReduce 作業:
將第一次 MapReduce 輸出的 value 作為第二次 map 的 key,利用框架在 shuffle 階段依 key 排序的機制完成排序(只使用一個 reducer 時,輸出才會全域有序),再於第二次 reduce 中還原成原來的 key/value 形式。
以下是按照各手機號碼所產生的流量(value)進行排序的 Java 程式碼範例:
package com.yyx.mapr.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Second job of a two-pass MapReduce pipeline: re-sorts per-phone traffic records by their
 * {@code FlowBean} value.
 *
 * <p>The mapper swaps each record so that the {@code FlowBean} becomes the map-output key. The
 * shuffle phase then sorts records by that key, using the ordering {@code FlowBean} defines. The
 * reducer swaps each pair back to {@code phone -> FlowBean} for output.
 *
 * <p>Usage: {@code FlowSort <input path> <output path>}
 */
public class FlowSort {

    /**
     * Parses one tab-separated input line and emits {@code (FlowBean, phone)}, so that the
     * framework sorts records by flow.
     *
     * <p>Field 0 is taken as the phone number. Fields 1 and 2 are parsed as {@code long} and passed
     * to {@code FlowBean.set}. NOTE(review): these are presumably the two flow columns produced by
     * the first job — confirm against its output format.
     */
    static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        private static final String COUNTER_GROUP = "FlowSort";
        private static final String SKIPPED_LINES = "SKIPPED_SHORT_LINES";

        // Reused across map() calls to avoid per-record allocation. This is the standard Hadoop
        // Writable-reuse idiom: the framework serializes the pair during context.write.
        private final FlowBean flowBean = new FlowBean();
        private final Text phone = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            // Blank or truncated lines would otherwise fail the whole task with an
            // ArrayIndexOutOfBoundsException. Skip them instead; the counter keeps them visible.
            if (fields.length < 3) {
                context.getCounter(COUNTER_GROUP, SKIPPED_LINES).increment(1);
                return;
            }
            flowBean.set(Long.parseLong(fields[1]), Long.parseLong(fields[2]));
            phone.set(fields[0]);
            context.write(flowBean, phone);
        }
    }

    /**
     * Restores {@code phone -> FlowBean} pairs for output. Keys arrive here already sorted by the
     * shuffle.
     *
     * <p>All records whose {@code FlowBean} keys compare equal are grouped into a single call.
     * Every phone number in {@code values} is therefore written, so phones with identical flow are
     * not lost.
     */
    static class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text phone : values) {
                context.write(phone, key);
            }
        }
    }

    /**
     * Configures and runs the sort job. Exits with 0 on success, 1 on job failure, and 2 on bad
     * arguments.
     *
     * @param args {@code args[0]} is the input path (presumably the first job's output) and
     *     {@code args[1]} is the output directory
     * @throws Exception if job setup or submission fails
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: FlowSort <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSort.class);

        // Mapper/reducer implementations.
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);
        // The output is globally ordered only when one reducer receives every key. With more
        // reducers, each part file would be sorted independently.
        job.setNumReduceTasks(1);

        // Map output types: FlowBean is the key so that the shuffle sorts by flow.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Final output types: phone -> FlowBean.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and block until it finishes, printing progress (verbose = true).
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}