Implementing MapReduce in Java
There are two kinds of MR execution environments: a local test environment and the server environment.
Method 1: run on the server
The job is invoked on the server from the command line, and execution also happens on the server.
a. Package the MR program as a jar and upload it to the server.
b. Run it with: hadoop jar <jar path> <fully qualified class name>
Mapper
WordCountMapper.java
package com.mr;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Uses the default InputFormat (TextInputFormat, a FileInputFormat subclass): the data in each
 * split is read line by line, with the byte offset of the line as the key and the line's
 * content as the value.
 * @author benxi
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // the word serves as the key
    private Text k = new Text();
    // the constant 1 serves as the value
    private IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), " ");
        for (int i = 0; i < words.length; i++) {
            String w = words[i];
            // emit (word, 1) for every word in the line
            k.set(w);
            context.write(k, v);
        }
    }
}
Reducer
WorldCountReduce.java
package com.mr;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WorldCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * The framework groups all values that share the same key, so summing the
     * iterable yields the total count for that word.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> ite, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : ite) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
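Before packaging, the Mapper/Reducer pair can be checked without a cluster. A minimal sketch using MRUnit (a now-retired Apache test library for MapReduce; the org.apache.mrunit and JUnit dependencies and the test class below are not part of the original project):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Test;

public class WordCountTest {
    @Test
    public void testWordCount() throws Exception {
        MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver =
                MapReduceDriver.newMapReduceDriver(new WordCountMapper(), new WorldCountReduce());
        driver.withInput(new LongWritable(0), new Text("hello world hello"))
              // MRUnit sorts the shuffled keys, so expected output is in key order
              .withOutput(new Text("hello"), new IntWritable(2))
              .withOutput(new Text("world"), new IntWritable(1))
              .runTest();
    }
}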
Driver
RunJob.java
package com.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WorldCountReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // input data for the job: /usr/input/wc.txt
            FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
            // the output directory must not exist when the job starts, or the
            // job fails, so delete it first if it is already there
            Path output = new Path("/usr/output/wc");
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("Job completed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Then export the project (Export > Java > JAR file), select the src directory, and specify the jar's name and path.
Copy the generated jar to the Linux system and run: hadoop jar wc.jar com.mr.RunJob
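Once the job reports success, the result can be read straight from HDFS; the reducer's output lands in a part file (part-r-00000 is the default name for the first reducer):
hadoop fs -cat /usr/output/wc/part-r-00000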
Method 2: run locally
The bin directory of the Hadoop installation on Windows must contain winutils.exe.
1. Configure the Hadoop environment variables on Windows.
2. Copy the debug tool (winutils.exe) into HADOOP_HOME/bin.
3. Patch the Hadoop source code. Note: make sure the project's lib uses the lib of an actually installed JDK.
4. The code that submits the MR job must change:
a. src must not contain the server-side Hadoop configuration files.
b. When submitting, set the NameNode address explicitly:
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://node7:8020");
Only RunJob needs to change; the other Java files stay the same.
public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // access HDFS as the user "root"
        System.setProperty("HADOOP_USER_NAME", "root");
        config.set("fs.defaultFS", "hdfs://CentOS8:8020");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WorldCountReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // input data for the job: /usr/input/wc.txt
            FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
            // the output directory must not exist when the job starts, or the
            // job fails, so delete it first if it is already there
            Path output = new Path("/usr/output/wc");
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("Job completed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Method 3: submit from the local machine, run on the server
The server's host name must be mapped to its IP address in the local hosts file (the port comes from fs.defaultFS).
The job is invoked directly from the local machine, but execution happens on the server (the real enterprise setup).
a. Package the MR program as a jar and keep it on the local machine.
b. Patch the Hadoop source code. Note: make sure the project's lib uses the lib of an actually installed JDK.
c. Add one property that points to the packaged jar:
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
d. Run the main method locally; a servlet can invoke the MR job in the same way.
public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        // config.set("fs.defaultFS", "hdfs://CentOS8:8020");
        // local path of the packaged jar, e.g. C:\Users\benxi\Desktop\wc.jar
        config.set("mapred.jar", "C:\\Users\\benxi\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WorldCountReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // input data for the job: /usr/input/wc.txt
            FileInputFormat.setInputPaths(job, new Path("/usr/input/wc.txt"));
            // the output directory must not exist when the job starts, or the
            // job fails, so delete it first if it is already there
            Path output = new Path("/usr/output/wc");
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("Job completed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
To split each input line at a tab into the key and the value:
job.setInputFormatClass(KeyValueTextInputFormat.class);
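With KeyValueTextInputFormat, every line is split at the first tab: the text before the tab becomes the key and the rest becomes the value, so the mapper's input key type changes from LongWritable to Text. A minimal sketch (KvMapper is an illustrative name, not part of the project above; in Hadoop 2.x the separator can be changed via the property mapreduce.input.keyvaluelinerecordreader.key.value.separator):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KvMapper extends Mapper<Text, Text, Text, IntWritable> {
    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // key = text before the first tab, value = text after it
        context.write(key, one);
    }
}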
To read several input files in one job:
FileInputFormat.setInputPaths(job, new Path[] {new Path("/usr/input/network.txt"), new Path("/usr/input/user.txt")});
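If the files need different Mapper classes, Hadoop's MultipleInputs can bind a mapper to each path; a minimal sketch (NetworkMapper and UserMapper are illustrative names):
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// each input path gets its own InputFormat and Mapper
MultipleInputs.addInputPath(job, new Path("/usr/input/network.txt"), TextInputFormat.class, NetworkMapper.class);
MultipleInputs.addInputPath(job, new Path("/usr/input/user.txt"), TextInputFormat.class, UserMapper.class);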
To find out which file the current record comes from inside map():
FileSplit split = (FileSplit) context.getInputSplit();
String name = split.getPath().getName();
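This is useful when one mapper handles several source files and each record must be tagged with its origin, e.g. for a join. A minimal sketch for a Mapper<LongWritable, Text, Text, Text> (the tagging scheme is illustrative), assuming the new-API import org.apache.hadoop.mapreduce.lib.input.FileSplit:
@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    // every split belongs to exactly one file, so the path identifies the source
    FileSplit split = (FileSplit) context.getInputSplit();
    String name = split.getPath().getName(); // e.g. "user.txt" or "network.txt"
    // tag each record with the file it came from
    context.write(new Text(name), value);
}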