1. 程式人生 > >mapreduce 根據value值進行排序

mapreduce 根據value值進行排序

目前所知有兩種方法

1.map階段不做改變,在reduce階段對map的輸出進行快取,重寫cleanup方法,在其中對快取的資料進行排序輸出。
缺點:如果資料量過大,將消耗大量的記憶體

2.進行兩個Mapreduce操作
將第一次Mapreduce的輸出value作為第二次map的key,
在第二次reduce再還原成原來的key value形式
如下為按照手機號產生流量的value進行排序的java程式碼示例

package com.yyx.mapr.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import
java.io.IOException; public class FlowSort { static class FlowSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> { private FlowBean flowBean = new FlowBean(); private Text text = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws
IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\t"); String phoneNum = words[0]; flowBean.set(Long.parseLong(words[1]), Long.parseLong(words[2])); text.set(phoneNum); context.write(flowBean, text); } } static class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> { @Override protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(values.iterator().next(), key); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSort.class); //設定呼叫類 job.setMapperClass(FlowSort.FlowSortMapper.class); job.setReducerClass(FlowSort.FlowSortReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //指定輸入輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關引數,以及job所用的java類所用的jar包,提交給yarn去執行 //job.submit(); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }