Distributed Processing Framework: MapReduce
阿新 · Published 2018-11-06
1 Advantages of MapReduce
- Offline processing of massive data sets
- Easy to develop and easy to run
2 The MapReduce Programming Model
- A job is split into a Map phase and a Reduce phase
- Map phase: Map Tasks
- Reduce phase: Reduce Tasks
2.1 The wordcount Example
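The canonical example: the map phase splits each line into words and emits a (word, 1) pair per word; the reduce phase sums the counts for each word. The full source code appears in section 4.1.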
2.2 Core Concepts
- Split: the slice of data handed to a MapReduce job for processing; the smallest unit of computation in MapReduce. (Compare HDFS, where the block is the smallest unit of storage, 128 MB by default.) By default a Split corresponds one-to-one to a block; the relationship can also be set manually, which is not recommended. See the sketch after this list.
- InputFormat: defines how input data is split and read into records
- OutputFormat: defines how results are written out
- Combiner: see section 5
- Partitioner: see section 6
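A minimal sketch of how these knobs appear in job setup (the class name and the exact byte values are illustrative only; the 128 MB figure and the default one-to-one split/block pairing come from the notes above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SplitConfigSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-config-sketch");
        job.setInputFormatClass(TextInputFormat.class);   // how input files are split and parsed into records
        job.setOutputFormatClass(TextOutputFormat.class); // how (key, value) results are written out
        // Pin splits to the 128 MB HDFS block size (already the default; shown only for illustration)
        FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    }
}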
3 MapReduce Architecture
3.1 Version 1.x
- JobTracker (JT): the job manager. Splits a job into a set of tasks (MapTask, ReduceTask); assigns those tasks to TaskTrackers for execution; monitors jobs and handles faults (if a task dies, there is a mechanism to restart it). If the JT receives no heartbeat from a TT within a certain interval, the TT is presumed dead and the tasks assigned to it may be reassigned to other TTs.
- TaskTracker (TT): the task executor. Runs Tasks (MapTask, ReduceTask) on its node; interacts with the JT to run, start, and stop tasks; sends heartbeat messages to the JT.
- MapTask: runs the map logic we write ourselves; writes the map output to local disk.
- ReduceTask: reads the data output by the Map Tasks; groups the data by key and passes each group to the reduce method we write.
3.2 MapReduce 2.x
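In 2.x the JobTracker's responsibilities are split across YARN components: a global ResourceManager schedules cluster resources, a per-application ApplicationMaster manages each job's tasks and fault tolerance, and NodeManagers replace TaskTrackers as the per-node workers.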
4 The wordcount Example
4.1 Source Code
package com.bzt.cn.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/*
 * MapReduce version of wordcount
 * */
public class WordCountApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split each input line on spaces and emit a (word, 1) pair per word
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), one);
            }
        }
    }
    /*
     * Reducer: the merge/aggregation step
     * */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // sum must be a local variable: as an instance field it would keep
            // accumulating across keys and produce wrong counts
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class containing the job's code
        job.setJarByClass(WordCountApp.class);
        // Input path for the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Map-side parameters (note: setMapOutputKeyClass, not setOutputKeyClass, for the map output key)
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce-side parameters
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Output path for the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4.2 Package into a jar with Maven and Upload to the Cluster
Run:
[hadoop@node1 ~]$ hadoop jar wordcount.jar com.bzt.cn.mapreduce.WordCountApp hdfs://node1:8020/hello.txt hdfs://node1:8020/wcout
[hadoop@node1 ~]$ hadoop fs -ls /wcout
18/10/30 09:38:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2018-10-30 09:38 /wcout/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 30 2018-10-30 09:38 /wcout/part-r-00000
[hadoop@node1 ~]$ hadoop fs -text /wcout/part-r-00000
18/10/30 09:39:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
hello 4
jerry 5
tom 7
world 8
[hadoop@node1 ~]$
4.3 Enhanced Version
FileOutputFormat throws an exception if the output directory already exists, so the enhanced main() deletes it up front (this also requires import org.apache.hadoop.fs.FileSystem;):
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true); // true = recursive; the one-argument delete() is deprecated
            System.out.println("output file deleted!");
        }
        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class containing the job's code
        job.setJarByClass(WordCountApp.class);
        // Input path for the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce-side parameters
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Output path for the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5 Combiner
- A "local reducer" that runs on the map side
- Cuts down the data volume the Map Tasks emit, and therefore the amount shipped across the network
- Suitable scenarios: sums and counts (operations that stay correct no matter how many times the combiner runs); see the one-line sketch below
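For wordcount, the reduce logic is a plain sum, so the reducer class itself can serve as the combiner. A minimal sketch, assuming the job setup from section 4.1:

// Inside main(), after job.setReducerClass(MyReducer.class):
// run the reduce logic on each map task's local output before the shuffle
job.setCombinerClass(MyReducer.class);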
6 Partitioner
- The Partitioner decides which ReduceTask processes each key/value pair a MapTask outputs
- Default implementation: the key's hash value, modulo the number of Reduce Tasks (sketched below)
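A minimal sketch of that default, mirroring Hadoop's built-in HashPartitioner:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the modulo result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}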
6.1 Test Data
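The original notes do not include the file contents. A plausible animal.txt, consistent with the per-partition sums shown in 6.3 below (each line is an animal name and a count, separated by a space):

dog 3
dog 4
cat 2
cat 4
duck 7
lion 13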
6.2 Source Code
package com.bzt.cn.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartitionerApp {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "animal count"; emit (animal, count)
            String line = value.toString();
            String[] words = line.split(" ");
            context.write(new Text(words[0]), new LongWritable(Long.parseLong(words[1])));
        }
    }
    /*
     * Reducer: the merge/aggregation step
     * */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // sum must be a local variable: as an instance field it would keep
            // accumulating across keys and produce wrong counts
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {
        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            // Route each known animal to its own reducer; everything else goes to partition 3
            if (key.toString().equals("dog")) {
                return 0;
            }
            if (key.toString().equals("cat")) {
                return 1;
            }
            if (key.toString().equals("duck")) {
                return 2;
            }
            return 3;
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true); // true = recursive; the one-argument delete() is deprecated
            System.out.println("output file deleted!");
        }
        // Create the job
        Job job = Job.getInstance(conf, "WC");
        // Set the class containing the job's code
        job.setJarByClass(PartitionerApp.class);
        // Input path for the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Map-side parameters
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce-side parameters
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Set the job's partitioner
        job.setPartitionerClass(MyPartitioner.class);
        // Four reducers, one per partition
        job.setNumReduceTasks(4);
        // Output path for the job
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
6.3 Package with Maven and Run on the Cluster
[hadoop@node1 ~]$ hadoop jar part.jar com.bzt.cn.mapreduce.PartitionerApp hdfs://node1:8020/animal.txt hdfs://node1:8020/partionerout
[hadoop@node1 ~]$ hadoop fs -ls /partionerout
18/10/30 10:39:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 5 items
-rw-r--r-- 1 hadoop supergroup 0 2018-10-30 10:38 /partionerout/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 6 2018-10-30 10:38 /partionerout/part-r-00000
-rw-r--r-- 1 hadoop supergroup 6 2018-10-30 10:38 /partionerout/part-r-00001
-rw-r--r-- 1 hadoop supergroup 7 2018-10-30 10:38 /partionerout/part-r-00002
-rw-r--r-- 1 hadoop supergroup 8 2018-10-30 10:38 /partionerout/part-r-00003
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00000
18/10/30 10:40:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
dog 7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00001
18/10/30 10:40:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
cat 6
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00002
18/10/30 10:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
duck 7
[hadoop@node1 ~]$ hadoop fs -cat /partionerout/part-r-00003
18/10/30 10:40:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
lion 13
[hadoop@node1 ~]$
7 JobHistory
- Records information about completed MapReduce jobs in a specified HDFS directory
- Disabled by default
7.1 Configure JobHistory
Edit mapred-site.xml in /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/etc/hadoop:
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>node1:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>node1:19888</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/history/done</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/history/done_intermediate</value>
    </property>
</configuration>
7.2 Start the History Server
Restart YARN first (stop-yarn.sh, then start-yarn.sh), then start the history server:
[hadoop@node1 ~]$ mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/logs/mapred-hadoop-historyserver-node1.out
[hadoop@node1 ~]$ jps
6704 JobHistoryServer
6738 Jps
1395 DataNode
6245 ResourceManager
1271 NameNode
1559 SecondaryNameNode
6346 NodeManager
[hadoop@node1 ~]$
7.3 Test
[hadoop@node1 ~]$ cd /home/hadoop/apps/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce2
[hadoop@node1 mapreduce2]$ hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
Visit http://node1:19888/jobhistory and click into a job to view its logs. The logs page reports that log aggregation is not enabled, so enable it in yarn-site.xml:
<configuration>
    <!-- Site specific YARN configuration properties -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
</configuration>
Restart YARN (stop-yarn.sh, then start-yarn.sh) and run the pi example again; the job's logs should now be viewable through the JobHistory web UI.