03.Mapreduce例項——排序
實驗原理
Map、Reduce任務中Shuffle和排序的過程圖如下:
流程分析:
1.Map端:
(1)每個輸入分片會讓一個map任務來處理,預設情況下,以HDFS的一個塊的大小(預設為64M)為一個分片,當然我們也可以設定塊的大小。map輸出的結果會暫且放在一個環形記憶體緩衝區中(該緩衝區的大小預設為100M,由io.sort.mb屬性控制),當該緩衝區快要溢位時(預設為緩衝區大小的80%,由io.sort.spill.percent屬性控制),會在本地檔案系統中建立一個溢位檔案,將該緩衝區中的資料寫入這個檔案。
(2)在寫入磁碟之前,執行緒首先根據reduce任務的數目將資料劃分為相同數目的分割槽,也就是一個reduce任務對應一個分割槽的資料。這樣做是為了避免有些reduce任務分配到大量資料,而有些reduce任務卻分到很少資料,甚至沒有分到資料的尷尬局面。其實分割槽就是對資料進行hash的過程。然後對每個分割槽中的資料進行排序,如果此時設定了Combiner,將排序後的結果進行Combia操作,這樣做的目的是讓儘可能少的資料寫入到磁碟。
(3)當map任務輸出最後一個記錄時,可能會有很多的溢位檔案,這時需要將這些檔案合併。合併的過程中會不斷地進行排序和combia操作,目的有兩個:①儘量減少每次寫入磁碟的資料量。②儘量減少下一複製階段網路傳輸的資料量。最後合併成了一個已分割槽且已排序的檔案。為了減少網路傳輸的資料量,這裡可以將資料壓縮,只要將mapred.compress.map.out設定為true就可以了。
(4)將分割槽中的資料拷貝給相對應的reduce任務。有人可能會問:分割槽中的資料怎麼知道它對應的reduce是哪個呢?其實map任務一直和其父TaskTracker保持聯絡,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中儲存了整個叢集中的巨集觀資訊。只要reduce任務向JobTracker獲取對應的map輸出位置就ok了哦。
到這裡,map端就分析完了。那到底什麼是Shuffle呢?Shuffle的中文意思是“洗牌”,如果我們這樣看:一個map產生的資料,結果通過hash過程分割槽卻分配給了不同的reduce任務,是不是一個對資料洗牌的過程呢?
2.Reduce端:
(1)Reduce會接收到不同map任務傳來的資料,並且每個map傳來的資料都是有序的。如果reduce端接受的資料量相當小,則直接儲存在記憶體中(緩衝區大小由mapred.job.shuffle.input.buffer.percent屬性控制,表示用作此用途的堆空間的百分比),如果資料量超過了該緩衝區大小的一定比例(由mapred.job.shuffle.merge.percent決定),則對資料合併後溢寫到磁碟中。
(2)隨著溢寫檔案的增多,後臺執行緒會將它們合併成一個更大的有序的檔案,這樣做是為了給後面的合併節省時間。其實不管在map端還是reduce端,MapReduce都是反覆地執行排序,合併操作,現在終於明白了有些人為什麼會說:排序是hadoop的靈魂。
(3)合併的過程中會產生許多的中間檔案(寫入磁碟了),但MapReduce會讓寫入磁碟的資料儘可能地少,並且最後一次合併的結果並沒有寫入磁碟,而是直接輸入到reduce函式。
熟悉MapReduce的人都知道:排序是MapReduce的天然特性!在資料達到reducer之前,MapReduce框架已經對這些資料按鍵排序了。但是在使用之前,首先需要了解它的預設排序規則。它是按照key值進行排序的,如果key為封裝的int為IntWritable型別,那麼MapReduce按照數字大小對key排序,如果Key為封裝String的Text型別,那麼MapReduce將按照資料字典順序對字元排序。
瞭解了這個細節,我們就知道應該使用封裝int的Intwritable型資料結構了,也就是在map這裡,將讀入的資料中要排序的欄位轉化為Intwritable型,然後作為key值輸出(不排序的欄位作為value)。reduce階段拿到<key,value-list>之後,將輸入的key作為的輸出key,並根據value-list中的元素的個數決定輸出的次數。
實驗步驟
1.在Linux中開啟Hadoop
start-all.sh
2.在Linux本地新建/data/mapreduce3目錄。
mkdir-p/data/mapreduce3
3.下載hadoop2lib,解壓到mapreduce資料夾下
unzip hadoop2lib.zip
4.在HDFS上新建/mymapreduce3/in目錄,然後將Linux本地/data/mapreduce3目錄下的goods_click檔案匯入到HDFS的/mymapreduce3/in目錄中。
hadoop fs -mkdir -p /mymapreduce3/in
hadoop fs -put /data/mapreduce3/goods_click /mymapreduce3/in
注意:檔案需要注意檔案格式,資料後有隱藏的空格會導致API中讀取失敗,行末尾的空格應該取消掉,中間使用逗號分隔開
5.在IDEA中編寫程式碼
package mapreduce3; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class OneSort { public static class Map extends Mapper<Object , Text , IntWritable,Text >{ private static Text goods=new Text(); private static IntWritable num=new IntWritable(); public void map(Object key,Text value,Context context) throws IOException, InterruptedException{ String line=value.toString(); String arr[]=line.split(","); num.set(Integer.parseInt(arr[1])); goods.set(arr[0]); context.write(num,goods); } } public static class Reduce extends Reducer< IntWritable, Text, IntWritable, Text>{ private static IntWritable result= new IntWritable(); public void reduce(IntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException{ for(Text val:values){ context.write(key,val); } } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ Configuration conf=new Configuration(); Job job =new Job(conf,"OneSort"); job.setJarByClass(OneSort.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path in=new Path("hdfs://192.168.149.10:9000/mymapreduce3/in/goods_visit1"); Path out=new Path("hdfs://192.168.149.10:9000/mymapreduce3/out"); FileInputFormat.addInputPath(job,in); FileOutputFormat.setOutputPath(job,out); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
6.建立resources資料夾,其中建立log4j.properties檔案
hadoop.root.logger=DEBUG, console log4j.rootLogger = DEBUG, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
7.匯入hadoop2lib的包
8.執行結果
PS:記得修改許可權