Hadoop那些事兒(三)---MapReduce程式設計淺析
1.map和reduce
1.1 mapReduce處理邏輯
在本系列文章的第一篇中,曾對MapReduce原理做過簡單的描述,在這裡再重述一遍。
首先我們有兩個檔案word1.txt和word2.txt
其中word1.txt的內容如下:
aaaa
bbbb
cccc
dddd
aaaa
word2.txt的內容如下:
aaaa
cccc
dddd
eeee
aaaa
這裡的兩個檔案很小,我們先假設這兩個檔案很大,分別為64M和96M的大小,然後我們需要統計檔案中每個字串的數量,那麼MapReduce的處理流程如下:
Input:最左邊是輸入的過程,輸入了圖示的資料。
Split分片
Map:map階段是由程式設計人員通過程式碼來控制的,圖中所示的大概內容就是將字串分割開來,作為鍵儲存在map中,值的位置儲存1,表示數量。
shuffle洗牌:洗牌階段,由於之前生成map中存在很多鍵相同的map,在洗牌階段將鍵相同的進行合併。
Reduce
這樣最後輸出的資料就是每個字串出現的次數。
1.2 Hadoop資料型別
Hadoop本身提供了一套可優化網路序列化傳輸的基本型別
型別 | 含義 |
---|---|
BooleanWritable | 標準布林型數值 |
ByteWritable | 單位元組數值 |
DoubleWritable | 雙位元組數值 |
FloatWritable | 浮點數 |
IntWritable | 整型數 |
LongWritable | 長整型數 |
Text | 使用UTF8格式儲存的文字 |
NullWritable | 當中的key或value為空時使用 |
1.3 Mapper
Mapper類是一個泛型類,四個引數分別指定map函式的輸入鍵,輸入值,輸出鍵,輸出值
Mapper類包含四個方法:
setup方法在任務開始時呼叫一次,一般用來做map前的準備工作。
map承擔主要的處理工作,把輸入資料拆分為鍵值對。
cleanup方法則是在任務結束時呼叫一次,主要負責收尾工作。
run方法確定了setup-map-cleanup的執行模板。
map()方法的輸入是一個鍵和一個值,輸出是一個Context例項:
先了解到這裡,後續我們結合程式碼來進一步瞭解Mapper。
1.4 Reducer
Reducer類也是一個泛型類,與Mapper相似,四個引數分別指定map函式的輸入鍵,輸入值,輸出鍵,輸出值
Reducer類也包含四個方法:
setup方法在任務開始時呼叫一次,一般用來做reduce前的準備工作。
reduce承擔主要的處理工作,把輸入資料拆分為鍵值對。
cleanup方法則是在任務結束時呼叫一次,主要負責收尾工作。
run方法確定了setup-reduce-cleanup的執行模板。
注意,Reducer的輸入型別必須匹配Mapper的輸出型別。
2.程式碼分析
接下來我們來看一下上一篇文章用到的測試程式碼:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
//繼承mapper介面,設定map的輸入型別為<Object,Text>
//輸出型別為<Text,IntWritable>
public static class Map extends Mapper<Object,Text,Text,IntWritable>{
//one表示單詞出現一次
private static IntWritable one = new IntWritable(1);
//word儲存切下的單詞
private Text word = new Text();
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
//對輸入的行切詞
StringTokenizer st = new StringTokenizer(value.toString());
while(st.hasMoreTokens()){
word.set(st.nextToken());//切下的單詞存入word
context.write(word, one);
}
}
}
//繼承reducer介面,設定reduce的輸入型別<Text,IntWritable>
//輸出型別為<Text,IntWritable>
public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
//result記錄單詞的頻數
private static IntWritable result = new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
int sum = 0;
//對獲取的<key,value-list>計算value的和
for(IntWritable val:values){
sum += val.get();
}
//將頻數設定到result
result.set(sum);
//收集結果
context.write(key, result);
}
}
/**
* @param args
*/
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("mapred.job.tracker", "localhost:9001");
args = new String[]{"hdfs://localhost:9000/user/hadoop/input/count_in","hdfs://localhost:9000/user/hadoop/output/count_out"};
//檢查執行命令
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2){
System.err.println("Usage WordCount <int> <out>");
System.exit(2);
}
//配置作業名
Job job = new Job(conf,"word count");
//配置作業各個類
job.setJarByClass(WordCount.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
WordCount類可以分為三部分,Map,Reduce和main三部分,Map和Reduce都是靜態內部類。
Map類繼承與Mapper類,四個引數表示其輸入鍵型別為Object,輸入值為文字,輸出鍵為文字,輸出值為整型數。
通過執行Map操作後,我們希望得到的結果是圖1中第三列mapping列的值,即將資料拆分後儲存到map中,每個字串的數量均儲存為1.
在程式碼中定義了一個整型型別的變數one,值為1,用來作為map的值。
map方法的前兩個引數分別為輸入的鍵和值,通過下面的程式碼先將text格式的欄位轉為java的String型別。
StringTokenizer st = new StringTokenizer(value.toString());
StringTokenizer 根據自定義字元為分界符對字串進行拆分並將結果集封裝提供對應的遍歷方法,有如下構造方法:
str為要拆分的字串,delim為界定符,當不指定delim時,將預設以空格進行拆分。
有如下方法:
其中hasMoreTokens方法用來判斷是否還有分隔符。
使用context的write方法將資料進行記錄。
Reduce類繼承於Reducer類,Reducer類是一個泛型類,四個引數分別表示輸入鍵,輸入值,輸出鍵,輸出值。其中輸入鍵和輸入值與Map類的輸出鍵,輸出值保持一致。
當資料到達reduce時,資料已經經過了洗牌,即鍵相同的資料進行了合併,所以reduce方法的key為鍵,values是一個迭代器,儲存著該鍵對應的所有值,然後在方法體中對該鍵對應的值得數量進行了統計。
如果我們在map方法中分別寫一句System.out.println(“map”)和System.out.println(“reduce”),就會發現map方法和reduce方法都不止被執行了一次。
main方法來控制任務的執行。
要知道,使用MapReduce框架時,我們僅僅只是填寫map和reduce部分的程式碼,其他的都交給mapreduce框架來處理,所以我們至少需要告訴mapreduce框架應該怎麼執行,main方法中的程式碼做的就是這個操作。
首先我們需要初始化Configuration類,使用MapReduce之前一定要初始化Configuration,該類主要用來讀取hdfs和Mapreduce的配置資訊。
args設定輸入檔案和輸出檔案的位置,這裡指向hdfs,輸出檔案的資料夾可以不存在,執行後會在指定目錄下自動生成,輸出檔案一定不能存在,在執行前要將上一次執行生成的輸出檔案刪除掉。
在上面的程式碼中我們是通過下面的程式碼來配置的:
conf.set("mapred.job.tracker", "localhost:9001");
我們也可以將該資訊新增到xml檔案中來配置,如下圖:
程式碼修改為:
接下來的if部分用來判斷是否有兩個引數都指定了。
再往下就是配置作業。首先建立一個Job類,然後裝載需要的各個類,從上到下分別為:程式類(我們編寫的java檔案的類名,這裡是WordCount),Mapper類(繼承了Mapper類的內部類,這裡是Map),
Combiner和Reducer類都指向繼承於Reducer的內部類Reduce.
(需要特別注意的是,Combiner並非一定要指向Reducer類,有時候也可以不指定,有時候不能指向Reducer而是需要單獨寫Combiner,只是這裡指向Reducer而已)
再往下兩行:
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
指定了輸出資料的鍵和值的型別,也是資料儲存到hdfs結果檔案中的型別。
下面的程式碼用來建立輸入檔案和輸出檔案:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
最後一行程式碼表示執行成功後退出程式。