
Simple MapReduce Examples



Case 1: File Merging and Deduplication

Given two input files, file A and file B, write a MapReduce program that merges the two files and removes the duplicate content, producing a new output file C. A sample of the input and output files is given below for reference.

A sample of input file A:

Data
20150101 x
20150103 x
20150104 y
20150102 y
20150105 z
20150106 x

A sample of input file B:

Data
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

The output file C obtained by merging input files A and B:

Data
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

The approach: the mapper emits each input line as the key with an empty value; the shuffle groups identical lines under one key, so the reducer (also used as the combiner) writes each key exactly once, which merges the two files and removes the duplicates.

Code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class hebing {
    // Map: emit each input line as the key; the value is left empty.
    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }
    // Reduce: identical lines are grouped under a single key by the shuffle,
    // so writing each key once removes the duplicates.
    public static class Myreducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "hebing");
        job.setJarByClass(hebing.class);
        job.setMapperClass(hebing.Mymapper.class);
        job.setCombinerClass(hebing.Myreducer.class); // the reducer is idempotent, so it also serves as the combiner
        job.setReducerClass(hebing.Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
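
The input and output paths are hard-coded to hdfs://localhost:9000/input and hdfs://localhost:9000/output. As a minimal sketch (my addition, not part of the original program), the GenericOptionsParser that is imported above but never used could read the paths from the command line instead:

// Hypothetical replacement for the two hard-coded path lines in main():
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
    System.err.println("Usage: hebing <input path> <output path>");
    System.exit(2);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));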

Case 2: Sorting the Input Files

There are several input files, and every line of every file contains one integer. Read the integers from all of the files, sort them in ascending order, and write them to a new file in which each line holds two integers: the first is the rank of the second integer in the sorted order, and the second is the original integer. A sample of the input and output files is given below for reference.

A sample of input file 1:

Data
33
37
12
40

A sample of input file 2:

Data
4
16
39
5

A sample of input file 3:

Data
1
45
25

The output file obtained from input files 1, 2 and 3:

Rank Data
1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

The approach: the mapper emits each integer as an IntWritable key; MapReduce sorts keys during the shuffle, so the single reducer receives the integers in ascending order and only needs to attach an increasing rank to each occurrence.

Code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
    // Map: parse each line into an integer and emit it as the key;
    // the shuffle then delivers the keys to the reducer in ascending order.
    public static class Mymapper extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable v = new IntWritable();
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            v.set(Integer.parseInt(value.toString()));
            context.write(v, new IntWritable(1));
        }
    }
    // Reduce: keys arrive sorted; write one (rank, value) pair per occurrence,
    // so duplicate numbers receive consecutive ranks.
    public static class Myreducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable line_num = new IntWritable(1);
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable num : values) {
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }
    public static void main(String[] args) throws Exception {
        /** Designed by 王立同 **/
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Sort.Mymapper.class);
        job.setReducerClass(Sort.Myreducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
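
This job relies on the default single reduce task: that is what makes the output globally sorted and the rank counter correct. The Partitioner import above is otherwise unused; as a minimal sketch of what it is typically used for (my addition, not part of the original program), a range partitioner can preserve the key order across several reducers, although the rank numbering would then need a separate pass:

// Hypothetical partitioner: split the integer key space into contiguous ranges so that
// reducer i only receives keys smaller than those sent to reducer i+1.
public static class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
    private static final int MAX_VALUE = 1000; // assumed upper bound on the input integers
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int bound = MAX_VALUE / numPartitions + 1;              // width of each range
        return Math.min(key.get() / bound, numPartitions - 1);  // reducer index for this key
    }
}
// It would be registered in main() with: job.setPartitionerClass(Sort.RangePartitioner.class);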

Case 3: Mining Information from a Given Table

Below is a child-parent table. Write a MapReduce program that mines the parent-child relationships it contains and produces the table of grandchild-grandparent relationships. The input file is as follows:

child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma

The output file is as follows:

grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse

The approach is a reduce-side self-join of the table with itself. The mapper emits every (child, parent) record twice: keyed by the parent with the value "r#child", and keyed by the child with the value "l#parent". In the reducer, for a given person, the l#-tagged values are that person's parents and the r#-tagged values are that person's children; the Cartesian product of the two lists yields the grandchild-grandparent pairs that pass through that person.

Code:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Child2Parent {
    public static class Mymapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] cap = value.toString().split("\\s+"); // split on whitespace (spaces or tabs)
            if (!"child".equals(cap[0])) {                 // skip the header line
                String cName = cap[0];
                String pName = cap[1];
                // Emit each record twice with a role tag:
                // keyed by the parent, "r#child"  -> one of the key's children
                // keyed by the child,  "l#parent" -> one of the key's parents
                context.write(new Text(pName), new Text("r#" + cName));
                context.write(new Text(cName), new Text("l#" + pName));
            }
        }
    }
    public static class Myreduce extends Reducer<Text, Text, Text, Text> {
        public static int runtime = 0;
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Write the header exactly once; this works because the job uses a single reduce task.
            if (runtime == 0) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                runtime++;
            }
            // For the person "key": l#-tagged values are the key's parents (potential grandparents),
            // r#-tagged values are the key's children (potential grandchildren).
            List<String> grandParents = new ArrayList<>();
            List<String> grandChildren = new ArrayList<>();
            for (Text text : values) {
                String[] relation = text.toString().split("#");
                if ("l".equals(relation[0])) {
                    grandParents.add(relation[1]);
                } else {
                    grandChildren.add(relation[1]);
                }
            }
            // The Cartesian product of the two lists gives every grandchild-grandparent pair through "key".
            for (String grandParent : grandParents) {
                for (String grandChild : grandChildren) {
                    context.write(new Text(grandChild), new Text(grandParent));
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "TableJoin");
        job.setJarByClass(Child2Parent.class);
        job.setMapperClass(Child2Parent.Mymapper.class);
        job.setReducerClass(Child2Parent.Myreduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
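
A note on the header row: guarding reduce() with the static runtime counter only works because the whole job runs in a single reduce task. A minimal alternative sketch (my suggestion, not part of the original program) writes the header in the reducer's setup() method, which Hadoop calls once per reduce task:

// Hypothetical variant of Myreduce: emit the header once per reduce task in setup()
// instead of checking a static counter inside reduce().
public static class Myreduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        context.write(new Text("grandchild"), new Text("grandparent"));
    }
    // reduce() stays exactly as above, minus the runtime check.
}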