Hadoop MapReduce Development -- Data Deduplication
阿新 · Published 2019-01-12
Environment
hadoop-2.9.1
Windows 7
IntelliJ IDEA 15
Sample Data
file1.txt and file2.txt are stored under: C:\bigdata\example_data\mr_example\exp_02\
file1.txt
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
file2.txt
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
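Both input files contain overlapping lines (for example, 2012-3-3 c appears in both), and the goal is to output every distinct line exactly once. The MapReduce job below does this by emitting each line as a key: identical keys are grouped and sorted during the shuffle, and the reducer writes each key a single time. As a quick local sanity check, the same result can be produced with a sorted set. The following sketch is purely an illustration, not part of the job; the class name and file paths are placeholders.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.TreeSet;

public class LocalDedupCheck {
    public static void main(String[] args) throws Exception {
        // TreeSet keeps one copy of each line and sorts them,
        // mirroring the key grouping and sorting done by the shuffle.
        Set<String> distinct = new TreeSet<>();
        for (String file : new String[]{"file1.txt", "file2.txt"}) {
            for (String line : Files.readAllLines(Paths.get(file))) {
                if (!line.trim().isEmpty()) {
                    distinct.add(line.trim());
                }
            }
        }
        distinct.forEach(System.out::println);
    }
}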
Code
DeMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Deduplication mapper: emits each input line as the key with an empty value.
 */
public class DeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The trimmed line itself is the key, so identical lines end up in the same group.
        context.write(new Text(value.toString().trim()), NullWritable.get());
    }
}
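The mapper above allocates a new Text object for every record. A small, optional refinement (my assumption, not part of the original post) is to reuse a single output key instance; a sketch of that variant, under a hypothetical class name:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Variant of DeMapper that reuses one output key instance (optional optimization sketch).
public class DeMapperReuse extends Mapper<LongWritable, Text, Text, NullWritable> {

    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        outKey.set(value.toString().trim());   // reuse the same Text instance per record
        context.write(outKey, NullWritable.get());
    }
}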
DeReducer.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DeReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // All occurrences of a line arrive grouped under one key; write the key once.
        context.write(key, NullWritable.get());
    }
}
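Because this reducer simply re-emits its key, and its input and output types match the map output types, it also qualifies as a combiner. Registering it as one would drop duplicate lines on the map side before the shuffle; this is an optional addition that the original driver does not include:

        // Optional extra line in the driver (assumption; not in the original JobMain):
        // duplicate lines are then removed map-side before the shuffle.
        job.setCombinerClass(DeReducer.class);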
JobMain.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class JobMain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JobMain.class);

        // Mapper: line -> (line, NullWritable)
        job.setMapperClass(DeMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reducer: write each distinct line once
        job.setReducerClass(DeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        // Delete the output directory if it already exists, otherwise the job fails.
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        job.setNumReduceTasks(1);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
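Since the stated environment is Windows 7 with IntelliJ IDEA 15, the job can also be launched directly from the IDE against the local file system. Below is a minimal sketch of driver settings that force a local run; the property keys are standard Hadoop configuration names, but adding them here is my assumption and not part of the original code.

        // Hypothetical local-run settings for debugging from the IDE (assumption):
        Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name", "local");   // use the local job runner instead of YARN
        conf.set("fs.defaultFS", "file:///");            // read and write the local file system
        Job job = Job.getInstance(conf);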
Running the Code
On Windows 7, set HADOOP_CLASSPATH before running:
set HADOOP_CLASSPATH=mr_example.jar
Note: mr_example.jar is the name of the jar built from the code above.
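The post does not show how mr_example.jar is built. One way to do it from a Windows command prompt, assuming the three .java files are in the current directory and the hadoop command is on the PATH (these compilation steps are illustrative; only the jar name comes from the post):

rem Capture the Hadoop classpath (interactive cmd syntax; use %%i inside a .bat file).
for /f "delims=" %i in ('hadoop classpath') do set CP=%i
javac -cp "%CP%" DeMapper.java DeReducer.java JobMain.java
jar cvf mr_example.jar *.class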
Run Command
hadoop <fully-qualified class name (package + class)> <input path> <output path>
e.g.:
C:\hadoop-2.9.1\bin\hadoop <fully-qualified class name (package + class)> file:///C:…\mr_example\exp_02\*.txt C:…\mr_example\exp_02\out\
Output
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d