YARN框架&MapReduce框架
YARN框架&MapReduce框架
MapReduce實例:一個wordcount程序
統計一個相當大的數據文件中,每個單詞出現的個數。
分析map和reduce的工作
map:
- 切分單詞
- 遍歷單詞數據輸出
reduce:
對從map中得到的數據的valuelist遍歷累加,得到一個單詞的總次數
代碼
WordCountMapper(繼承Mapper)
重寫Mapper類的map方法。
mapreduce框架每讀一行數據就調用一次該方法,map的具體業務邏輯就寫在這個方法體中。
- map和reduce的數據輸入輸出都是以key-value對的形式封裝的
- 4個泛型中,前兩個(KEYIN, VALUEIN)指定mapper輸入數據的類型, 後兩個(KEYOUT, VALUEOUT)指定輸出數據的類型
- 默認情況下,框架傳遞給mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,value是這行的內容
- 由於輸入輸出在結點中通過網絡傳遞,數據需要序列化,但JDK自帶的序列化機制會有附加信息冗余,對於大量數據傳輸不合適,因此
-> - 業務中要處理的數據已經作為參數key-value被傳遞進來了,處理後的輸出是調用context.write()寫入到context
package cn.thousfeet.hadoop.mapreduce.wordcount; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = StringUtils.split(line," "); //切分單詞 for(String word : words) //遍歷 輸出為key-value( <word,1> ) { context.write(new Text(word), new LongWritable(1)); } } }
WordCountReducer(繼承Reducer)
重寫Reducer類的reduce方法。
框架在map處理完成後,將所有的key-value對緩存起來進行分組,然後傳遞到一個組 <key,values{}>
(對於wordcount程序,拿到的就是類似<hello,{1,1,1,1...}>
),然後調用一次reduce方法。
package cn.thousfeet.hadoop.mapreduce.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> valueList, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { long count = 0; for(LongWritable value : valueList) //遍歷value list累加求和 { count += value.get(); } context.write(key, new LongWritable(count)); //輸出這一個單詞的統計結果 } }
WordCountRunner
用於描述job。
比如,該作業使用哪個類作為邏輯處理中的map,哪個作為reduce。還可以指定該作業要處理的數據所在的路徑,和輸出的結果放到哪個路徑。
package cn.thousfeet.hadoop.mapreduce.wordcount;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
public class WordCountRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//設置整個job所用的那些類在哪個jar包
job.setJarByClass(WordCountRunner.class);
//指定job使用的mapper和reducer類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//指定reduce和mapper的輸出數據key-value類型
job.setOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定mapper的輸出數據key-value類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定原始輸入數據的存放路徑
FileInputFormat.setInputPaths(job, new Path("/wordcount/srcdata/"));
//指定處理結果數據的存放路徑
FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));
//將job提交給集群運行 參數為true時會打印運行進度
job.waitForCompletion(true);
}
}
上傳到集群中運行
export成一個jar包,上傳到虛擬機上。
分發到集群運行:hadoop jar wordcount.jar cn.thousfeet.hadoop.mapreduce.wordcount.WordCountRunner
查看輸出結果:
(可以看到按key的字典序升序排序)
如需方便在本地debug,可以直接run main方法(直接在本機的JVM運行),但要把輸入輸出路徑改為hdfs全路徑(或用在windows本地目錄下的數據也行,MapReduce程序的運行和數據來源在哪無關),並且在eclipse的設置 Run Configurations->arguments->vm arguments ,添加-DHADOOP_USER_NAME=對應用戶
yarn框架的運行機制
坑點
org.apache.hadoop.security.AccessControlException
運行程序後查看output文件夾能看到運行成功了,但是cat查看part-r-00000的時候報錯
error creating legacy BlockReaderLocal. Disabling legacy local reads.
org.apache.hadoop.security.AccessControlException: Can‘t continue with getBlockLocalPathInfo() authorization. The user thousfeet is not configured in dfs.block.local-path-access.user
解決方法是hdfs-site.xml中的配置項dfs.client.read.shortcircuit=false
woc,這個參數其實原本默認就是false...突然想起這不是上次配置出錯的時候病急亂投醫加上的嗎,果然亂跟教程害死人orzz
(參考:http://www.51testing.com/html/59/445759-821244.html)
YARN框架&MapReduce框架