1. 程式人生 > >初識Hadoop-MapReduce

初識Hadoop-MapReduce

ron final tails fin 包括 src 直接 log num

原文地址

https://blog.csdn.net/zhruixuan/article/details/85549618

Hadoop的核心是HDFSYARNMapReduce。今天先來認識一下MapReduce

MapReduce是什麽

MapReduce是Hadoop中的一種處理大規模數據的編程模型,得益於MapReduce的並行計算,可以高效的處理大規模數據(一般是HDFS中存儲的數據)。 顧名思義,MapReduce分為兩個處理階段(對於開發者來說),Map階段和Reduce階段。每個階段都以Key-Value作為輸入輸出,Key-Value的類型由開發者選擇。map階段一般可以用於對原始數據進行預處理,過濾,數據校驗等操作。數據經過map之後會經過MapReduce框架的Shuffle階段,對map處理過的數據進行排序和分組

。最後數據被傳遞給Reduce階段進行結果數據的計算。

一個java版本的示例程序

我們的目標是從記錄有每一年份每一個站點的氣候信息的文件中找出每一年的最高溫度。文件中每一行數據格式如下圖,加粗表示的是年份和溫度信息。 技術分享圖片

要實現MapReduce需要實現兩個函數,map函數reduce函數。實現map函數可以通過實現Mapper借口來完成。

map函數

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongtWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. //Mapper的泛型分別表示map函數的輸入鍵類型,輸入值類型,輸出鍵類型,輸出值類型
  7. public class MaxTemeratureMapper extents MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
  8. private static final int MISSING = 9999;
  9. @Override
  10. //輸入鍵是一行文本的行數偏移值,輸入值是一行文本
  11. public void map(LongWritable key, Text value, Context context) throws IOException,InterrupedException{
  12. String line = value.toString();
  13. String year = line.substring(15,19);
  14. int airTemperature;
  15. if(line.charAt(87) == ‘+‘){
  16. airTemperature = Integer.parseInt(line.substring(88,92));
  17. }else{
  18. airTemperature = Integer.parseInt(line.substring(87,92));
  19. }
  20. String quality = line.substring(92,93);
  21. if(airTemperature != MISSING && quality.matches("[01456]")){
  22. //輸出給shuffle
  23. context.write(new Text(year),new IntWritable(airTemperature));
  24. }
  25. }
  26. }

類似的書寫reducer函數 reducer函數的輸入類型必須與map函數的輸出類型一致。

reducer函數

  1. import org.apache.hadoop.io.IntWritable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. public class MaxTemperatureReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
  6. @Override
  7. //經過shuffle分組處理後values變成了一個集合
  8. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterrupedException{
  9. int maxValue = Integer.MIN_VALUE;
  10. for(IntWritable value : values){
  11. maxValue = Math.max(maxValue,value.get());
  12. }
  13. context.write(key, new IntWritable(maxValue));
  14. }
  15. }

通過一個入口函數來啟動MapReduce 入口main

  1. import java.io.IOException;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.input.FileOutputFormat;
  7. import org.apache.hadoop.mapreduce.input.FileInputFormat;
  8. public class MaxTemperature(){
  9. public static void main(String[] args) throws Exception{
  10. if(args.length != 2){
  11. System.out.println("請輸入文件路徑和結果輸出路徑");
  12. System.exit(-1);
  13. }
  14. Job job = new Job();
  15. job.setJarByClass(MaxTemperature.class);
  16. job.setJobName("max temperature");
  17. //可以是單個文件,也可以是一個目錄
  18. //如果是目錄,會將該目錄下的所有文件當做輸入
  19. //可以多次調用該方法
  20. FileInputFormat.addInputPath(job,new Path(args[0]));
  21. //只能有一個輸出路徑
  22. //在運行作業前該目錄不應該存在,否則會報錯
  23. FileInputFormat.setOutputPath(job,new Path(args[1]));
  24. job.setMapperClass(MaxTemeratureMapper.class);
  25. job.setReducerClass(MaxTemperatureReducer.class);
  26. job.setOutputKeyClass(Text.class);
  27. job.setOutputValueClass(IntWritable.class);
  28. //true表示將進度信息寫到控制臺
  29. System.exit(job.waitForCompletion(true) ? 0 : 1);
  30. }
  31. }

