
Lab 4: MapReduce Programming Practice

一 (I). Merging two files and removing duplicates

Given two input files, A and B, write a MapReduce program that merges them and removes the duplicate records, producing a new output file C. A sample of the input and output files is given below for reference.

Sample content of input file A:

20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

Sample content of input file B:

20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

The output file C obtained by merging input files A and B looks like this:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

Solution approach:

The goal of deduplication is for every record that appears more than once in the raw data to appear exactly once in the output. Since the shuffle phase groups records that share the same key, the natural idea is to make identical lines from different files produce identical keys: the Map phase emits the whole line as the key, and the Reduce phase simply writes out each key once, regardless of what its value list contains.

1. Start Hadoop

cd /usr/local/hadoop
./sbin/start-dfs.sh
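As a quick optional check (not spelled out in the original steps), the JDK's jps tool should now list the HDFS daemons:

jps
# expect NameNode, DataNode and SecondaryNameNode in the output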

2. Create files A and B

sudo mkdir MapReduce && cd MapReduce
sudo vim A
sudo vim B
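The program in step 3 reads its input from an HDFS directory named input, so the two files also need to be uploaded to HDFS. A minimal sketch, assuming HDFS is running and the default user directory is used (adjust the paths to your setup):

/usr/local/hadoop/bin/hdfs dfs -mkdir -p input
/usr/local/hadoop/bin/hdfs dfs -put ./A ./B input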

3. Write the Java file that implements the MapReduce job

sudo vim Merge.java

Source code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge {
    /**
     * Merge files A and B, remove the duplicated records, and write the result to a new output file C.
     */

    // Override the map method: copy the input value straight into the output key.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text text = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            text = value;
            context.write(text, new Text(""));
        }
    }

    // Override the reduce method: copy the input key straight into the output key.
    // The value list is ignored, so duplicate lines collapse into a single record.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://localhost:9000"); // deprecated alias of fs.defaultFS; still honoured
        String[] otherArgs = new String[]{"input", "output"}; /* input and output paths set directly */
        if (otherArgs.length != 2) {
            System.err.println("Usage: Merge <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Merge and duplicate removal");
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4. Grant the hadoop user the required permissions

sudo chown -R hadoop /usr/local/hadoop

5. Add the JARs required for compilation to the classpath

vim ~/.bashrc

Append the following two lines, then reload the file:

export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH

source ~/.bashrc
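Optionally, confirm that the classpath resolves before compiling; hadoop classpath simply prints the JAR paths that were just added:

/usr/local/hadoop/bin/hadoop classpath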

6. Compile Merge.java and package the generated class files into a JAR

javac Merge.java
jar -cvf Merge.jar *.class

7. Run the newly built Merge.jar

/usr/local/hadoop/bin/hadoop jar Merge.jar Merge

8. View the output

/usr/local/hadoop/bin/hdfs dfs -cat output/*
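Note that a MapReduce job fails if its output directory already exists, so remove it before re-running the job:

/usr/local/hadoop/bin/hdfs dfs -rm -r output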

二 (II). Writing a program to sort the input files

There are several input files, each containing one integer per line. Read the integers from all files, sort them in ascending order, and write them to a new file in which every line holds two integers: the first is the rank of the second integer in the sorted order, and the second is one of the original integers. A sample of the input and output files is given below for reference.

Sample content of input file 1:

33
37
12
40

Sample content of input file 2:

4
16
39
5

Sample content of input file 3:

1
45
25

The output file obtained from input files 1, 2 and 3 is as follows:

1 1
2 4
3 5
4 12
5 16
6 25
7 33
8 37
9 39
10 40
11 45

 

Java code:

package com.MergeSort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MergeSort {

    // Map: parse each line as an integer and emit it as the key; the shuffle phase
    // delivers the keys to the reducer in ascending order.
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        private static IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    // Reduce: emit "rank value" once for every occurrence of each key. Global ordering
    // relies on the job keeping the default single reducer.
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private static IntWritable linenum = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable num : values) {
                context.write(linenum, key);
                linenum = new IntWritable(linenum.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] str = new String[]{"input", "output"};
        String[] otherArgs = new GenericOptionsParser(conf, str).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MergeSort <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "mergesort");
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
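The lab sheet does not repeat the build and run steps for this part. The sketch below mirrors steps 5 to 8 of Part I, adjusted for the com.MergeSort package; the local file names file1, file2 and file3 are assumptions, so adapt them to however you saved the three sample inputs:

/usr/local/hadoop/bin/hdfs dfs -rm -r input output   # ignore the error if output does not exist yet
/usr/local/hadoop/bin/hdfs dfs -mkdir -p input
/usr/local/hadoop/bin/hdfs dfs -put ./file1 ./file2 ./file3 input
javac -d . MergeSort.java                            # -d . creates the com/MergeSort directory layout
jar -cvf MergeSort.jar com
/usr/local/hadoop/bin/hadoop jar MergeSort.jar com.MergeSort.MergeSort
/usr/local/hadoop/bin/hdfs dfs -cat output/*

Because the class is declared in a package, the fully qualified name com.MergeSort.MergeSort must be passed to hadoop jar. The output is globally sorted only because the job keeps the default single reducer.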

三 (III). Mining information from a given table

Given the child-parent table below, mine the parent-child relationships in it and produce a table of grandchild-grandparent relationships.

The input file content is as follows:

child    parent
Steven   Lucy
Steven   Jack
Jone     Lucy
Jone     Jack
Lucy     Mary
Lucy     Frank
Jack     Alice
Jack     Jesse
David    Alice
David    Jesse
Philip   David
Philip   Alma
Mark     David
Mark     Alma

The output file content is as follows:

grandchild   grandparent
Steven       Alice
Steven       Jesse
Jone         Alice
Jone         Jesse
Steven       Mary
Steven       Frank
Jone         Mary
Jone         Frank
Philip       Alice
Philip       Jesse
Mark         Alice
Mark         Jesse

Java code:

package com.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class STjoin {

    public static int time = 0;

    // Map: emit each child-parent pair twice so the table can be joined with itself:
    // keyed by the parent with tag "1" (the key is this child's parent),
    // and keyed by the child with tag "2" (the key is this parent's child).
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] values = line.split("\\s+"); // split child and parent on whitespace
            if (values.length < 2 || values[0].compareTo("child") == 0) {
                return; // skip the header line and malformed lines
            }
            String child_name = values[0];
            String parent_name = values[1];
            context.write(new Text(parent_name), new Text("1+" + child_name + "+" + parent_name));
            context.write(new Text(child_name), new Text("2+" + child_name + "+" + parent_name));
        }
    }

    // Reduce: for each person (the key), tag-1 records supply that person's children
    // (candidate grandchildren) and tag-2 records supply that person's parents
    // (candidate grandparents); output the Cartesian product of the two lists.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (time == 0) { // write the table header once
                context.write(new Text("grandchild"), new Text("grandparent"));
                time++;
            }
            int grand_child_num = 0;
            String[] grand_child = new String[10]; // assumes at most 10 relatives per person
            int grand_parent_num = 0;
            String[] grand_parent = new String[10];
            for (Text value : values) {
                String record = value.toString();
                if (record.length() == 0) {
                    continue;
                }
                String[] parts = record.split("\\+"); // record format: "tag+child+parent"
                char relation_type = parts[0].charAt(0);
                String child_name = parts[1];
                String parent_name = parts[2];
                if (relation_type == '1') {
                    grand_child[grand_child_num] = child_name;
                    grand_child_num++;
                } else {
                    grand_parent[grand_parent_num] = parent_name;
                    grand_parent_num++;
                }
            }
            if (grand_parent_num != 0 && grand_child_num != 0) {
                for (int m = 0; m < grand_child_num; m++) {
                    for (int n = 0; n < grand_parent_num; n++) {
                        context.write(new Text(grand_child[m]), new Text(grand_parent[n]));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"input", "output"};
        if (otherArgs.length != 2) {
            System.err.println("Usage: STjoin <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Single table join");
        job.setJarByClass(STjoin.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
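As in Part II, the following build and run steps are a sketch rather than part of the original write-up; the local file name table for the child-parent list is an assumption:

/usr/local/hadoop/bin/hdfs dfs -rm -r input output   # ignore the error if output does not exist yet
/usr/local/hadoop/bin/hdfs dfs -mkdir -p input
/usr/local/hadoop/bin/hdfs dfs -put ./table input
javac -d . STjoin.java
jar -cvf STjoin.jar com
/usr/local/hadoop/bin/hadoop jar STjoin.jar com.join.STjoin
/usr/local/hadoop/bin/hdfs dfs -cat output/*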

For the complete lab report, please contact me via private message.