
Implementing MapReduce in Java

There are two environments for running an MR job: a local test environment and the server environment.

First approach: run on the server

The job is invoked with a command directly on the server, and the whole run takes place there.

   a. Package the MR program as a jar and upload it to the server.

   b. Submit it with: hadoop jar <jar path> <fully qualified class name>

Mapper

WordCountMapper.java

package com.mr;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Uses the default InputFormat (TextInputFormat, a FileInputFormat): the split is read
 * line by line; the offset of each line is the key and the line's content is the value.
 * @author benxi
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // The word is used as the output key
        public Text k = new Text();
        // A constant 1 is used as the output value
        public IntWritable v = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                String[] words = StringUtils.split(value.toString(), " ");
                for (int i = 0; i < words.length; i++) {
                        String w = words[i];
                        // Emit (word, 1) for every word in the line
                        k.set(w);
                        context.write(k, v);
                }
        }
}
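For example, given the input line "hello world hello", this map() emits (hello, 1), (world, 1) and (hello, 1); the framework's shuffle/sort phase then groups the 1s of identical words before reduce is called.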

Reducer

WorldCountReduce.java

package com.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WorldCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * Called once per distinct word; the iterable holds every 1 emitted for that word.
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> ite, Context context)
                        throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable i : ite) {
                        sum += i.get();
                }
                context.write(key, new IntWritable(sum));
        }
}
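Because this reduce logic just sums ones, it is associative and commutative, so the same class can optionally double as a combiner to pre-aggregate map output and cut shuffle traffic. This is not part of the original driver; a one-line addition there would be:

        job.setCombinerClass(WorldCountReduce.class);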

Driver

RunJob.java

package com.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
        public static void main(String[] args) {
                Configuration config = new Configuration();
                try {
                        FileSystem fs = FileSystem.get(config);
                        Job job = Job.getInstance(config);
                        job.setJarByClass(RunJob.class);
                        job.setMapperClass(WordCountMapper.class);
                        job.setReducerClass(WorldCountReduce.class);
                        job.setMapOutputKeyClass(Text.class);
                        job.setMapOutputValueClass(IntWritable.class);
                        // Input data for the job: /usr/input/wc.txt
                        FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
                        // Output directory for the results; it must not already exist,
                        // otherwise the job fails, so delete it first if present
                        Path output = new Path("/usr/output/wc");
                        if (fs.exists(output)) {
                                fs.delete(output, true);
                        }
                        FileOutputFormat.setOutputPath(job, output);
                        boolean f = job.waitForCompletion(true);
                        if (f) {
                                System.out.println("Job finished successfully");
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}
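Note that this driver sets only the map output types. That is enough here because the default TextOutputFormat simply calls toString() on whatever the reducer writes, but declaring the final output types explicitly is common practice; an optional addition to the driver:

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);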

Then export the project (Java > JAR file), select the src directory, and specify the jar's name and output path.

Copy the generated jar to the Linux server and run: hadoop jar wc.jar com.mr.RunJob
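With the default single reducer, the word counts land in one file under the output directory, which can be checked with: hdfs dfs -cat /usr/output/wc/part-r-00000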

Second approach: run locally

On Windows, the Hadoop bin directory must contain winutils.exe.
1. Configure the Hadoop environment variables on Windows.
2. Copy the debug tool (winutils.exe) into HADOOP_HOME/bin (an in-code alternative is sketched after this list).
3. Patch the Hadoop source code; note: make sure the project's lib points at the lib of the actually installed JDK.
4. The MR driver code must change:

           a. src must not contain the server-side Hadoop configuration files.
           b. When submitting, use:
                   Configuration config = new Configuration();
                   config.set("fs.defaultFS", "hdfs://node7:8020");

Only RunJob needs to change; the other Java files stay the same.

public class RunJob {
        public static void main(String[] args) {
                Configuration config = new Configuration();
                // Access HDFS as the user root
                System.setProperty("HADOOP_USER_NAME", "root");
                config.set("fs.defaultFS", "hdfs://CentOS8:8020");
                try {
                        FileSystem fs = FileSystem.get(config);

                        Job job = Job.getInstance(config);
                        job.setJarByClass(RunJob.class);
                        job.setJobName("wc");
                        job.setMapperClass(WordCountMapper.class);
                        job.setReducerClass(WorldCountReduce.class);
                        job.setMapOutputKeyClass(Text.class);
                        job.setMapOutputValueClass(IntWritable.class);

                        // Input data for the job: /usr/input/wc.txt
                        FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
                        // Output directory for the results; it must not already exist,
                        // otherwise the job fails, so delete it first if present
                        Path output = new Path("/usr/output/wc");
                        if (fs.exists(output)) {
                                fs.delete(output, true);
                        }
                        FileOutputFormat.setOutputPath(job, output);
                        boolean f = job.waitForCompletion(true);
                        if (f) {
                                System.out.println("Job finished successfully");
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}

Third approach: submit from the local machine and run on the server
The server's host name must be mapped to its IP address in the local hosts file.

The job is submitted directly from the local machine, while execution takes place on the server (the setup real deployments use).
a. Package the MR program as a jar and keep it on the local machine.
b. Patch the Hadoop source code; note: make sure the project's lib points at the lib of the actually installed JDK.
c. Add one extra property with the path of the packaged jar:
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
d. Run the main method locally; a servlet can invoke the MR job the same way.

public class RunJob {
        public static void main(String[] args) {
                Configuration config = new Configuration();
                System.setProperty("HADOOP_USER_NAME", "root");
//                config.set("fs.defaultFS", "hdfs://CentOS8:8020");
                // Path of the local jar, e.g. C:\Users\benxi\Desktop\wc.jar
                config.set("mapred.jar", "C:\\Users\\benxi\\Desktop\\wc.jar");

                try {
                        FileSystem fs = FileSystem.get(config);

                        Job job = Job.getInstance(config);
                        job.setJarByClass(RunJob.class);
                        job.setJobName("wc");
                        job.setMapperClass(WordCountMapper.class);
                        job.setReducerClass(WorldCountReduce.class);
                        job.setMapOutputKeyClass(Text.class);
                        job.setMapOutputValueClass(IntWritable.class);

                        // Input data for the job: /usr/input/wc.txt
                        FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
                        // Output directory for the results; it must not already exist,
                        // otherwise the job fails, so delete it first if present
                        Path output = new Path("/usr/output/wc");
                        if (fs.exists(output)) {
                                fs.delete(output, true);
                        }
                        FileOutputFormat.setOutputPath(job, output);
                        boolean f = job.waitForCompletion(true);
                        if (f) {
                                System.out.println("Job finished successfully");
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
}
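A side note: mapred.jar is the Hadoop 1.x property name; on Hadoop 2.x it is deprecated in favor of mapreduce.job.jar, though the old name is still mapped automatically.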

To split each input line into a key and a value at the tab character, use KeyValueTextInputFormat:
job.setInputFormatClass(KeyValueTextInputFormat.class);
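With KeyValueTextInputFormat each line is split at the first tab: the text before it becomes the key and the rest becomes the value, so the mapper reads Text/Text instead of LongWritable/Text. A minimal sketch of a matching mapper; the class name and the counting logic are illustrative, not from the original:

        import java.io.IOException;
        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        public class KvMapper extends Mapper<Text, Text, Text, IntWritable> {
                @Override
                protected void map(Text key, Text value, Context context)
                                throws IOException, InterruptedException {
                        // key = text before the first tab, value = the rest of the line
                        context.write(key, new IntWritable(1));
                }
        }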
To read several input files in one job, pass an array of paths in the driver:
FileInputFormat.setInputPaths(job, new Path[] {new Path("/usr/input/network.txt"), new Path("/usr/input/user.txt")});
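A directory can be passed as well, in which case every file inside it is read; FileInputFormat.addInputPath(job, path) appends a single additional path instead of replacing the list.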
To find out inside map() which file the current record came from:
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
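This is useful when one job reads several files and the map logic depends on a record's origin, for example when tagging records for a reduce-side join. A minimal sketch under that assumption; the tag values are illustrative and FileSplit comes from org.apache.hadoop.mapreduce.lib.input:

        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {
                FileSplit split = (FileSplit) context.getInputSplit();
                String name = split.getPath().getName();
                // Tag each record so the reducer knows which input it came from
                String tag = "network.txt".equals(name) ? "N#" : "U#";
                context.write(new Text(tag + value.toString()), new IntWritable(1));
        }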