MapReduce剖析筆記之一:從WordCount理解MapReduce的幾個階段
阿新 • • 發佈:2019-02-08
package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
MapReduce即將一個計算任務分為兩個階段:Map、Reduce。為什麼要這麼分解? 為了理解其含義,我們先不管MapReduce這一套框架,從一個簡單的問題來看,如果對於100T的日誌檔案,需要統計其中出現的"ERROR"這個單詞的次數,怎麼辦? 最簡單的方法:單機處理,逐行讀入每一行文字,統計並累加,則得到其值。問題是:因為資料量太大,速度太慢,怎麼辦?自然,多機並行處理就是一個自然的選擇。 那麼,這個檔案怎麼切分到多個機器呢?假定有100臺機器,可以寫一個主程式,將這個100T大檔案按照每個機器儲存1T的原則,在100臺機器上分佈儲存,再把原來單機上的程式拷貝100份(無需修改)至100臺機器上執行得到結果,此時得到的結果只是一箇中間結果,最後需要寫一個彙總程式,將統計結果進行累加,則完成計算。將大檔案分解後,對單機1T檔案計算的過程就相當於Map,而Map的結果就相當於"ERROR"這個單詞在本機1T檔案中出現的次數,而最後的彙總程式就相當於Reduce,Reduce的輸入來源於100臺機器。在這個簡單的例子中,有100個Map任務,1個Reduce任務。 100臺機器計算後的中間結果需要傳遞到Reduce任務所在機器上,這個過程就是Shuffle,Shuffle這個單詞的含義是”洗牌“,也就是將中間結果從Map所在機器傳輸到Reduce所在機器,在這個過程中,存在網路傳輸。 此時,我們利用上面的例子已經理解了Map-Shuffle-Reduce的基本含義,在上面的例子中,如果還需要對”WARNING“這個單詞進行統計,那麼怎麼辦呢?此時,每個Map任務就不僅需要統計本機1T檔案中ERROR的個數,還需要統計WARNING的次數,然後在Reduce程式中分別進行統計。如果需要對所有單詞進行統計呢?一個道理,每個Map任務對1T檔案中所有單詞進行統計計數,然後 Reduce對所有結果進行彙總,得到所有單詞在100T大檔案中出現的次數。此時,問題可能出現了,因為單詞數量可能很多,Reduce用單機處理也可能存在瓶頸了,於是我們需要考慮用多臺機器平行計算Reduce,假如用2臺機器,因為Reduce只是對單詞進行計數累加,所有可以按照這樣簡單的規則進行:大寫字母A-Z開頭的單詞由Reduce 1累加;小寫字母a-z開頭的單詞由Reduce 2累加。在這種情況下,100個Map任務執行後的結果,都需要分為兩部分,一部分準備送到Reduce 1統計,一部分準備送到Reduce 2統計,這個功能稱為Partitioner,即將Map後的結果(比如一個文字檔案,記錄了各個單詞在本機檔案出現的次數)分解為兩部分(比如兩個文字 檔案),準備送到兩個Reduce任務。 因此,Shuffle在這裡就是從100個Map任務和2個Reduce任務之間傳輸中間結果的過程。
我們繼續考慮幾個問題:
1、 如果Map後的中間結果資料量較大,Shuffle過程對網路頻寬要求較高,因此需要將Map後的結果儘可能減小,這個功能當然可以在Map內自己搞 定,不過MapReduce將這個功能單獨拎出來,稱為Combiner,即合併,這個Combiner,指的是Map任務後中間結果的合併,相比於 Reduce的最終合併,這裡相當於先進行一下區域性彙總,減小中間結果,進而減小網路傳輸量。所以,在上面的例子中,假如Map並不計數,只是記錄單詞出現這個資訊,輸出結果是<ERROR,1>,<WARNING,1>,<WARNING,1>.....這樣一個Key-Value序列,Combiner可以進行區域性彙總,將Key相同的Value進行累加,形成一個新的Key-Value序列:<ERROR,14>,<WARNING,27>,.....,這樣就大大減小了Shuffle需要的網路頻寬,要知道現在資料中心一般使用千兆乙太網,好些的使用萬兆乙太網,TCP/IP傳輸的效率不太高。這裡Combiner彙總函式實際上可以與Reduce的彙總函式一致,只是輸入資料不同。
2、 來自100個Map任務後的結果分別送到兩個Reduce任務處理。對於任何一個Reduce任務,輸入是一堆<ERROR,14>這樣的 Key-Value序列,因為100個Map任務都有可能統計到ERROR的次數,因此這裡會先進行一個歸併,即將相同單詞的歸併到一起,形 成<ERROR, <14,36,.....>>,<WARNING,<27,45,...>>這樣一個仍然是Key-Value的 序列,14、36、。。。分別表示第1、2、。。。臺機器中ERROR的統計次數,這個歸併過程在MapReduce中稱為Merge。如果merge後 再進行Reduce,那麼就只需要統計即可;如果事先沒有merge,那麼Reduce自己完成這一功能也行,只是兩種情況下Reduce的輸入Key- Value形式不同。
3、如果要求最後的單詞統計結果還要形成字典序怎麼辦呢?可以自己在 Reduce中進行全排序,也可以100個Map任務中分別進行區域性排序,然後將結果發到Reduce任務時,再進行歸併排序。這個過程 MapReduce也內建支援,因此不需要使用者自己去寫排序程式,這個過程在MapReduce中稱為Sort。
到這裡,我們理解了MapReduce中的幾個典型步驟:Map、Sort、Partitioner、 Combiner、Shuffle、Merge、Reduce。MapReduce程式之所以稱為MapReduce,就說明Map、Reduce這兩個 步驟對於一個平行計算來說幾乎是必須的,你總得先分開算吧,所以必須有Map;你總得彙總吧,所以有Reduce。當然,理論上也可以不需要 Reduce,如果Map後就得到你要的結果的話。
Sort對於不需要順序的程式裡沒意義(但MapReduce預設做了排序);
Partitioner對於Reduce只有一個的時候沒意義,如果有多個Reduce,則需要,至於怎麼分,使用者可以繼承Partitioner標準類,自己實現分解函式。控制中間結果如何傳輸。MapReduce提供的標準的Partitioner是 一個介面,使用者可以自己實現getPartition()函式,MapReduce也提供了幾個基本的實現,最典型的HashPartitioner是根 據使用者設定的Reduce任務數量(注意,MapReduce中,Map任務的個數最終取決於資料分佈,Reduce則是使用者直接指定),按照雜湊進行計算的:
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
這裡,numReduceTasks就是使用者設定的Reduce任務數量; K2 key, V2 value 就是Map計算後的中間結果。 Combiner可以選擇性放棄,但考慮到網路頻寬,可以自己寫相應的函式實現區域性合併功能。很多情況下,直接利用Reduce那個程式即可,WordCount這個標準程式裡就是這麼用的。 Shuffle自然是必須的,不用寫,根據Partitioner邏輯,框架自己去執行結果傳輸。 Merge也不是必須的,可以揉到Reduce裡面實現等等也可以。因為這些操作的資料結構都是Key-Value,Reduce的輸入只要是一個Key-Value即可,相當靈活。 我們再來看WordCount,這個MapReduce程式中定義了一個類:
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>
而Mapper是Hadoop中的一個介面,其定義為:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
/**
* Maps a single input key/value pair into an intermediate key/value pair.
*
* <p>Output pairs need not be of the same types as input pairs. A given
* input pair may map to zero or many output pairs. Output pairs are
* collected with calls to
* {@link OutputCollector#collect(Object,Object)}.</p>
*
* <p>Applications can use the {@link Reporter} provided to report progress
* or just indicate that they are alive. In scenarios where the application
* takes an insignificant amount of time to process individual key/value
* pairs, this is crucial since the framework might assume that the task has
* timed-out and kill that task. The other way of avoiding this is to set
* <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
* mapred.task.timeout</a> to a high-enough value (or even zero for no
* time-outs).</p>
*
* @param key the input key.
* @param value the input value.
* @param output collects mapped keys and values.
* @param reporter facility to report progress.
*/
void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
throws IOException;
}
因此,Mapper裡面並沒有規定輸入輸出的型別是什麼,只要是KeyValue的即可,K1、V1、K2、V2是什麼由使用者指定,反正只是實現K1、V1到K2、V2的對映即可。 在WordCount中實現了繼承於Mapper<Object, Text, Text, IntWritable>的一個TokenizerMapper類,實現了map函式:map(Object key, Text value, Context context ) ; TokenizerMapper中,輸入的Key-Value是<Object, Text>,輸出是<Text, IntWritable>,在WordCount程式裡,K1代表一行文字的起始位置,V1代表這一行文字; K2代表單詞,V2代表"1",用於後面的累和。 同樣,在MapReduce中,Reducer也是一個介面,其宣告為:
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
/**
* <i>Reduces</i> values for a given key.
*
* <p>The framework calls this method for each
* <code><key, (list of values)></code> pair in the grouped inputs.
* Output values must be of the same type as input values. Input keys must
* not be altered. The framework will <b>reuse</b> the key and value objects
* that are passed into the reduce, therefore the application should clone
* the objects they want to keep a copy of. In many cases, all values are
* combined into zero or one value.
* </p>
*
* <p>Output pairs are collected with calls to
* {@link OutputCollector#collect(Object,Object)}.</p>
*
* <p>Applications can use the {@link Reporter} provided to report progress
* or just indicate that they are alive. In scenarios where the application
* takes an insignificant amount of time to process individual key/value
* pairs, this is crucial since the framework might assume that the task has
* timed-out and kill that task. The other way of avoiding this is to set
* <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
* mapred.task.timeout</a> to a high-enough value (or even zero for no
* time-outs).</p>
*
* @param key the key.
* @param values the list of values to reduce.
* @param output to collect keys and combined values.
* @param reporter facility to report progress.
*/
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
throws IOException;
}
Reducer的輸入為K2, V2(這個對應於Mapper輸出的經過Shuffle到達Reducer端的K2,V2,), 輸出為K3, V3。
在WordCount中,K2為單詞,V2為1這個固定值(或者為區域性出現次數,取決於是否有Combiner);K3還是單詞,V3就是累和值。 而WordCount裡存在繼承於Reducer<Text, IntWritable, Text, IntWritable>的IntSumReducer類,完成單詞計數累加功能。 對於Combiner,實際上MapReduce沒有Combiner這個基類(WordCount自然也沒有實現),從任務的提交函式來看:
public void setCombinerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
可以看出,Combiner使用的類實際上符合Reducer。兩者是一樣的。 再來看作業提交程式碼: 在之前先說一下Job和Task的區別,一個MapReduce執行流程稱為一個Job,中文稱“作業”。 在傳統的分散式計算領域,一個Job分為多個Task執行。Task中文一般稱為任務,在Hadoop中,這種任務有兩種:Map和Reduce 所以下面說到Map和Reduce時,指的是任務;說到整個流程時,指的是作業。不過由於疏忽,可能會將作業稱為任務的情況。 根據上下文容易區分出來。 1 Job job = new Job(conf, "word count"); 2 job.setJarByClass(WordCount.class); 3 job.setMapperClass(TokenizerMapper.class); 4 job.setCombinerClass(IntSumReducer.class); 5 job.setReducerClass(IntSumReducer.class); 6 job.setOutputKeyClass(Text.class); 7 job.setOutputValueClass(IntWritable.class); 8 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 9 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 10 System.exit(job.waitForCompletion(true) ? 0 : 1);
第1行建立一個Job物件,Job是MapReduce中提供的一個作業類,其宣告為:
public class Job extends JobContext {
public static enum JobState {DEFINE, RUNNING};
private JobState state = JobState.DEFINE;
private JobClient jobClient;
private RunningJob info;
.......
之後,設定該作業的執行類,也就是WordCount這個類; 然後設定Map、Combiner、Reduce三個實現類; 之後,設定輸出Key和Value的類,這兩個類表明了MapReduce作業完畢後的結果。 Key即單詞,為一個Text物件,Text是Hadoop提供的一個可以序列化的文字類; Value為計數,為一個IntWritable物件,IntWritable是Hadoop提供的一個可以序列化的整數類。 之所以不用普通的String和int,是因為輸出Key、 Value需要寫入HDFS,因此Key和Value都要可寫,這種可寫能力在Hadoop中使用一個介面Writable表示,其實就相當於序列化,換句話說,Key、Value必須得有可序列化的能力。Writable的宣告為:
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
在第8、9行,還設定了要計算的檔案在HDFS中的路徑,設定好這些配置和引數後,執行作業提交:job.waitForCompletion(true)
waitForCompletion是Job類中實現的一個方法:
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}
即執行submit函式:
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
connect();
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}
其中,呼叫jobClient物件的submitJobInternal方法進行作業提交。jobClient是 JobClient物件,在執行connect()的時候即創建出來:
private void connect() throws IOException, InterruptedException {
ugi.doAs(new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
jobClient = new JobClient((JobConf) getConfiguration());
return null;
}
});
}
建立JobClient的引數是這個作業的配置資訊,JobClient是MapReduce作業的客戶端部分,主要用於提交作業等等。而具體的作業提交在submitJobInternal方法中實現,關於submitJobInternal的具體實現,包括MapReduce的作業執行流程,較為複雜,留作下一節描述。 關於MapReduce的這一流程,我們也可以看出一些特點: 1、 Map任務之間是不通訊的,這與傳統的MPI(Message Passing Interface)存在本質區別,這就要求劃分後的任務具有獨立性。這個要求一方面限制了MapReduce的應用場合,但另一方面對於任務執行出錯後的處理十分方便,比如執行某個Map任務的機器掛掉了,可以不管其他Map任務,重新在另一臺機器上執行一遍即可。因為底層的資料在HDFS裡面,有3 份備份,所以資料冗餘搭配上Map的重執行這一能力,可以將叢集計算的容錯性相比MPI而言大大增強。後續博文會對MPI進行剖析,也會對 MapReduce與傳統高效能運算中的平行計算框架進行比較。 2、Map任務的分配與資料的分佈關係十分密切,對於上面的例子,這個100T的大檔案分佈在多臺機器上,MapReduce框架會根據檔案的實際儲存位置分配Map任務,這一過程需要對HDFS有好的理解,在後續博文中會對HDFS中進行剖析。到時候,能更好滴理解MapReduce框架。因為兩者是搭配起來使用的。 3、 MapReduce的輸入資料來自於HDFS,輸出結果也寫到HDFS。如果一個事情很複雜,需要分成很多個MapReduce作業反覆執行,那麼就需要來來回回地從磁碟中搬移資料的過程,速度很慢,後續博文會對Spark這一記憶體計算框架進行剖析,到時候,能更好滴理解MapReduce效能。 4、MapReduce的輸入資料和輸出結果也可以來自於HBase,HBase本身搭建於HDFS之上(理論上也可以搭建於其他檔案系統),這種應用場合大多需要MapReduce處理一些海量結構化資料。後續博文會對HBase進行剖析。轉帖:http://www.cnblogs.com/esingchan/p/3917094.html?utm_source=tuicool&utm_medium=referral