The MapReduce Parallel Computing Framework
1.Core idea: divide and conquer
map: process each piece of the data independently
reduce: merge the partial results
2.Data flows through the framework as <key, value> pairs
1.
The Map phase consists of a number of Map Tasks:
*Input format parsing: InputFormat
*Input record processing: Mapper
*Data grouping: Partitioner (see the sketch after these two lists)
2.
The Reduce phase consists of a number of Reduce Tasks:
*Remote copy of the map output (shuffle)
*Sorting the data by key
*Data processing: Reducer
*Output format: OutputFormat
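Of these pluggable pieces, the Partitioner decides which reducer each map output key is sent to. Here is a minimal sketch, assuming Text/IntWritable pairs as in the WordCount example later in this section (the class name WordPartitioner is made up for illustration); it mirrors the behavior of Hadoop's default HashPartitioner:

package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical example: records with the same key always land on the same reducer.
public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask off the sign bit so the partition index is never negative,
        // then spread keys evenly across the reducers.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

A custom implementation would be registered with job.setPartitionerClass(WordPartitioner.class); if you set nothing, the default hash partitioner is used, which is what the WordCount job below relies on.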
3.Writing a MapReduce program
1.pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hadoop</groupId>
  <artifactId>mapreduce</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>mapreduce</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
    </dependency>
  </dependencies>
</project>
2.src/main/resources
Import this folder into the project (add it to the build path)
3.Specify the output folder
4.Copy the four XML files already configured in your Hadoop installation into src/main/resources:
cp core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml /root/workspace/mapreduce/src/main/resources/
Then refresh the src/main/resources folder so the IDE picks up the new files.
Finally, confirm the basic environment works by running one of the sample programs that ship with Hadoop.
With that done, we can start on the real MapReduce code.
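For example, the bundled pi estimator makes a quick smoke test (the jar path below assumes a standard Hadoop 2.5.0 layout; adjust it to your installation):

bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar pi 2 10

If the job submits and prints an estimate of pi, YARN and HDFS are wired up correctly.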
Writing the code is formulaic, like an eight-legged essay:
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)
Context is the context object, used to emit output.
Mapper<LongWritable, Text, Text, IntWritable>
LongWritable is the byte offset of the line; Text is the line's content, corresponding to Java's String.
The second Text and IntWritable are the output key and value types, corresponding to String and int.
Note that the map output is exactly the reduce input.
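For example, tracing two input lines through WordCount (byte offsets are the LongWritable keys):

map input:       (0, "hadoop is fun"), (14, "hadoop is fast")
map output:      ("hadoop", 1), ("is", 1), ("fun", 1), ("hadoop", 1), ("is", 1), ("fast", 1)
shuffle + sort:  ("fast", [1]), ("fun", [1]), ("hadoop", [1, 1]), ("is", [1, 1])
reduce output:   ("fast", 1), ("fun", 1), ("hadoop", 2), ("is", 2)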
Two details worth copying from the map code below: the output value is a single constant IntWritable set to 1, reused for every record, and StringTokenizer is used instead of split(), which allocates an array of Strings for every line and wastes memory. When writing a job like this, you can concentrate entirely on the map and reduce functions.
package com.hadoop.mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // step 1: map class
    public static class WordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        // The output value is always 1, so reuse a single constant instance.
        private final static IntWritable mapOutputValue = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // current line of input
            String lineValue = value.toString();

            // tokenize instead of lineValue.split(" "):
            // avoids allocating an array of Strings for every line
            StringTokenizer strT = new StringTokenizer(lineValue);

            // emit (word, 1) for every token
            while (strT.hasMoreTokens()) {
                String wordValue = strT.nextToken();
                mapOutputKey.set(wordValue);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // step 2: reduce class
    public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable reduceOutputValue = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum all counts for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            reduceOutputValue.set(sum);
            context.write(key, reduceOutputValue);
        }
    }

    // step 3: driver
    public void run(String[] args) throws Exception {
        // 1. get configuration
        Configuration conf = new Configuration();

        // 2. create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        // run jar
        job.setJarByClass(this.getClass());

        // set job: input -> map -> reduce -> output
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // reduce
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // submit and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        if (isSuccess) {
            System.out.println("success");
        } else {
            System.out.println("fail");
        }
    }

    public static void main(String[] args) throws Exception {
        new WordCount().run(args);
    }
}
4.Test run
When exporting the jar, choose the main class; after packaging, add execute permission: chmod u+x wordCount.jar
bin/yarn jar jars/wordCount.jar inpath outpath
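Note that outpath must not already exist, or FileOutputFormat will fail the job with an "output directory already exists" error. To inspect the result (a single reducer writes one file named part-r-00000), a standard HDFS command works:

bin/hdfs dfs -cat outpath/part-r-00000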