Simple MapReduce Examples
阿新 • Published: 2020-07-27
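Case 1: Merging files and removing duplicates
Given two input files, file A and file B, write a MapReduce program that merges them and removes duplicate lines, producing a new output file C. Sample input and output files are shown below for reference.
Sample input file A:
Data |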
---|
20150101 x |
20150103 x |
20150104 y |
20150102 y |
20150105 z |
20150106 x |
Sample input file B:
Data |
---|
20150101 y |
20150102 y |
20150103 x |
20150104 z |
20150105 y |
Sample output file C, obtained by merging input files A and B:
Data |
---|
20150101 x |
20150101 y |
20150102 y |
20150103 x |
20150104 y |
20150104 z |
20150105 y |
20150105 z |
20150106 x |
Code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class hebing {

    // Emit each input line as the key; the shuffle then groups identical lines together.
    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    // Each distinct line reaches reduce() exactly once as a key, so writing the key once removes duplicates.
    public static class Myreducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "hebing");
        job.setJarByClass(hebing.class);
        job.setMapperClass(hebing.Mymapper.class);
        // The reducer also serves as the combiner, dropping duplicates on the map side first.
        job.setCombinerClass(hebing.Myreducer.class);
        job.setReducerClass(hebing.Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
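To check the expected output without a cluster, the same idea can be sketched in plain Java. This is only a local stand-in under stated assumptions, not the Hadoop job itself: `MergeDedupSketch` and `mergeAndDedup` are hypothetical names, and a `TreeSet` models what the shuffle does to the keys (group identical lines, sort them). It assumes ASCII lines, where `String` ordering agrees with the byte ordering of `Text`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Hypothetical local stand-in for the merge-and-dedup job; no Hadoop classes are used.
final class MergeDedupSketch {

    // "Map" emits every line as a key. The "shuffle" groups identical keys and
    // sorts them (modeled by a TreeSet). "Reduce" emits each distinct key once.
    static List<String> mergeAndDedup(List<String> fileA, List<String> fileB) {
        TreeSet<String> distinctKeys = new TreeSet<>();
        distinctKeys.addAll(fileA);
        distinctKeys.addAll(fileB);
        return new ArrayList<>(distinctKeys);
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("20150101 x", "20150103 x", "20150104 y",
                                       "20150102 y", "20150105 z", "20150106 x");
        List<String> b = Arrays.asList("20150101 y", "20150102 y", "20150103 x",
                                       "20150104 z", "20150105 y");
        // Prints the nine distinct lines in sorted order, one per line.
        for (String line : mergeAndDedup(a, b)) {
            System.out.println(line);
        }
    }
}
```

With the sample files A and B as input, the sketch yields the nine lines of sample file C in the same order.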
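Case 2: Sorting the input files
There are several input files, and every line of each file contains a single integer. Read the integers from all files, sort them in ascending order, and write them to a new file. Each output line holds two integers: the first is the rank of the second in the sorted order, and the second is the original integer. Sample input and output files are shown below for reference.
Sample input file 1:
Data |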
---|
33 |
37 |
12 |
40 |
Sample input file 2:
Data |
---|
4 |
16 |
39 |
5 |
Sample input file 3:
Data |
---|
1 |
45 |
25 |
Output file produced from input files 1, 2 and 3:
Rank | Data |
---|---|
1 | 1 |
2 | 4 |
3 | 5 |
4 | 12 |
5 | 16 |
6 | 25 |
7 | 33 |
8 | 37 |
9 | 39 |
10 | 40 |
11 | 45 |
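The required order is numeric, so the type of the map output key matters: Hadoop's shuffle sorts keys with the key type's comparator, and `Text` keys compare byte by byte while `IntWritable` keys compare as numbers. The plain-Java sketch below shows the difference using `String` and `Integer` as stand-ins. `KeyOrderSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration of text ordering versus numeric ordering.
final class KeyOrderSketch {

    // Sort the raw lines as text, the way string-like keys would be ordered.
    static List<String> sortAsText(List<String> lines) {
        List<String> copy = new ArrayList<>(lines);
        Collections.sort(copy);
        return copy;
    }

    // Parse each line first, then sort numerically.
    static List<Integer> sortAsNumbers(List<String> lines) {
        List<Integer> numbers = new ArrayList<>();
        for (String line : lines) {
            numbers.add(Integer.parseInt(line.trim()));
        }
        Collections.sort(numbers);
        return numbers;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("33", "4", "12", "5");
        System.out.println(sortAsText(lines));    // [12, 33, 4, 5]
        System.out.println(sortAsNumbers(lines)); // [4, 5, 12, 33]
    }
}
```

This is why the mapper for this task needs to parse each line into an integer key rather than pass the line through as text.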
Code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    // Parse each line into an integer key; the shuffle sorts IntWritable keys numerically.
    public static class Mymapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final IntWritable v = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return; // skip blank lines instead of failing in parseInt
            }
            v.set(Integer.parseInt(line));
            context.write(v, ONE);
        }
    }

    // Keys arrive in ascending order, so a running counter gives each value its rank.
    public static class Myreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private final IntWritable line_num = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // One output line per occurrence, so duplicate values get consecutive ranks.
            for (IntWritable num : values) {
                context.write(line_num, key);
                line_num.set(line_num.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        /**Designed by 王立同**/
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Sort.Mymapper.class);
        job.setReducerClass(Sort.Myreducer.class);
        // The rank counter is only global when a single reducer sees every key.
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
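The reducer above keeps one running counter, so the ranks are global only when a single reduce task receives every key. The plain-Java sketch below simulates what would happen with more than one reducer. It is a rough model under stated assumptions, not Hadoop code: `RankSortSketch` and `run` are hypothetical names, and the partitioning is modeled as the key's hash modulo the reducer count, which is how Hadoop's default hash partitioning is assumed to behave for integer keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical local simulation of the ranking job with a configurable reducer count.
final class RankSortSketch {

    // Returns one list of "rank value" lines per simulated reducer.
    static List<List<String>> run(List<Integer> values, int numReducers) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<Integer>());
        }
        // Assumed partitioning rule: non-negative hash of the key modulo reducer count.
        for (int value : values) {
            int p = (Integer.hashCode(value) & Integer.MAX_VALUE) % numReducers;
            partitions.get(p).add(value);
        }
        List<List<String>> outputs = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            Collections.sort(partition);   // each reducer sees its keys in ascending order
            List<String> out = new ArrayList<>();
            int lineNum = 1;               // each reducer starts its own counter at 1
            for (int value : partition) {
                out.add(lineNum + " " + value);
                lineNum++;
            }
            outputs.add(out);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(33, 37, 12, 40, 4, 16, 39, 5, 1, 45, 25);
        System.out.println(run(values, 1)); // one list holding ranks 1 to 11
        System.out.println(run(values, 2)); // two lists, each restarting at rank 1
    }
}
```

With one reducer the simulated output matches the sample output table. With two, each partition restarts at rank 1 and neither file is globally ordered, which is why a job like this either runs with one reducer or needs a range-based partitioner plus rank offsets.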
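Case 3: Mining information from a given table
Given a child-parent table, mine the parent-child relationships in it and produce a table of grandchild-grandparent relationships. The input file contains: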
child | parent |
---|---|
Steven | Lucy |
Steven | Jack |
Jone | Lucy |
Jone | Jack |
Lucy | Mary |
Lucy | Frank |
Jack | Alice |
Jack | Jesse |
David | Alice |
David | Jesse |
Philip | David |
Philip | Alma |
Mark | David |
Mark | Alma |
The output file contains:
grandchild | grandparent |
---|---|
Steven | Alice |
Steven | Jesse |
Jone | Alice |
Jone | Jesse |
Steven | Mary |
Steven | Frank |
Jone | Mary |
Jone | Frank |
Philip | Alice |
Philip | Jesse |
Mark | Alice |
Mark | Jesse |
Code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Child2Parent {

    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] cap = value.toString().trim().split("[\\s|\\t]+"); // split the record
            // Skip the header row and any blank or malformed line.
            if (cap.length >= 2 && !"child".equals(cap[0])) {
                String cName = cap[0];
                String pName = cap[1];
                // Tag each record so the reducer can tell the two sides of the self-join apart.
                context.write(new Text(pName), new Text("r#" + cName)); // cName is a child of the key
                context.write(new Text(cName), new Text("l#" + pName)); // pName is a parent of the key
            }
        }
    }

    public static class Myreduce extends Reducer<Text, Text, Text, Text> {
        // Write the header once per reduce task, before any key is processed.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            context.write(new Text("grandchild"), new Text("grandparent"));
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // The key is the middle generation: its parents are the grandparents of its children.
            List<String> grandParent = new ArrayList<>(); // parents of the key ("l#" records)
            List<String> grandChild = new ArrayList<>();  // children of the key ("r#" records)
            for (Text text : values) {
                String[] relation = text.toString().split("#");
                if ("l".equals(relation[0])) {
                    grandParent.add(relation[1]);
                } else {
                    grandChild.add(relation[1]);
                }
            }
            // Cross product: every child of the key paired with every parent of the key.
            for (String gp : grandParent) {
                for (String gc : grandChild) {
                    context.write(new Text(gc), new Text(gp));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "TableJoin");
        job.setJarByClass(Child2Parent.class);
        job.setMapperClass(Child2Parent.Mymapper.class);
        job.setReducerClass(Child2Parent.Myreduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
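The job above is a self-join of the table on the middle generation: each person becomes a key, and the reducer pairs that person's children with that person's parents. A plain-Java sketch of the same join, runnable without Hadoop, may make the data flow easier to follow. `GrandparentJoinSketch` and `grandPairs` are hypothetical names, and the two maps stand in for the `r#` and `l#` tagged records grouped by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the child-parent self-join; no Hadoop classes are used.
final class GrandparentJoinSketch {

    // Takes "child parent" lines and returns "grandchild grandparent" lines.
    static List<String> grandPairs(List<String> lines) {
        Map<String, List<String>> childrenOf = new TreeMap<>(); // models the "r#" records per key
        Map<String, List<String>> parentsOf = new TreeMap<>();  // models the "l#" records per key
        for (String line : lines) {
            String[] cap = line.trim().split("\\s+");
            if (cap.length < 2 || "child".equals(cap[0])) {
                continue; // skip the header row and malformed lines
            }
            childrenOf.computeIfAbsent(cap[1], k -> new ArrayList<String>()).add(cap[0]);
            parentsOf.computeIfAbsent(cap[0], k -> new ArrayList<String>()).add(cap[1]);
        }
        List<String> pairs = new ArrayList<>();
        // A person with both children and parents is a middle generation: join the two sides.
        for (Map.Entry<String, List<String>> middle : childrenOf.entrySet()) {
            List<String> parents = parentsOf.get(middle.getKey());
            if (parents == null) {
                parents = Collections.emptyList();
            }
            for (String grandparent : parents) {
                for (String grandchild : middle.getValue()) {
                    pairs.add(grandchild + " " + grandparent);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("child parent", "Steven Lucy", "Lucy Mary", "Lucy Frank");
        System.out.println(grandPairs(lines)); // [Steven Mary, Steven Frank]
    }
}
```

Run on the full sample input, the sketch produces the same twelve pairs as the sample output table. The row order may differ from a real job, where the order of values within a key is not guaranteed.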