
WordCount Source Code Walkthrough

MapReduce

Map means "to map"; reduce means "to reduce". The underlying idea is divide and conquer; rather than rehash the theory, this post walks through the WordCount source code. Throughout the code, data is passed around as K,V (key/value) pairs, and the two functions that matter are map() and reduce(). The main method handles job configuration and submission.
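Before diving into the code, it helps to see the key/value flow for the two sample lines used below. This is a conceptual sketch of the stages, not actual program output:

```
(0,  "i am Malik Cheng") --map--> (i,1) (am,1) (Malik,1) (Cheng,1)
(17, "i am hadoop")      --map--> (i,1) (am,1) (hadoop,1)

--shuffle/sort--> (Cheng,[1]) (Malik,[1]) (am,[1,1]) (hadoop,[1]) (i,[1,1])
--reduce-->       (Cheng,1)   (Malik,1)   (am,2)     (hadoop,1)   (i,2)
```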

main()

```java
Configuration configuration = new Configuration();
Job job = new Job(configuration, WordCount.class.getSimpleName());
job.setJarByClass(WordCount.class);                      // locate the jar via this class
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);        // set the input/output formats
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));  // set the input/output paths
job.setMapperClass(WordMap.class);
job.setReducerClass(WordReduce.class);                   // set the Map/Reduce stage classes
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);              // set the final output key/value types
job.waitForCompletion(true);                             // submit the job and wait
```
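One detail worth noting: waitForCompletion(true) returns a boolean indicating whether the job succeeded, and the snippet above discards it. A common idiom (a suggested tweak, not part of the original code) is to turn it into the process exit code:

```java
// Exit with 0 on success, 1 on failure, so shell scripts can react.
System.exit(job.waitForCompletion(true) ? 0 : 1);
```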

The input data is two lines of text:

    i am Malik Cheng
    i am hadoop

The complete WordCount.java:


```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        // Check for null before touching args.length, not after.
        if (args == null || args.length != 2) {
            System.out.println("please input Path!");
            System.exit(0);
        }
        Configuration configuration = new Configuration();
        Job job = new Job(configuration, WordCount.class.getSimpleName());
        job.setJarByClass(WordCount.class);                      // locate the jar via this class
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);        // set the input/output formats
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // set the input/output paths
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);                   // set the Map/Reduce stage classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);              // set the final output key/value types
        job.waitForCompletion(true);                             // submit the job and wait
    }

    /*
     * keyin:    byte offset of each input line, type LongWritable
     * valuein:  content of the line, type Text
     * keyout:   key of the intermediate result; type chosen per application
     * valueout: value of the intermediate result; type chosen per application
     */
    static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("WordMap.map...");
            System.out.println("Map key:" + key.toString() + " ,Map value:" + value.toString());
            String[] words = value.toString().split(" ");
            for (String word : words) {
                // Emit each word with a count of 1 as an intermediate result.
                context.write(new Text(word), new IntWritable(1));
                System.out.println("word:" + word + ",one:" + new IntWritable(1).toString());
            }
            System.out.println("context:" + context.toString());
        }
    }

    /*
     * keyin:    same type as the mapper's keyout
     * valuein:  same type as the mapper's valueout
     * keyout:   key of the final result
     * valueout: value of the final result
     */
    static class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("WordReduce.reduce...");
            int sum = 0;
            System.out.println("---------------------values:");
            for (IntWritable count : values) {
                sum = sum + count.get();
                System.out.println("count:" + count + ", sum:" + sum);
            }
            context.write(key, new IntWritable(sum)); // emit the final result
            System.out.println("Reduce key:" + key.toString() + ", sum:" + new IntWritable(sum).toString());
            System.out.println("Reduce context:" + context.toString() + ", sum:" + new IntWritable(sum).toString());
        }
    }
}
```
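A side note on the setup: the new Job(Configuration, String) constructor used above is deprecated in Hadoop 2.x. If you want the non-deprecated form, the replacement is:

```java
Job job = Job.getInstance(configuration, WordCount.class.getSimpleName());
```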

Analysis of the printed output:

Map phase

```
WordMap.map...
Map key:0 ,Map value:i am Malik Cheng
word:i,one:1
word:am,one:1
word:Malik,one:1
word:Cheng,one:1
context:org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context@2ceec589
WordMap.map...
Map key:17 ,Map value:i am hadoop
word:i,one:1
word:am,one:1
word:hadoop,one:1
context:org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context@2ceec589
```

The map function's parameters: map(LongWritable key, Text value, Context context). key and value carry the incoming data; value holds the actual line content (i am Malik…), while key is the byte offset marking where the line starts. context is a context object that acts as a bridge between the various functions during map and reduce execution; the design is much like the session and application objects in Java web development.

The output above contains "WordMap.map…" twice, so map() was called twice. Why two calls? Because TextInputFormat processes input by line: each line's content arrives in the value parameter, and each line gets a key equal to the byte offset of the start of that line within the file. The first line's key is 0; "i am Malik Cheng" is 16 characters plus a trailing newline, 17 bytes in total, so the second line's key is 17 (the snippet below double-checks that arithmetic). Each word is then recorded in context as a pair like word:Malik,one:1, to be handed off to reduce.
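A quick standalone sanity check of that offset (my own snippet, not part of the original post):

```java
public class OffsetCheck {
    public static void main(String[] args) {
        String line1 = "i am Malik Cheng";
        // 16 characters plus the '\n' terminator = 17 bytes,
        // exactly the key reported for the second line.
        System.out.println(line1.length() + 1); // prints 17
    }
}
```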

Reduce phase

```
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Cheng, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Malik, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:am, sum:2
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:2
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:hadoop, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:i, sum:2
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:2
```

Analysis: reduce means "to reduce". First question: why is it a reduction? In WordCount, the reduction is the traversal and summation of all the values (each of them 1) that map emitted under the same key. Second question: why does "WordReduce.reduce…" appear five times, i.e. why is reduce() called five times? Answering the first question answers this one too: the data contains 5 distinct words (5 distinct keys), so there are 5 reductions. To summarize: map() is called once per line, reduce() once per key. The final output below follows directly.
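Putting the five reduce calls together, the final output file (part-r-00000 under the output path, with TextOutputFormat's default tab separator) should read as follows. As a cross-check, these five lines add up to exactly the 34 bytes reported by the "Bytes Written" counter at the end of the console output:

```
Cheng	1
Malik	1
am	2
hadoop	1
i	2
```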


Console output

```
2017-07-20 18:07:02,175 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-07-20 18:07:02,820 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2017-07-20 18:07:02,821 INFO  [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2017-07-20 18:07:03,090 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-07-20 18:07:03,093 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(259)) - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2017-07-20 18:07:03,175 INFO  [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1
2017-07-20 18:07:03,213 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2017-07-20 18:07:03,394 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local130204698_0001
2017-07-20 18:07:03,479 WARN  [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/staging/zkpk130204698/.staging/job_local130204698_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-07-20 18:07:03,489 WARN  [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/staging/zkpk130204698/.staging/job_local130204698_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-07-20 18:07:03,713 WARN  [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/local/localRunner/zkpk/job_local130204698_0001/job_local130204698_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
2017-07-20 18:07:03,733 WARN  [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/local/localRunner/zkpk/job_local130204698_0001/job_local130204698_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
2017-07-20 18:07:03,761 INFO  [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://localhost:8080/
2017-07-20 18:07:03,762 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_local130204698_0001
2017-07-20 18:07:03,763 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null
2017-07-20 18:07:03,779 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2017-07-20 18:07:03,936 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks
2017-07-20 18:07:03,937 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:03,995 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) -  Using ResourceCalculatorProcessTree : [ ]
2017-07-20 18:07:04,001 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(733)) - Processing split: hdfs://master:9000/user/wordcount/input1/h:0+29
2017-07-20 18:07:04,017 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(388)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-07-20 18:07:04,083 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1182)) - (EQUATOR) 0 kvi 26214396(104857584)
2017-07-20 18:07:04,083 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(975)) - mapreduce.task.io.sort.mb: 100
2017-07-20 18:07:04,083 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(976)) - soft limit at 83886080
2017-07-20 18:07:04,084 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(977)) - bufstart = 0; bufvoid = 104857600
2017-07-20 18:07:04,084 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(978)) - kvstart = 26214396; length = 6553600
WordMap.map...
Map key:0 ,Map value:i am Malik Cheng
word:i,one:1
word:am,one:1
word:Malik,one:1
word:Cheng,one:1
context:org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context@2ceec589
WordMap.map...
Map key:17 ,Map value:i am hadoop
word:i,one:1
word:am,one:1
word:hadoop,one:1
context:org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context@2ceec589
2017-07-20 18:07:04,420 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 
2017-07-20 18:07:04,423 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1437)) - Starting flush of map output
2017-07-20 18:07:04,424 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1455)) - Spilling map output
2017-07-20 18:07:04,424 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1456)) - bufstart = 0; bufend = 57; bufvoid = 104857600
2017-07-20 18:07:04,424 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1458)) - kvstart = 26214396(104857584); kvend = 26214372(104857488); length = 25/6553600
2017-07-20 18:07:04,437 INFO  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1641)) - Finished spill 0
2017-07-20 18:07:04,441 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local130204698_0001_m_000000_0 is done. And is in the process of committing
2017-07-20 18:07:04,453 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map
2017-07-20 18:07:04,453 INFO  [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local130204698_0001_m_000000_0' done.
2017-07-20 18:07:04,453 INFO  [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:04,453 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2017-07-20 18:07:04,456 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks
2017-07-20 18:07:04,457 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local130204698_0001_r_000000_0
2017-07-20 18:07:04,463 INFO  [pool-6-thread-1] mapred.Task (Task.java:initialize(587)) -  Using ResourceCalculatorProcessTree : [ ]
2017-07-20 18:07:04,467 INFO  [pool-6-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@5031c1e1
2017-07-20 18:07:04,479 INFO  [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(193)) - MergerManager: memoryLimit=304244320, maxSingleShuffleLimit=76061080, mergeThreshold=200801264, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2017-07-20 18:07:04,484 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local130204698_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2017-07-20 18:07:04,517 INFO  [localfetcher#1] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(140)) - localfetcher#1 about to shuffle output of map attempt_local130204698_0001_m_000000_0 decomp: 73 len: 77 to MEMORY
2017-07-20 18:07:04,521 INFO  [localfetcher#1] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 73 bytes from map-output for attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:04,523 INFO  [localfetcher#1] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(307)) - closeInMemoryFile -> map-output of size: 73, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->73
2017-07-20 18:07:04,524 INFO  [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning
2017-07-20 18:07:04,525 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,525 INFO  [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(667)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2017-07-20 18:07:04,534 INFO  [pool-6-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2017-07-20 18:07:04,534 INFO  [pool-6-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 65 bytes
2017-07-20 18:07:04,536 INFO  [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(742)) - Merged 1 segments, 73 bytes to disk to satisfy reduce memory limit
2017-07-20 18:07:04,536 INFO  [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(772)) - Merging 1 files, 77 bytes from disk
2017-07-20 18:07:04,537 INFO  [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(787)) - Merging 0 segments, 0 bytes from memory into reduce
2017-07-20 18:07:04,537 INFO  [pool-6-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2017-07-20 18:07:04,537 INFO  [pool-6-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 65 bytes
2017-07-20 18:07:04,538 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,572 INFO  [pool-6-thread-1] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Cheng, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Malik, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:am, sum:2
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:2
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:hadoop, sum:1
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:i, sum:2
Reduce context:org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context@5e76ee18, sum:2
2017-07-20 18:07:04,700 INFO  [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local130204698_0001_r_000000_0 is done. And is in the process of committing
2017-07-20 18:07:04,703 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,703 INFO  [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local130204698_0001_r_000000_0 is allowed to commit now
2017-07-20 18:07:04,713 INFO  [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local130204698_0001_r_000000_0' to hdfs://master:9000/user/wordcount/output1/_temporary/0/task_local130204698_0001_r_000000
2017-07-20 18:07:04,715 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce
2017-07-20 18:07:04,715 INFO  [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local130204698_0001_r_000000_0' done.
2017-07-20 18:07:04,716 INFO  [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local130204698_0001_r_000000_0
2017-07-20 18:07:04,716 INFO  [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2017-07-20 18:07:04,765 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local130204698_0001 running in uber mode : false
2017-07-20 18:07:04,766 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) -  map 100% reduce 100%
2017-07-20 18:07:04,767 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local130204698_0001 completed successfully
2017-07-20 18:07:04,793 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
    File System Counters
        FILE: Number of bytes read=502
        FILE: Number of bytes written=457295
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=58
        HDFS: Number of bytes written=34
        HDFS: Number of read operations=15
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Map-Reduce Framework
        Map input records=2
        Map output records=7
        Map output bytes=57
        Map output materialized bytes=77
        Input split bytes=107
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=77
        Reduce input records=7
        Reduce output records=5
        Spilled Records=14
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=0
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=396361728
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=29
    File Output Format Counters 
        Bytes Written=34
```