1. 程式人生 > >Hadoop—MapReducer統計文件的單詞出現的個數

Hadoop—MapReducer統計文件的單詞出現的個數

key 都是 val 兩個 一份 sta rdquo water site

1. MapReduce 統計文件的單詞出現的個數

Mapper: 處理具體文本,發送結果
Reducer: 合並各個Mapper發送過來的結果
Job: 制定相關配置,框架

Mapper

package cn.itcast.hadoop.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils; // 4個泛型中,前兩個是指定mapper輸入數據的類型 // map和 reducer的輸入和輸出都是key-value對的形式 // 默認情況下。框架輸入的我們mapper的輸入數據中,key是要處理的文本中的一行的起始偏移量, 內容就是value public class WCMapper extends Mapper <LongWritable, Text, Text, LongWritable> { // 每讀一次數據,就調一次這個方法 @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ //具體業務邏輯就像和在這裏, 傳入數據就是 key, value //將這一行的內容轉化成string類型 String line = value.toString(); // 對這一行的文本按特定分隔符切分 String[] words = StringUtils.split(line, ‘ ‘);
// 便利這個單詞數組, 輸出為kv形式 k:單詞 v:1 for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }

Reducer

package cn.itcast.hadoop.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text Key, Iterable<LongWritable> values, Context context) 
        throws IOException, InterruptedException {

        long count = 0;

        for (LongWritable value:values) {
            count += value.get();
        }

        context.write(Key, new LongWritable(count));
    }
}

Job

package cn.itcast.hadoop.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.io.Text;
//import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

public class WCRunner {

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        Job wcJob = Job.getInstance(conf);

        wcJob.setJarByClass(WCRunner.class);

        //本job使用的mapper和reducer類
        wcJob.setMapperClass(WCMapper.class);
        wcJob.setReducerClass(WCReducer.class);

        // 指定reduce的輸出數據kv類型
        wcJob.setOutputKeyClass(Text.class);
        wcJob.setOutputValueClass(LongWritable.class);

        // 指定mapper的輸出數據kv類型
        wcJob.setMapOutputKeyClass(Text.class);
        wcJob.setMapOutputValueClass(LongWritable.class);

        //指定要處理的輸入數據存放路徑
        FileInputFormat.setInputPaths(wcJob, new Path("/wc/srcdata/"));

        //指定處理結果的輸出數據存放路徑
        FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output"));

        //將job提交給集群運行
        wcJob.waitForCompletion(true);
    }
}

2. Yarn資源調度框架

Resource Manager:
Node Manager:

技術分享圖片



1. wcJob.waitforCompleition啟動一個RunJar進程,這個進程向RM申請執行一個Job
2. RM 返回一個Job相關資源的路徑staging-dir,和為Job產生的jobID
3. RunJar提交資源到 HDFS的 staging-dir上
4. RunJar提交資源完畢之後,上報RM 提交資源完畢
5. RM下個Job加入RM中的任務隊列中
6. 各個Node Manager通過通信,從RM的任務隊列中領取任務
7. 各個Node Manager初始化  運行資源的容器,從staging-dir上面拉取資源
8. RM選擇一個Node Manager 啟動MRAppMaster 來運行map reducer
9. MRAppMaster向RM註冊
10. MSAppMaster啟動Mapper任務
11. MSAppMaster啟動Reducer任務
12. 任務完成後, 向RM註銷自己

3.幾種運行模式
本地模型運行

在windows的eclipse裏面直接運行main方法,就會將job提交給本地執行器locaJobRunner執行
– 輸入輸出數據可以放在本地路徑下(c:/wc/src/data/)
– 輸入輸出數據也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)

在linux的ecllipse裏面直接運行main 方法,則不需要添加yarn相關的配置,也會提交給localJobRunner執行
– 輸入輸出數據可以放在本地路徑下(/home/hadoop/wc/srcdata)
– 輸入輸出數據也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)

集群模式運行

將工程打包成jar包, 上傳到服務器,然後用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
在linux的eclipse中直接運行main方法,也可以提交到集群中去運行,但是,必須采取一下措施:
– 在工程src目錄下加入mapred-site.xml 和yarn-site.xml
– 將工程打成jar包(wc.jar), 同時在main方法中添加一個conf的配置參數 conf.set(“mapreduce.job.jar”, “wc.jar”);

在windows的eclipse中直接運行main方法,也可以提交集群中運行,但是因為平臺不兼容,需要做很多的設置修改
– 要在windows中存放一份hadoop的安裝包(解壓好的)
– 要將其中的lib和bin目錄替換成根據你的windows版本重新編譯出的文件
– 再要配置系統環境變量 HDOOP_HOME 和 PATH
– 修改YarnRunner這個類的源碼

Hadoop—MapReducer統計文件的單詞出現的個數