java操作hadoop之MapReduce
阿新 • • 發佈:2022-12-10
1.Mapper檔案WordCountMapper.java
package com.hdfs; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final Text outK = new Text(); private final IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); for (String word : words) { outK.set(word); context.write(outK, outV); } } }
2.Reducer檔案WordCountReducer.java
package com.hdfs; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private final IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key, outV); } }
3.Driver檔案WordCountDriver.java
package com.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // 1.建立job例項 Configuration config = new Configuration(); Job job = Job.getInstance(config); // 2.設定jar job.setJarByClass(WordCountDriver.class); // 3.設定Mapper和Reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4.設定map輸出的kv型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5.設定最終輸出的kv型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 6.設定輸入路徑和輸出路徑 FileInputFormat.setInputPaths(job, new Path("/home/navy/Desktop/work/hadoop/HadoopOperate/input")); FileOutputFormat.setOutputPath(job, new Path("/home/navy/Desktop/work/hadoop/HadoopOperate/output")); // 7.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
4.pom.xml檔案
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>HdfsOperate</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.4</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>2.0.5</version> </dependency> </dependencies> </project>
5.專案結構
6.執行WordCountDriver.java裡的main方法