1. 程式人生 > >YARN框架&MapReduce框架

YARN框架&MapReduce框架

次數 AR prot cda mat 所有 args utf 提交

YARN框架&MapReduce框架

MapReduce實例:一個wordcount程序

統計一個相當大的數據文件中,每個單詞出現的個數。

分析map和reduce的工作

map:

  1. 切分單詞
  2. 遍歷單詞數據輸出

reduce:

對從map中得到的數據的valuelist遍歷累加,得到一個單詞的總次數

代碼

WordCountMapper(繼承Mapper)

重寫Mapper類的map方法。

mapreduce框架每讀一行數據就調用一次該方法,map的具體業務邏輯就寫在這個方法體中。

  1. map和reduce的數據輸入輸出都是以key-value對的形式封裝的
  2. 4個泛型中,前兩個(KEYIN, VALUEIN)指定mapper輸入數據的類型, 後兩個(KEYOUT, VALUEOUT)指定輸出數據的類型
  3. 默認情況下,框架傳遞給mapper的輸入數據中,key是要處理的文本中一行的起始偏移量,value是這行的內容
  4. 由於輸入輸出在結點中通過網絡傳遞,數據需要序列化,但JDK自帶的序列化機制會有附加信息冗余,對於大量數據傳輸不合適,因此 ->
  5. 業務中要處理的數據已經作為參數key-value被傳遞進來了,處理後的輸出是調用context.write()寫入到context
package cn.thousfeet.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        
        String line = value.toString();
        
        String[] words = StringUtils.split(line," "); //切分單詞
        
        for(String word : words) //遍歷 輸出為key-value( <word,1> )
        {
            context.write(new Text(word), new LongWritable(1));
        }
    
    }
    
}

WordCountReducer(繼承Reducer)

重寫Reducer類的reduce方法。

框架在map處理完成後,將所有的key-value對緩存起來進行分組,然後傳遞到一個組 <key,values{}>(對於wordcount程序,拿到的就是類似<hello,{1,1,1,1...}>),然後調用一次reduce方法。

package cn.thousfeet.hadoop.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text key, Iterable<LongWritable> valueList,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        
        long count = 0;
        
        for(LongWritable value : valueList) //遍歷value list累加求和
        {
            count += value.get();
        }
        
        context.write(key, new LongWritable(count)); //輸出這一個單詞的統計結果
    }
}

WordCountRunner

用於描述job。

比如,該作業使用哪個類作為邏輯處理中的map,哪個作為reduce。還可以指定該作業要處理的數據所在的路徑,和輸出的結果放到哪個路徑。

package cn.thousfeet.hadoop.mapreduce.wordcount;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WordCountRunner {

        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            //設置整個job所用的那些類在哪個jar包
            job.setJarByClass(WordCountRunner.class);
            
            //指定job使用的mapper和reducer類
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            
            //指定reduce和mapper的輸出數據key-value類型
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            
            //指定mapper的輸出數據key-value類型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            
            //指定原始輸入數據的存放路徑
            FileInputFormat.setInputPaths(job, new Path("/wordcount/srcdata/"));
            
            //指定處理結果數據的存放路徑
            FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));
        
            //將job提交給集群運行 參數為true時會打印運行進度
            job.waitForCompletion(true);
        }
}

上傳到集群中運行

export成一個jar包,上傳到虛擬機上。

分發到集群運行:hadoop jar wordcount.jar cn.thousfeet.hadoop.mapreduce.wordcount.WordCountRunner

查看輸出結果:
技術分享圖片

(可以看到按key的字典序升序排序)

如需方便在本地debug,可以直接run main方法(直接在本機的JVM運行),但要把輸入輸出路徑改為hdfs全路徑(或用在windows本地目錄下的數據也行,MapReduce程序的運行和數據來源在哪無關),並且在eclipse的設置 Run Configurations->arguments->vm arguments ,添加-DHADOOP_USER_NAME=對應用戶

yarn框架的運行機制

技術分享圖片


坑點

org.apache.hadoop.security.AccessControlException

運行程序後查看output文件夾能看到運行成功了,但是cat查看part-r-00000的時候報錯

error creating legacy BlockReaderLocal. Disabling legacy local reads.
org.apache.hadoop.security.AccessControlException: Can‘t continue with getBlockLocalPathInfo() authorization. The user thousfeet is not configured in dfs.block.local-path-access.user

解決方法是hdfs-site.xml中的配置項dfs.client.read.shortcircuit=false
woc,這個參數其實原本默認就是false...突然想起這不是上次配置出錯的時候病急亂投醫加上的嗎,果然亂跟教程害死人orzz

(參考:http://www.51testing.com/html/59/445759-821244.html)

YARN框架&MapReduce框架