WordCount Source Code Walkthrough
阿新 • Published: 2018-11-01
MapReduce
map means mapping; reduce means reduction. The underlying idea is "divide and conquer"; rather than restating that idea in general terms, this article walks through the WordCount source code. Data flows through the program as key-value (K, V) pairs, and the two essential functions are map() and reduce(). The main method handles job configuration and submission.
main()

```java
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, WordCount.class.getSimpleName()); // new Job(...) is deprecated
job.setJarByClass(WordCount.class);               // locate the jar containing this class
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); // set the input/output formats on the job
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); // set the input/output paths
job.setMapperClass(WordMap.class);
job.setReducerClass(WordReduce.class);            // set the classes for the Map/Reduce stages
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);       // set the final output key/value types
System.exit(job.waitForCompletion(true) ? 0 : 1); // submit the job and wait for completion
```
The input data is two lines of text:

```
i am Malik Cheng
i am hadoop
```

The complete WordCount.java:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) { // null check must come before args.length
            System.out.println("Usage: WordCount <input path> <output path>");
            System.exit(1);
        }
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, WordCount.class.getSimpleName()); // new Job(...) is deprecated
        job.setJarByClass(WordCount.class);               // locate the jar containing this class
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class); // set the input/output formats on the job
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // set the input/output paths
        job.setMapperClass(WordMap.class);
        job.setReducerClass(WordReduce.class);            // set the classes for the Map/Reduce stages
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);       // set the final output key/value types
        System.exit(job.waitForCompletion(true) ? 0 : 1); // submit the job and wait for completion
    }

    /*
     * keyin: byte offset of each input line, type LongWritable
     * valuein: content of each line, type Text
     * keyout: key of the intermediate result, chosen to fit the problem
     * valueout: value of the intermediate result, chosen to fit the problem
     */
    static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("WordMap.map...");
            System.out.println("Map key:" + key.toString() + " ,Map value:"
                    + value.toString());
            String[] words = value.toString().split(" ");
            for (String word : words) {
                // each occurrence counts as 1; emitted as an intermediate result
                context.write(new Text(word), new IntWritable(1));
                System.out.println("word:" + word + ",one:"
                        + new IntWritable(1).toString());
            }
            System.out.println("context:" + context.toString());
        }
    }

    /*
     * keyin: input key, same type as the map keyout
     * valuein: intermediate values, same type as the map valueout
     * keyout: key of the final result
     * valueout: value of the final result
     */
    static class WordReduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("WordReduce.reduce...");
            int sum = 0;
            System.out.println("---------------------values:");
            for (IntWritable count : values) {
                sum = sum + count.get();
                System.out.println("count:" + count + ", sum:" + sum);
            }
            context.write(key, new IntWritable(sum)); // emit the final result
            System.out.println("Reduce key:" + key.toString() + ", sum :" + sum);
            System.out.println("Reduce context:" + context.toString() + ", sum :" + sum);
        }
    }
}
```
Interpreting the printed output:
Map phase
WordMap.map...
Map key:0 ,Map value:i am Malik Cheng
word:i,one:1
word:am,one:1
word:Malik,one:1
word:Cheng,one:1
context:...@2ceec589
WordMap.map...
Map key:17 ,Map value:i am hadoop
word:i,one:1
word:am,one:1
word:hadoop,one:1
context:...@2ceec589
The map function signature is map(LongWritable key, Text value, Context context). key and value carry the input data: value holds the actual line content (i am Malik...), while key is the byte offset used to locate the line. context is the context object, the bridge between map, reduce, and the framework's other machinery during execution; the design is similar in spirit to the session and application objects in Java web development.
The output above contains "WordMap.map..." twice, which means map() was called twice. Why two calls? Because TextInputFormat processes input line by line: each line's content arrives in the value parameter, and each line gets a key equal to the byte offset of the line's start within the file (so line 1 has key 0 and line 2 has key 17, the first line being 16 characters plus a newline). Each word is then recorded in the context as a pair like word:Malik,one:1 and handed off to reduce.
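The per-line behavior described above can be sketched without Hadoop at all. The snippet below is a minimal plain-Java simulation (the class and method names are my own, not part of the Hadoop API): it mimics TextInputFormat's (offset, line) records and the (word, 1) pairs the mapper emits for the two sample lines.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;

public class MapPhaseSketch {
    // Simulate the mapper: for one (offset, line) record, emit a (word, 1) pair per word.
    static List<Entry<String, Integer>> map(long offset, String line) {
        List<Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            // corresponds to context.write(new Text(word), new IntWritable(1))
            out.add(new SimpleEntry<>(word, 1));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] lines = {"i am Malik Cheng", "i am hadoop"};
        long offset = 0;
        List<Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(offset, line)); // map() is called once per line
            offset += line.length() + 1;            // +1 for the newline: keys 0 and 17
        }
        System.out.println(intermediate); // 7 intermediate (word, 1) pairs
    }
}
```

This reproduces the counter Map output records=7 seen later in the console output: 4 pairs from the first line plus 3 from the second.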
Reduce phase
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Cheng, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Malik, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:am, sum :2
Reduce context:...@5e76ee18, sum :2
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:hadoop, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:i, sum :2
Reduce context:...@5e76ee18, sum :2
Analysis: reduce means "reduction". First question: what is being reduced? In WordCount, the reduction is iterating over the values that arrived from map under one key (each value being a 1) and summing them. Second question: why does the WordReduce output appear five times, i.e. why is reduce() called five times? The first answer already gives it away: the data contains five distinct words (five distinct keys), so the reduction runs five times. In summary: map() is called once per input line, and reduce() is called once per key.
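The shuffle-then-reduce step can likewise be sketched in plain Java (again, the names below are my own illustration, not Hadoop API): group the (word, 1) pairs by key, as the framework does between the two phases, then run one summation per key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReducePhaseSketch {
    // Simulate shuffle + reduce: group values by key, then sum each group.
    static Map<String, Integer> shuffleAndReduce(List<String> words) {
        // TreeMap keeps keys sorted, mirroring the sorted order in which
        // keys reach reduce() in MapReduce.
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : words) {
            counts.merge(word, 1, Integer::sum); // the reduce step: sum the 1s per key
        }
        return counts;
    }

    public static void main(String[] args) {
        // The 7 intermediate words emitted by the map phase for the two sample lines.
        List<String> words = Arrays.asList("i", "am", "Malik", "Cheng", "i", "am", "hadoop");
        System.out.println(shuffleAndReduce(words)); // 5 keys, so reduce() runs 5 times
    }
}
```

Note that sorted order puts the capitalized words first (ASCII uppercase sorts before lowercase), which matches the call order seen in the output above: Cheng, Malik, am, hadoop, i.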
Console output
```
2017-07-20 18:07:02,175 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-07-20 18:07:02,820 INFO [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - session.id is deprecated. Instead, use dfs.metrics.session-id
2017-07-20 18:07:02,821 INFO [main] jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId=
2017-07-20 18:07:03,090 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(150)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2017-07-20 18:07:03,093 WARN [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(259)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2017-07-20 18:07:03,175 INFO [main] input.FileInputFormat (FileInputFormat.java:listStatus(281)) - Total input paths to process : 1
2017-07-20 18:07:03,213 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(396)) - number of splits:1
2017-07-20 18:07:03,394 INFO [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(479)) - Submitting tokens for job: job_local130204698_0001
2017-07-20 18:07:03,479 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/staging/zkpk130204698/.staging/job_local130204698_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2017-07-20 18:07:03,489 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/staging/zkpk130204698/.staging/job_local130204698_0001/job.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2017-07-20 18:07:03,713 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/local/localRunner/zkpk/job_local130204698_0001/job_local130204698_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval; Ignoring.
2017-07-20 18:07:03,733 WARN [main] conf.Configuration (Configuration.java:loadProperty(2368)) - file:/tmp/hadoop-zkpk/mapred/local/localRunner/zkpk/job_local130204698_0001/job_local130204698_0001.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts; Ignoring.
2017-07-20 18:07:03,761 INFO [main] mapreduce.Job (Job.java:submit(1289)) - The url to track the job: http://localhost:8080/
2017-07-20 18:07:03,762 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1334)) - Running job: job_local130204698_0001
2017-07-20 18:07:03,763 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null
2017-07-20 18:07:03,779 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
2017-07-20 18:07:03,936 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks
2017-07-20 18:07:03,937 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:03,995 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ]
2017-07-20 18:07:04,001 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:runNewMapper(733)) - Processing split: hdfs://master:9000/user/wordcount/input1/h:0+29
2017-07-20 18:07:04,017 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(388)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2017-07-20 18:07:04,083 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:setEquator(1182)) - (EQUATOR) 0 kvi 26214396(104857584)
2017-07-20 18:07:04,083 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(975)) - mapreduce.task.io.sort.mb: 100
2017-07-20 18:07:04,083 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(976)) - soft limit at 83886080
2017-07-20 18:07:04,084 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(977)) - bufstart = 0; bufvoid = 104857600
2017-07-20 18:07:04,084 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:init(978)) - kvstart = 26214396; length = 6553600
WordMap.map...
Map key:0 ,Map value:i am Malik Cheng
word:i,one:1
word:am,one:1
word:Malik,one:1
word:Cheng,one:1
context:...@2ceec589
WordMap.map...
Map key:17 ,Map value:i am hadoop
word:i,one:1
word:am,one:1
word:hadoop,one:1
context:...@2ceec589
2017-07-20 18:07:04,420 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) -
2017-07-20 18:07:04,423 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1437)) - Starting flush of map output
2017-07-20 18:07:04,424 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1455)) - Spilling map output
2017-07-20 18:07:04,424 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1456)) - bufstart = 0; bufend = 57; bufvoid = 104857600
2017-07-20 18:07:04,424 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:flush(1458)) - kvstart = 26214396(104857584); kvend = 26214372(104857488); length = 25/6553600
2017-07-20 18:07:04,437 INFO [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:sortAndSpill(1641)) - Finished spill 0
2017-07-20 18:07:04,441 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:done(1001)) - Task:attempt_local130204698_0001_m_000000_0 is done. And is in the process of committing
2017-07-20 18:07:04,453 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - map
2017-07-20 18:07:04,453 INFO [LocalJobRunner Map Task Executor #0] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local130204698_0001_m_000000_0' done.
2017-07-20 18:07:04,453 INFO [LocalJobRunner Map Task Executor #0] mapred.LocalJobRunner (LocalJobRunner.java:run(249)) - Finishing task: attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:04,453 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2017-07-20 18:07:04,456 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for reduce tasks
2017-07-20 18:07:04,457 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(302)) - Starting task: attempt_local130204698_0001_r_000000_0
2017-07-20 18:07:04,463 INFO [pool-6-thread-1] mapred.Task (Task.java:initialize(587)) - Using ResourceCalculatorProcessTree : [ ]
2017-07-20 18:07:04,467 INFO [pool-6-thread-1] mapred.ReduceTask (ReduceTask.java:run(362)) - Using ShuffleConsumerPlugin: ...@5031c1e1
2017-07-20 18:07:04,479 INFO [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:<init>(193)) - MergerManager: memoryLimit=304244320, maxSingleShuffleLimit=76061080, mergeThreshold=200801264, ioSortFactor=10, memToMemMergeOutputsThreshold=10
2017-07-20 18:07:04,484 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(61)) - attempt_local130204698_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
2017-07-20 18:07:04,517 INFO [localfetcher#1] reduce.LocalFetcher (LocalFetcher.java:copyMapOutput(140)) - localfetcher#1 about to shuffle output of map attempt_local130204698_0001_m_000000_0 decomp: 73 len: 77 to MEMORY
2017-07-20 18:07:04,521 INFO [localfetcher#1] reduce.InMemoryMapOutput (InMemoryMapOutput.java:shuffle(100)) - Read 73 bytes from map-output for attempt_local130204698_0001_m_000000_0
2017-07-20 18:07:04,523 INFO [localfetcher#1] reduce.MergeManagerImpl (MergeManagerImpl.java:closeInMemoryFile(307)) - closeInMemoryFile -> map-output of size: 73, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->73
2017-07-20 18:07:04,524 INFO [EventFetcher for fetching Map Completion Events] reduce.EventFetcher (EventFetcher.java:run(76)) - EventFetcher is interrupted.. Returning
2017-07-20 18:07:04,525 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,525 INFO [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(667)) - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
2017-07-20 18:07:04,534 INFO [pool-6-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2017-07-20 18:07:04,534 INFO [pool-6-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 65 bytes
2017-07-20 18:07:04,536 INFO [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(742)) - Merged 1 segments, 73 bytes to disk to satisfy reduce memory limit
2017-07-20 18:07:04,536 INFO [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(772)) - Merging 1 files, 77 bytes from disk
2017-07-20 18:07:04,537 INFO [pool-6-thread-1] reduce.MergeManagerImpl (MergeManagerImpl.java:finalMerge(787)) - Merging 0 segments, 0 bytes from memory into reduce
2017-07-20 18:07:04,537 INFO [pool-6-thread-1] mapred.Merger (Merger.java:merge(591)) - Merging 1 sorted segments
2017-07-20 18:07:04,537 INFO [pool-6-thread-1] mapred.Merger (Merger.java:merge(690)) - Down to the last merge-pass, with 1 segments left of total size: 65 bytes
2017-07-20 18:07:04,538 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,572 INFO [pool-6-thread-1] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1019)) - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Cheng, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:Malik, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:am, sum :2
Reduce context:...@5e76ee18, sum :2
WordReduce.reduce...
---------------------values:
count:1, sum:1
Reduce key:hadoop, sum :1
Reduce context:...@5e76ee18, sum :1
WordReduce.reduce...
---------------------values:
count:1, sum:1
count:1, sum:2
Reduce key:i, sum :2
Reduce context:...@5e76ee18, sum :2
2017-07-20 18:07:04,700 INFO [pool-6-thread-1] mapred.Task (Task.java:done(1001)) - Task:attempt_local130204698_0001_r_000000_0 is done. And is in the process of committing
2017-07-20 18:07:04,703 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - 1 / 1 copied.
2017-07-20 18:07:04,703 INFO [pool-6-thread-1] mapred.Task (Task.java:commit(1162)) - Task attempt_local130204698_0001_r_000000_0 is allowed to commit now
2017-07-20 18:07:04,713 INFO [pool-6-thread-1] output.FileOutputCommitter (FileOutputCommitter.java:commitTask(439)) - Saved output of task 'attempt_local130204698_0001_r_000000_0' to hdfs://master:9000/user/wordcount/output1/_temporary/0/task_local130204698_0001_r_000000
2017-07-20 18:07:04,715 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(591)) - reduce > reduce
2017-07-20 18:07:04,715 INFO [pool-6-thread-1] mapred.Task (Task.java:sendDone(1121)) - Task 'attempt_local130204698_0001_r_000000_0' done.
2017-07-20 18:07:04,716 INFO [pool-6-thread-1] mapred.LocalJobRunner (LocalJobRunner.java:run(325)) - Finishing task: attempt_local130204698_0001_r_000000_0
2017-07-20 18:07:04,716 INFO [Thread-12] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - reduce task executor complete.
2017-07-20 18:07:04,765 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1355)) - Job job_local130204698_0001 running in uber mode : false
2017-07-20 18:07:04,766 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1362)) - map 100% reduce 100%
2017-07-20 18:07:04,767 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1373)) - Job job_local130204698_0001 completed successfully
2017-07-20 18:07:04,793 INFO [main] mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Counters: 38
File System Counters
FILE: Number of bytes read=502
FILE: Number of bytes written=457295
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=58
HDFS: Number of bytes written=34
HDFS: Number of read operations=15
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=2
Map output records=7
Map output bytes=57
Map output materialized bytes=77
Input split bytes=107
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=77
Reduce input records=7
Reduce output records=5
Spilled Records=14
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=0
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=396361728
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=29
File Output Format Counters
Bytes Written=34
```