setOutputKeyClass()和setOutputValueClass()方法控制reduce函數的輸出類型,並且必須和Reduce類產生的相匹配。map函數的輸出類型如果和reduce的輸入類型不同,則必須通過setMapOutputKeyClass()和setMapOutputValueClass()方法類設置map函數的輸出類型。 單機測試運行 //將應用添加到類路徑 % export HADOOP_CLASSPATH=hadoop-example.jar //運行MaxTemperature % hadoop MaxTemperature input/ncdc/sample.txt output

在output目錄可以看到輸出文件 % cat output/part-r-00000 1949 111 1950 22

橫向擴展(並行計算)

以上MapReduce可以通過YARN(資源管理系統)處理HDFS中的數據。這樣Hadoop會將MapReduce計算轉移到存儲部分數據的各臺機器上並行執行任務。

我們把客戶端提交的MapReduce程序,數據,配置信息稱為MapReduce作業(Job)。Hadoop將作業分成若幹個任務(Task)來執行,其中包括兩類任務map任務和reduce任務,這些任務運行在集群的節點上,通過YARN進行調度,如果一個任務失敗,會在另一個節點上重新運行。 Hadoop將map的輸入數據劃分成大小相等的數據塊,稱為輸入分片(一般為一個HDFS塊大小,默認是128M),Hadoop為每個分片構建一個map任務,該任務運行用戶的map函數。 Hadoop在存儲有輸入數據的節點(HDFS)上運行map任務可以提升效率(稱為數據本地化優化),但是當存儲有輸入數據的節點有其他map任務時,Hadoop會盡量從某一數據塊所在的機架中的一個節點上選擇一個空閑的map槽(slot)來運行該map任務。 這也是為什麽最佳分片的大小應該和HDFS的塊大小相同,因為如果分片跨越兩個數據塊,那麽對於任何一個HDFS節點都不太可能存儲 這兩個數據塊。 map任務的輸出只是會存到本地磁盤,而不會寫入HDFS,因為是中間結果。

reduce任務不具備數據本地化優勢,因為單個reduce的輸入通常來自於所有的map任務輸出,本例中我們只有一個reduce任務,他的輸入就是所有的map任務輸出。 reduce任務的數量不是由輸入數據的大小決定的,而是需要獨立制定。

單個Reduce任務數據流

技術分享圖片

多個Reduce任務數據流

reduce任務的輸入通常來自不同的map任務,通過shuffle(混洗)分配到不同的reduce任務。 技術分享圖片

沒有Reduce任務的數據流

沒有Reduce任務時,數據處理可以完全並行,直接由map任務進行最終結果的HDFS寫入。 技術分享圖片

Combiner函數減少數據傳輸

通過combiner函數可以減少map函數與reduce函數之間的數據傳輸。 可以為map制定一個combiner,combiner的輸出作為reduce的輸入。 舉個例子 假設1950年的輸入數據由兩個map任務處理(因為他們在不同的分片中),假設第一個map輸出如下: (1950,0) (1950,10) (1950,20) 第二個map輸出如下: (1950,25) (1950,15) 那麽reduce 函數調用是輸入將是 (1950, [0,10,20,25,15]) 然後計算出 (1950,25) 其實我們可以對map的結果調用reduce計算出每個map的最大氣溫,然後再傳遞給reduce

  1. import java.io.IOException;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.input.FileOutputFormat;
  7. import org.apache.hadoop.mapreduce.input.FileInputFormat;
  8. public class MaxTemperature(){
  9. public static void main(String[] args) throws Exception{
  10. if(args.length != 2){
  11. System.out.println("請輸入文件路徑和結果輸出路徑");
  12. System.exit(-1);
  13. }
  14. Job job = new Job();
  15. job.setJarByClass(MaxTemperature.class);
  16. job.setJobName("max temperature");
  17. //可以是單個文件,也可以是一個目錄
  18. //如果是目錄,會將該目錄下的所有文件當做輸入
  19. //可以多次調用該方法
  20. FileInputFormat.addInputPath(job,new Path(args[0]));
  21. //只能有一個輸出路徑
  22. //在運行作業前該目錄不應該存在,否則會報錯
  23. FileInputFormat.setOutputPath(job,new Path(args[1]));
  24. job.setMapperClass(MaxTemeratureMapper.class);
  25. //**設置Combiner**
  26. job.setCombinerClass(MaxTemperatureReducer.class);
  27. job.setReducerClass(MaxTemperatureReducer.class);
  28. job.setOutputKeyClass(Text.class);
  29. job.setOutputValueClass(IntWritable.class);
  30. //true表示將進度信息寫到控制臺
  31. System.exit(job.waitForCompletion(true) ? 0 : 1);
  32. }
  33. }

初識Hadoop-MapReduce