
MapReduce WordCount Practice: max and avg

Max calculation

Requirement: for each date, output the date and its highest temperature.
Data:
20170931 20.1
20170930 30.6
20170931 30.6
20170929 30.02
20170928 10.3
20170928 30.3
20170927 28.3
20170931 28.1
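
For the sample data above, the max job groups records by date and keeps the largest temperature per date, so the expected output should look roughly like this (key and value separated by a tab, order may vary):

20170927	28.3
20170928	30.3
20170929	30.02
20170930	30.6
20170931	30.6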

Java code:

max

package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Main class
public class Demo1Max {

	// Mapper implementation class
	public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] split = value.toString().split(" ");
                
			String date = split[0];
			// double temp = Double.parseDouble(split[1]);
			double temp = Double.valueOf(split[1]);
			context.write(new Text(date), new DoubleWritable(temp));

		}

	}
	// Reducer implementation class
	public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		@Override
		protected void reduce(Text key, Iterable<DoubleWritable> value, Context context)
				throws IOException, InterruptedException {
			// key=date
			// value=list(10.1,30.6,12.5)
			// Double.MIN_VALUE is the smallest positive double, so it would break for negative temperatures
			double max = Double.NEGATIVE_INFINITY;
			for (DoubleWritable v : value) {
				double temp = v.get();
				if (temp > max) {
					max = temp;
				}

			}
			context.write(key, new DoubleWritable(max));
		}
	}
	// Driver class
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(Demo1Max.class);
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DoubleWritable.class);
		
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class); // the reducer emits DoubleWritable values, not Text
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		int status=job.waitForCompletion(true)?0:-1;
		System.exit(status);
	}
}
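
Because taking a maximum is associative and commutative, the same reducer class could optionally also be registered as a combiner in the driver to cut shuffle traffic; this is an optional optimization, not part of the original job:

		job.setCombinerClass(MyReducer.class); // safe for max: combining partial maxima gives the same final result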

avg

package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo1Avg {

	public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] split = value.toString().split(" ");

			String date = split[0];
			// double temp = Double.parseDouble(split[1]);
			double temp = Double.valueOf(split[1]);
			context.write(new Text(date), new DoubleWritable(temp));

		}

	}

	public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

		@Override
		protected void reduce(Text key, Iterable<DoubleWritable> value, Context context)
				throws IOException, InterruptedException {
			// key=date
			// value=list(10.1,30.6,12.5)
			
			double sum=0.0;
			int count=0;
			for (DoubleWritable v : value) {
				sum+=v.get();
				count++;

			}
			context.write(key, new DoubleWritable(sum/count));
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(Demo1Avg.class);
		
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DoubleWritable.class);
		
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class); // the reducer emits DoubleWritable values, not Text
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		int status=job.waitForCompletion(true)?0:-1;
		System.exit(status);
	}
}
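
Note that the combiner trick does not carry over to the average job: averaging partial averages gives the wrong answer unless the counts are carried along. For 20170931 the true average is (20.1 + 30.6 + 28.1) / 3 ≈ 26.27, but averaging a partial average of the first two values with the third gives (25.35 + 28.1) / 2 = 26.725. To use a combiner for avg, the intermediate value would have to hold both a running sum and a count (for example, in a custom Writable).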

Upload max.jar to the Linux machine.

Upload the test file (max.txt) to the input directory: hdfs dfs -put max.txt /usr/hello/in

Run with input and output paths: hadoop jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201   (the output path must not already exist)

           Or:   yarn jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201   (the output path must not already exist)
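
If the output directory is left over from an earlier run, the job will fail; it can be removed first, for example:

hdfs dfs -rm -r /usr/hello/in/201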

View in the NameNode web UI (port 50070): /usr/hello/in/201/part-r-00000

View from the Linux shell: hdfs dfs -cat /usr/hello/in/201/part-r-00000