
Computing the Yearly Maximum with MapReduce


1. For the test-file generator program, see:

https://www.cnblogs.com/jonban/p/10555364.html

The MapReduce example program is shown below.

2. Create a new Maven project named hadoop, for example with the quickstart archetype sketched below.
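A minimal sketch of one way to generate the project skeleton (the groupId and artifactId match the pom.xml in the next step; the archetype choice is an assumption, not from the original post):

mvn archetype:generate -DgroupId=com.java -DartifactId=hadoop \
    -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false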

3. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
        http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.java</groupId>
    <artifactId>hadoop</artifactId>
    <version>1.0.0</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
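With this pom in place, the project packages into target/hadoop.jar (named by the <finalName> element above) with a standard Maven build:

mvn clean package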

4. MaxMapper.java

package com.java.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps each input record to a (year, value) pair, grouped by year
 * 
 * @author Logan
 * @createDate 2019-03-18
 * @version 1.0.0
 *
 */
public class MaxMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // Assumes the generator's record layout: a 4-digit year at the start
        // of each line and a 4-digit value at character offsets 8-12
        String line = value.toString();
        String year = line.substring(0, 4);
        int num = Integer.parseInt(line.substring(8, 12));

        context.write(new Text(year), new IntWritable(num));
    }

}
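To make the parsing rule concrete, here is a minimal standalone sketch using a hypothetical sample record (real records come from the generator linked in step 1):

public class ParseDemo {
    public static void main(String[] args) {
        String line = "2018    0076";                       // hypothetical sample record
        String year = line.substring(0, 4);                 // "2018"
        int num = Integer.parseInt(line.substring(8, 12));  // 76
        System.out.println(year + " -> " + num);            // prints: 2018 -> 76
    }
}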

5. MaxReducer.java

package com.java.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Computes the maximum value among each year's records
 * 
 * @author Logan
 * @createDate 2019-03-18
 * @version 1.0.0
 *
 */
public class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Scan every value recorded for this year and keep the largest
        int max = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            max = Math.max(max, value.get());
        }

        context.write(key, new IntWritable(max));
    }

}
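Because taking a maximum is associative and commutative, this reducer could also double as a combiner, pre-aggregating each mapper's output before the shuffle. This is an optional tweak, not part of the original post; it would sit next to setReducerClass in MaxJob:

job.setCombinerClass(MaxReducer.class); // emits one per-year maximum per mapper, shrinking shuffle traffic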

6. MaxJob.java

package com.java.mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Main entry point: configures and submits the job
 * 
 * @author Logan
 * @createDate 2019-03-18
 * @version 1.0.0
 *
 */
public class MaxJob {
    public static void main(String[] args) {
        try {
            Job job = Job.getInstance();
            job.setJarByClass(MaxJob.class);
            job.setJobName("Get Max");

            // The first command-line argument is the input path
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // The second command-line argument is the output path (it must not already exist)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setMapperClass(MaxMapper.class);
            job.setReducerClass(MaxReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.waitForCompletion(true);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
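Once packaged, the job can be submitted with the standard hadoop jar launcher; the HDFS paths below are placeholders:

hadoop jar target/hadoop.jar com.java.mapreduce.MaxJob /data/max/input /data/max/output

Note that FileOutputFormat refuses to overwrite an existing directory, so delete the output path before re-running the job.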
