Mapreduce例項——倒排索引
現有某電商網站的3張資訊資料表,分別為商品庫表goods3,商品訪問情況表goods_visit3,訂單明細表order_items3,goods表記錄了商品的狀態資料,goods_visit3記錄了商品的點選情況,order_items3記錄了使用者購買的商品的資訊資料,它們的表結構及內容如下:
商品ID,商品點選次數
1024600,2
1024593,0
1024592,0
1024590,0
1024589,0
1024588,0
1024587,0
1024586,0
1024585,0
1024584,0
goods_visit3(goods_id,click_num)
明細ID,訂單ID,商品ID,購買資料,商品銷售價格,商品最終單價,商品金額251688,52107,1024600,1,31.6,31.6,15.8 252165,52209,1024600,1,31.6,31.6,15.8 251870,52146,1024481,1,15.6,15.6,7.8 251935,52158,1024481,1,15.6,15.6,7.8 252415,52264,1024480,1,69.0,69.0,69.0 250983,51937,1024480,1,69.0,69.0,69.0 252609,52299,1024480,1,69.0,69.0,69.0 251689,52107,1024440,1,31.6,31.6,15.8 239369,49183,1024256,1,759.0,759.0,759.0 249222,51513,1024140,1,198.0,198.0,198.0
商品ID,商品狀態,分類ID,評分
1024600,6,52006,0
1024593,1,52121,0
1024592,1,52121,0
1024590,1,52119,0
1024589,1,52119,0
1024588,1,52030,0
1024587,1,52021,0
1024586,1,52029,0
1024585,1,52014,0
1024584,1,52029,0
goods3(goods_id,goods_status,cat_id,goods_score)通過mapreduce統計goods_id相同的商品都在哪幾張表並統計出現了多少次:
package mapreduce9; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //08.Mapreduce例項——倒排索引 public class MyIndex { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(); job.setJobName("InversedIndexTest"); job.setJarByClass(MyIndex.class); job.setMapperClass(doMapper.class); job.setCombinerClass(doCombiner.class); job.setReducerClass(doReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path in1 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/goods3"); Path in2 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/goods_visit3"); Path in3 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/order_items3"); Path out = new Path("hdfs://192.168.51.100:8020/mymapreduce9/out"); FileInputFormat.addInputPath(job, in1); FileInputFormat.addInputPath(job, in2); FileInputFormat.addInputPath(job, in3); FileOutputFormat.setOutputPath(job, out); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static class doMapper extends Mapper<Object, Text, Text, Text>{ public static Text myKey = new Text(); public static Text myValue = new Text(); //private FileSplit filePath; @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String filePath=((FileSplit)context.getInputSplit()).getPath().toString(); if(filePath.contains("goods")){ String val[]=value.toString().split(","); int splitIndex =filePath.indexOf("goods"); myKey.set(val[0] + ":" + filePath.substring(splitIndex)); }else if(filePath.contains("order")){ String val[]=value.toString().split(","); int splitIndex =filePath.indexOf("order"); myKey.set(val[2] + ":" + filePath.substring(splitIndex)); } myValue.set("1"); context.write(myKey, myValue); } } public static class doCombiner extends Reducer<Text, Text, Text, Text>{ public static Text myK = new Text(); public static Text myV = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int sum = 0 ; for (Text value : values) { sum += Integer.parseInt(value.toString()); } int mysplit = key.toString().indexOf(":"); myK.set(key.toString().substring(0, mysplit)); myV.set(key.toString().substring(mysplit + 1) + ":" + sum); context.write(myK, myV); } } public static class doReducer extends Reducer<Text, Text, Text, Text>{ public static Text myK = new Text(); public static Text myV = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String myList = new String(); for (Text value : values) { myList += value.toString() + ";"; } myK.set(key); myV.set(myList); context.write(myK, myV); } } }
結果:
原理:
"倒排索引"是文件檢索系統中最常用的資料結構,被廣泛地應用於全文搜尋引擎。它主要是用來儲存某個單詞(或片語)在一個文件或一組文件中的儲存位置的對映,即提供了一種根據內容來查詢文件的方式。由於不是根據文件來確定文件所包含的內容,而是進行相反的操作,因而稱為倒排索引(Inverted Index)。
實現"倒排索引"主要關注的資訊為:單詞、文件URL及詞頻。
(1)Map過程
首先使用預設的TextInputFormat類對輸入檔案進行處理,得到文字中每行的偏移量及其內容。顯然,Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個資訊:單詞、文件URL和詞頻,接著我們對讀入的資料利用Map操作進行預處理,如下圖所示:
這裡存在兩個問題:第一,<key,value>對只能有兩個值,在不使用Hadoop自定義資料型別的情況下,需要根據情況將其中兩個值合併成一個值,作為key或value值。第二,通過一個Reduce過程無法同時完成詞頻統計和生成文件列表,所以必須增加一個Combine過程完成詞頻統計。
這裡將商品ID和URL組成key值(如"1024600:goods3"),將詞頻(商品ID出現次數)作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文件的相同單詞的詞頻組成列表,傳遞給Combine過程,實現類似於WordCount的功能。
(2)Combine過程
經過map方法處理後,Combine過程將key值相同的value值累加,得到一個單詞在文件中的詞頻,如下圖所示。如果直接將下圖所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值無法保證這一點,所以必須修改key值和value值。這次將單詞(商品ID)作為key值,URL和詞頻組成value值(如"goods3:1")。這樣做的好處是可以利用MapReduce框架預設的HashPartitioner類完成Shuffle過程,將相同單詞的所有記錄傳送給同一個Reducer進行處理。
(3)Reduce過程
經過上述兩個過程後,Reduce過程只需將相同key值的所有value值組合成倒排索引檔案所需的格式即可,剩下的事情就可以直接交給MapReduce框架進行處理了。如下圖所示