MapReduce的執行流程和程式設計規範以及簡單的WordCount案例
阿新 • • 發佈:2020-12-18
技術標籤:hadoopmapReduce大資料hadoopmapreduce
▶ \blacktriangleright ▶ MapReduce程式設計規範
-
Mapper階段(以WordCount為例)
- 使用者自定義的類要繼承Mapper類
- 輸入的資料是KV對的形式(K:當前行數,V:當前行的內容)
- Mapper中的業務邏輯寫在map()方法中,也就是說要重寫map方法
- 輸出資料也是KV對的形式(K:每個單詞 V: 1 )
- map方法對每個K V呼叫一次
-
Reducer階段
8i9=-0o- 使用者自定義的類要繼承Reducer類
- 輸入型別詩句對應的時Mapper的輸出資料型別,也是KV
- Reducer的業務邏輯寫在reduce方法中
- ReduceTask程序對每一組相同K的的<K,V>組呼叫一側reduce方法
-
Driver階段
相當於yarn叢集的客戶端,用來提交我們整個程式到yarn叢集,提交的是封裝了MapRdeuce程式相關執行引數的job物件
▶ \blacktriangleright ▶ 案例實操
-
在F盤新建一個hello.txt
內容:Hello World Hello Hadoop Hello MapReduce
-
新建WordCountMapper.java
import org.apache.hadoop.io.IntWritable;
import org.apache. hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();//行的內容
String[] words = line.split(" ");//空格拆分成單詞
for(String word : words){
k.set(word);
v.set(1);
context.write(k,v);
}
}
}
- 新建WordCountReducer.java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable> {
/*
實際拿到的
hadoop,1
hello ,(1,1,1)
mapreducer,1
world ,1
*/
int sum = 0;
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for(IntWritable i : values){
sum+=i.get();//結果轉成int型
}
context.write(key,new IntWritable(sum));
}
}
- 新建Driver.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(); //封裝成Job物件
job.setMapOutputKeyClass(Text.class);//map輸出的key的型別
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);//reducer輸出的key的型別
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMapper.class);//設定返回的型別
job.setReducerClass(WordCountReducer.class);
FileInputFormat.setInputPaths(job,new Path("F://hello.txt"));//讀取檔案路徑
FileOutputFormat.setOutputPath(job,new Path("F://helloOutput"));//結果輸出的路徑
job.waitForCompletion(true);//完成之後退出
}
}
▶ \blacktriangleright ▶ MapReduce的執行流程
內容:Hello World
Hello Hadoop
Hello MapReduce
-
分片
分片過程中主要的任務就是確定資料大小。這個值一旦確定就可以依次將原始檔按值進行劃分,小於這個值就成為單獨的一個區,大於這個值就按這個值劃分,不足的部分成為單獨的一個區。hadoop2.x的中這個值是128MB,也就是說原始檔有200MB時,就分成2個區,一個區是0-128,剩下的是一個單獨的區
2.輸入格式化
是將劃分好的區格式轉化成<key,value>形式的資料,其中key是行數,value是當前行的內容。
每解析出一個數據源,就交給Mapper處理一次
根據如上內容,解析出的資料來源應該是
<1,Hello World>
<2,Hello Hadoop>
<3,Hello MapReduce>
3.Map過程
Mapper接收<key,value>形式的資料,並處理成<key,value>形式的資料
在WOrdCount中,Mapper會解析傳過來的key值,以空格字元為標誌,如果碰到空格就把之前累計的字串作為輸出的key值 ,1當作value的值
根據如上內容,輸出的內容是
<Hello,1>
<World,1>
<Hello,1>
<Hadoop,1>
<Hello,1>
<MapReduce,1>
4.Shuffle過程
主要是對Map輸出結果進行分割槽、排序、合併等處理並交給Reduce的過程
根據如上內容,處理之後的結果是
<Hadoop,{1}>
<Hello,{1,1,1}>
<MapReduce,{1}>
<World,{1}>
5.Reduce過程
Reduce接收<key,{value list}>形式的資料流,形成<key,value>形式的資料輸出,輸出資料直接寫入HDFS
在WordCount中,Reducer會將相同的key的value list進行累加,然後得到單詞的總次數,然後輸出
根據如上內容,寫入的資料是
<Hadoop,{1}> -------------><hadoop,1>
<Hello,{1,1,1}>-------------><Hello,3>
<MapReduce,{1}> -------------><MapReduce,1>
<World,{1}> -------------><World,1>
-
輸出
MapReduce看框架自動把Reducer生成的<key,value>傳入write()方法,write()方法實現檔案的寫入
在WordCount中,呼叫的是預設的文字寫入的方法