MapReduce Parallel Computing Framework

1. Core idea: divide and conquer

map: process each piece of the data independently

reduce: merge the partial results

2. Data flows through the framework as <key, value> pairs, as the trace below shows.
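For the word-count job built later in this post, for example, the pairs move through the stages like this (made-up input lines):

input (offset, line): (0, "hello world") (12, "hello hadoop")
map output (word, 1): ("hello", 1) ("world", 1) ("hello", 1) ("hadoop", 1)
after shuffle/sort:   ("hadoop", [1]) ("hello", [1, 1]) ("world", [1])
reduce output:        ("hadoop", 1) ("hello", 2) ("world", 1)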

1. The Map phase consists of a number of Map Tasks:

* Input parsing: InputFormat

* Record processing: Mapper

* Grouping of map output: Partitioner (a sketch follows this list)
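The Partitioner decides which reduce task receives each map output key. Here is a minimal sketch of a custom Partitioner for the word-count types used later (the class name WordPartitioner is hypothetical; Hadoop's default HashPartitioner performs this same hash-based routing, so a custom one is only needed when the default grouping does not fit the job):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {
	@Override
	public int getPartition(Text key, IntWritable value, int numPartitions) {
		// identical keys always map to the same partition, i.e. the same reduce task
		return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
	}
}

It would be registered in the driver with job.setPartitionerClass(WordPartitioner.class).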

2. The Reduce phase consists of a number of Reduce Tasks:

* Remote copying of map output (shuffle)

* Sorting of the data by key

* Data processing: Reducer

* Output formatting: OutputFormat (the number of reduce tasks is configurable; see the snippet after this list)
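The number of reduce tasks defaults to 1 and is set on the Job object; a fragment for the driver code shown later:

		// each reduce task writes its own output file: part-r-00000, part-r-00001, ...
		job.setNumReduceTasks(3);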

3. Writing a MapReduce Program

1. pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hadoop</groupId>
  <artifactId>mapreduce</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>mapreduce</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
    </dependency>
  </dependencies>
</project>
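If you prefer building from the command line over exporting a jar from the IDE, this pom.xml also lets Maven do the packaging (standard Maven usage; by default the jar ends up at target/mapreduce-0.0.1-SNAPSHOT.jar):

mvn clean package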

2. src/main/resources

Import this folder into the project (in Eclipse, add it to the build path as a source folder).

3. Specify the build output location for it.

4. Copy the four XML files already configured under Hadoop's etc/hadoop directory into src/main/resources:

cp core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml /root/workspace/mapreduce/src/main/resources/

Then refresh the src/main/resources folder in the IDE so it picks up the new files.

Finally, confirm the basic environment works by running one of the example jobs that ships with Hadoop, e.g. as shown below.
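The examples jar bundled with the distribution makes a quick smoke test; the path and version below assume the Hadoop 2.5.0 release matching the pom.xml:

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar pi 2 4

If the pi estimate prints at the end, HDFS and YARN are working.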

With that done, we can start on the actual MapReduce code.

The code follows a fixed, formulaic template:

map: (K1, V1) -> list(K2, V2)

reduce: (K2, list(V2)) -> list(K3, V3)

Context is the context object through which map and reduce emit their output.

Mapper<LongWritable, Text, Text, IntWritable>

LongWritable is the byte offset of the line; Text is the type of each line, corresponding to Java's String.

The second Text and the IntWritable are the map output key and value types, corresponding to String and int.

Note that the map output types are exactly the reduce input types.

Two details worth noting in the map code: the output value is a single constant 1 (one reusable IntWritable object rather than a new object per word), and String.split() is avoided because it allocates a fresh array and strings for every line; StringTokenizer is cheaper. When writing a job, you can concentrate almost entirely on the map and reduce functions.

package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
	//step1 map class
	public static class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
		private Text mapOutputKey = new Text();
		private final static IntWritable mapOutputValue = new IntWritable(1);
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			//line value
			String lineValue = value.toString();
			// split into words; lineValue.split(" ") is avoided as too costly
			StringTokenizer strT = new StringTokenizer(lineValue);
			//iterator
			while(strT.hasMoreTokens()){
				String wordValue= strT.nextToken();
				mapOutputKey.set(wordValue);
				context.write(mapOutputKey, mapOutputValue);
			}
			
		}
	}
	//step2 reduce class
	public static class WordCountReducer  extends Reducer<Text,IntWritable,Text,IntWritable>{
		private IntWritable reduceOutputValue = new IntWritable();
		@Override
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context)
				throws IOException, InterruptedException {
			//sum
			int sum = 0;
			//iterator
			for (IntWritable value:values){
				sum += value.get();
			}
			reduceOutputValue.set(sum);
			context.write(key, reduceOutputValue);
		}
	}
	//step3 driver class 
	public void run(String[] args) throws Exception{
		//1.get configuration
		Configuration conf = new Configuration();
		//2.create job
		Job job = Job.getInstance(conf,this.getClass().getSimpleName());
		//run jar
		job.setJarByClass(this.getClass());
		//set job input->map->reduce->output
		Path inpath = new Path(args[0]);
		FileInputFormat.addInputPath(job, inpath);
		//map
		job.setMapperClass(WordCountMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//reduce
		job.setReducerClass(WordCountReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		//output
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);
		//submit
		boolean isSuccess = job.waitForCompletion(true);
		if (isSuccess){
			System.out.println("success");
		}else{
			System.out.println("fail");
		}
	}
	public static void main(String[] args) throws Exception {
		new WordCount().run(args);
		
	}
}

4. Test Run

Select the main class when exporting the jar. After packaging, add execute permission: chmod u+x wordCount.jar

bin/yarn jar jars/wordCount.jar inpath outpath
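Here inpath and outpath are HDFS paths, and outpath must not exist yet, otherwise FileOutputFormat aborts the job. A concrete run with hypothetical paths:

bin/yarn jar jars/wordCount.jar /user/root/wordcount/input /user/root/wordcount/output
bin/hdfs dfs -cat /user/root/wordcount/output/part-r-00000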