Hadoop—MapReducer統計文件的單詞出現的個數
1. MapReduce 統計文件的單詞出現的個數
Mapper: 處理具體文本,發送結果
Reducer: 合並各個Mapper發送過來的結果
Job: 制定相關配置,框架
Mapper
package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.util.StringUtils; // 4個泛型中,前兩個是指定mapper輸入數據的類型 // map和 reducer的輸入和輸出都是key-value對的形式 // 默認情況下。框架輸入的我們mapper的輸入數據中,key是要處理的文本中的一行的起始偏移量, 內容就是value public class WCMapper extends Mapper <LongWritable, Text, Text, LongWritable> { // 每讀一次數據,就調一次這個方法 @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ //具體業務邏輯就像和在這裏, 傳入數據就是 key, value //將這一行的內容轉化成string類型 String line = value.toString(); // 對這一行的文本按特定分隔符切分 String[] words = StringUtils.split(line, ‘ ‘);// 便利這個單詞數組, 輸出為kv形式 k:單詞 v:1 for (String word : words) { context.write(new Text(word), new LongWritable(1)); } } }
Reducer
package cn.itcast.hadoop.mr.wordcount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text Key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable value:values) { count += value.get(); } context.write(Key, new LongWritable(count)); } }
Job
package cn.itcast.hadoop.mr.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Text; //import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; public class WCRunner { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job wcJob = Job.getInstance(conf); wcJob.setJarByClass(WCRunner.class); //本job使用的mapper和reducer類 wcJob.setMapperClass(WCMapper.class); wcJob.setReducerClass(WCReducer.class); // 指定reduce的輸出數據kv類型 wcJob.setOutputKeyClass(Text.class); wcJob.setOutputValueClass(LongWritable.class); // 指定mapper的輸出數據kv類型 wcJob.setMapOutputKeyClass(Text.class); wcJob.setMapOutputValueClass(LongWritable.class); //指定要處理的輸入數據存放路徑 FileInputFormat.setInputPaths(wcJob, new Path("/wc/srcdata/")); //指定處理結果的輸出數據存放路徑 FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output")); //將job提交給集群運行 wcJob.waitForCompletion(true); } }
2. Yarn資源調度框架
Resource Manager:
Node Manager:
1. wcJob.waitforCompleition啟動一個RunJar進程,這個進程向RM申請執行一個Job 2. RM 返回一個Job相關資源的路徑staging-dir,和為Job產生的jobID 3. RunJar提交資源到 HDFS的 staging-dir上 4. RunJar提交資源完畢之後,上報RM 提交資源完畢 5. RM下個Job加入RM中的任務隊列中 6. 各個Node Manager通過通信,從RM的任務隊列中領取任務 7. 各個Node Manager初始化 運行資源的容器,從staging-dir上面拉取資源 8. RM選擇一個Node Manager 啟動MRAppMaster 來運行map reducer 9. MRAppMaster向RM註冊 10. MSAppMaster啟動Mapper任務 11. MSAppMaster啟動Reducer任務 12. 任務完成後, 向RM註銷自己
3.幾種運行模式
本地模型運行
在windows的eclipse裏面直接運行main方法,就會將job提交給本地執行器locaJobRunner執行
– 輸入輸出數據可以放在本地路徑下(c:/wc/src/data/)
– 輸入輸出數據也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)
在linux的ecllipse裏面直接運行main 方法,則不需要添加yarn相關的配置,也會提交給localJobRunner執行
– 輸入輸出數據可以放在本地路徑下(/home/hadoop/wc/srcdata)
– 輸入輸出數據也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)
集群模式運行
將工程打包成jar包, 上傳到服務器,然後用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
在linux的eclipse中直接運行main方法,也可以提交到集群中去運行,但是,必須采取一下措施:
– 在工程src目錄下加入mapred-site.xml 和yarn-site.xml
– 將工程打成jar包(wc.jar), 同時在main方法中添加一個conf的配置參數 conf.set(“mapreduce.job.jar”, “wc.jar”);
在windows的eclipse中直接運行main方法,也可以提交集群中運行,但是因為平臺不兼容,需要做很多的設置修改
– 要在windows中存放一份hadoop的安裝包(解壓好的)
– 要將其中的lib和bin目錄替換成根據你的windows版本重新編譯出的文件
– 再要配置系統環境變量 HDOOP_HOME 和 PATH
– 修改YarnRunner這個類的源碼
Hadoop—MapReducer統計文件的單詞出現的個數