大資料第四天——MapReduce原理及IDEA Maven下WordCount例項兩種實現
1.MapReduce概述
HDFS實現了分散式檔案儲存,儲存問題解決了,我們就需要考慮如何對資料進行處理,MapReduce是一個計算框架(程式設計模型),基於該計算框架,可以很容易的編寫資料處理程式,從而以較高的效率處理海量資料集。
MR框架對於程式設計師的最大意義在於,不需要掌握分散式計算程式設計,不需要考慮分散式程式設計裡可能存在的種種難題,比如任務排程和分配、檔案邏輯切塊、位置追溯、工作。這樣,程式設計師能夠把大部分精力放在核心業務層面上,大大簡化了分散式程式的開發和調試周期。
2.MapReduce的設計思想
MapReduce的設計思想簡單概括而言,就是“分而治之”。整個MapReduce階
段分為兩大部分,分別是Map、Reduce。
(1)map負責“分”,即把複雜的任務分解為若干個“簡單的任務”來處理。“簡單的任務”包含三層含義:一是資料或計算的規模相對原任務要大大縮小;二是就近計算原則,即任務會分配到存放著所需資料的節點上進行計算;三是這些小任務可以平行計算,彼此間幾乎沒有依賴關係。
(2)reducer負責”合”,即對map階段輸出的處理結果進行彙總。
3.MapReduce組成架構
在MapReduce程式進行計算時,需要yarn進行資源管理,yarn主要分為ResourceManager和NodeManager兩部分。
ResourceManager
NodeManager是每個DataNode節點上的資源和工作管理員,一方面,它會定時地向ResourceManager彙報本節點上的資源使用情況以及執行狀態;另一方面,它接收並處理來自ResourceManager分配的任務。
4.WordCount第一種實現(jar)
1.啟動MapReduce(先啟動hadoop,命令:sh start-dfs.sh)
在啟動hadoop前提下,進入hadoop中的sbin目錄,執行:sh start-yarn.sh
通過jps檢視是否啟動:
如果有NodeManager和ResourceManager,則證明啟動成功。
2.IDEA的pom.xml檔案中需要的架包
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-jobclient -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.7.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.1</version>
</dependency>
3.封裝工具類-ClassUtil和MrUtil(自定義的封裝,不是必要的,這是為了減少程式碼)
/**
* Title: ClassUtil
* Description: 抽象物件類
*
* @author dtt
* @data 2018-09-06 23:14
**/
public class ClassUtil {
private Class classname;
private Class okey;
private Class ovalue;
public ClassUtil(Class a,Class b,Class c){
this.classname = a;
this.okey = b;
this.ovalue = c;
}
public Class getClassname() { return classname; }
public Class getOkey() { return okey; }
public Class getOvalue() { return ovalue; }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Title: MrUtil
* Description: MapReduce工具類
*
* @author dtt
* @data 2018-09-06 22:50
**/
public class MrUtil {
private Configuration conf;
public MrUtil(){ this.conf = new Configuration(); }
public void go(String jobname, Class mapreduce, ClassUtil map, ClassUtil reduce,String iPath,String oPath) throws Exception{
// 建立job物件
Job job = Job.getInstance(conf,jobname);
// 設定執行job的類
job.setJarByClass(mapreduce);
// 設定 mapper 類
job.setMapperClass(map.getClassname());
// 設定 map 輸出的key value
job.setMapOutputKeyClass(map.getOkey());
job.setOutputValueClass(map.getOvalue());
// 設定 reduce 類
job.setReducerClass(reduce.getClassname());
// 設定reduce 輸出的key value
job.setOutputKeyClass(reduce.getOkey());
job.setOutputValueClass(reduce.getOvalue());
// 設定輸入輸出路徑
FileInputFormat.setInputPaths(job,new Path(iPath));
FileOutputFormat.setOutputPath(job,new Path(oPath));
// 提交job
boolean b = job.waitForCompletion(true);
if(!b){
System.out.println(jobname + " task fail!");
} else{
System.out.println(jobname + " task success");
}
}
}
4.程式碼
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Title: WordCountMapper
* Description: wordCount mapper程式
* @author dtt
* @data 2018-09-06 10:20
**/
/*
*Mapper<LongWritable, Text, Text, IntWritable>
* 其中的4個型別分別是:輸入key型別、輸入value型別、輸出key型別、輸出value型別。
* */
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException {
//得到輸入的每一行資料
String line = value.toString();
//通過空格分割
String[] words = line.split(" ");
//迴圈遍歷 輸出
for (String word : words) {
context.write(new Text(word),new IntWritable(1));
}
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* Title: WordCountReduce
* Description: wordcount reduce程式
* @author dtt
* @data 2018-09-06 10:28
**/
/*
* Reducer<Text, IntWritable, Text, IntWritable>
* 4個型別分別指:輸入key的型別、輸入value的型別、輸出key的型別、輸出value的型別。
* */
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
Integer count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Title: WordCountMapReduce
* Description: wordcount 主程式
* @author dtt
* @data 2018-09-06 10:33
**/
public class WordCountMapReduce {
private static String myurl = "hdfs://自己的IP:9000";
public static void main(String[] args) throws Exception {
ClassUtil map = new ClassUtil(WordCountMapper.class,Text.class,IntWritable.class);
ClassUtil reduce = new ClassUtil(WordCountReduce.class,Text.class,IntWritable.class);
Class mr = WordCountMapReduce.class;
MrUtil job = new MrUtil();
job.go("wordcounnt",mr,map,reduce,myurl+"/wordcount/input",myurl+"/wordcount/output");
}
}
5.兩種實現
1).jar包實現,點選IDEA右側的Maven Project。點選裡面的package。
在xshell中通過rz命令上傳word.jar到指定目錄
執行jar包:hadoop jar word.jar
2).直接執行。