1. 程式人生 > 其它 >Mapreduce例項——倒排索引

Mapreduce例項——倒排索引

現有某電商網站的3張資訊資料表,分別為商品庫表goods3,商品訪問情況表goods_visit3,訂單明細表order_items3,goods表記錄了商品的狀態資料,goods_visit3記錄了商品的點選情況,order_items3記錄了使用者購買的商品的資訊資料,它們的表結構及內容如下:

商品ID,商品點選次數
1024600,2
1024593,0
1024592,0
1024590,0
1024589,0
1024588,0
1024587,0
1024586,0
1024585,0
1024584,0
goods_visit3(goods_id,click_num)
明細ID,訂單ID,商品ID,購買資料,商品銷售價格,商品最終單價,商品金額
251688,52107,1024600,1,31.6,31.6,15.8 252165,52209,1024600,1,31.6,31.6,15.8 251870,52146,1024481,1,15.6,15.6,7.8 251935,52158,1024481,1,15.6,15.6,7.8 252415,52264,1024480,1,69.0,69.0,69.0 250983,51937,1024480,1,69.0,69.0,69.0 252609,52299,1024480,1,69.0,69.0,69.0 251689,52107,1024440,1,31.6,31.6,15.8 239369,49183,1024256,1,759.0,759.0,759.0 249222,51513,1024140,1,198.0,198.0,198.0
order_items3(item_id,order_id,goods_id,goods_number,shop_price,goods_price,goods_amount)
商品ID,商品狀態,分類ID,評分
1024600,6,52006,0
1024593,1,52121,0
1024592,1,52121,0
1024590,1,52119,0
1024589,1,52119,0
1024588,1,52030,0
1024587,1,52021,0
1024586,1,52029,0
1024585,1,52014,0
1024584,1,52029,0
goods3(goods_id,goods_status,cat_id,goods_score)

通過mapreduce統計goods_id相同的商品都在哪幾張表並統計出現了多少次:

package mapreduce9;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

//08.Mapreduce例項——倒排索引
public class MyIndex {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();
        job.setJobName("InversedIndexTest");
        job.setJarByClass(MyIndex.class);

        job.setMapperClass(doMapper.class);
        job.setCombinerClass(doCombiner.class);
        job.setReducerClass(doReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path in1 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/goods3");
        Path in2 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/goods_visit3");
        Path in3 = new Path("hdfs://192.168.51.100:8020/mymapreduce9/in/order_items3");
        Path out = new Path("hdfs://192.168.51.100:8020/mymapreduce9/out");

        FileInputFormat.addInputPath(job, in1);
        FileInputFormat.addInputPath(job, in2);
        FileInputFormat.addInputPath(job, in3);
        FileOutputFormat.setOutputPath(job, out);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class doMapper extends Mapper<Object, Text, Text, Text>{
        public static Text myKey = new Text();
        public static Text myValue = new Text();
        //private FileSplit filePath;

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
            if(filePath.contains("goods")){
                String val[]=value.toString().split(",");
                int splitIndex =filePath.indexOf("goods");
                myKey.set(val[0] + ":" + filePath.substring(splitIndex));
            }else if(filePath.contains("order")){
                String val[]=value.toString().split(",");
                int splitIndex =filePath.indexOf("order");
                myKey.set(val[2] + ":" + filePath.substring(splitIndex));
            }
            myValue.set("1");
            context.write(myKey, myValue);
        }
    }
    public static class doCombiner extends Reducer<Text, Text, Text, Text>{
        public static Text myK = new Text();
        public static Text myV = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0 ;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int mysplit = key.toString().indexOf(":");
            myK.set(key.toString().substring(0, mysplit));
            myV.set(key.toString().substring(mysplit + 1) + ":" + sum);
            context.write(myK, myV);
        }
    }

    public static class doReducer extends Reducer<Text, Text, Text, Text>{

        public static Text myK = new Text();
        public static Text myV = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            String myList = new String();

            for (Text value : values) {
                myList += value.toString() + ";";
            }
            myK.set(key);
            myV.set(myList);
            context.write(myK, myV);
        }
    }
}

結果:

原理:

"倒排索引"是文件檢索系統中最常用的資料結構,被廣泛地應用於全文搜尋引擎。它主要是用來儲存某個單詞(或片語)在一個文件或一組文件中的儲存位置的對映,即提供了一種根據內容來查詢文件的方式。由於不是根據文件來確定文件所包含的內容,而是進行相反的操作,因而稱為倒排索引(Inverted Index)。

實現"倒排索引"主要關注的資訊為:單詞、文件URL及詞頻。

(1)Map過程

首先使用預設的TextInputFormat類對輸入檔案進行處理,得到文字中每行的偏移量及其內容。顯然,Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個資訊:單詞、文件URL和詞頻,接著我們對讀入的資料利用Map操作進行預處理,如下圖所示:

這裡存在兩個問題:第一,<key,value>對只能有兩個值,在不使用Hadoop自定義資料型別的情況下,需要根據情況將其中兩個值合併成一個值,作為key或value值。第二,通過一個Reduce過程無法同時完成詞頻統計和生成文件列表,所以必須增加一個Combine過程完成詞頻統計。

這裡將商品ID和URL組成key值(如"1024600:goods3"),將詞頻(商品ID出現次數)作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文件的相同單詞的詞頻組成列表,傳遞給Combine過程,實現類似於WordCount的功能。

(2)Combine過程

經過map方法處理後,Combine過程將key值相同的value值累加,得到一個單詞在文件中的詞頻,如下圖所示。如果直接將下圖所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值無法保證這一點,所以必須修改key值和value值。這次將單詞(商品ID)作為key值,URL和詞頻組成value值(如"goods3:1")。這樣做的好處是可以利用MapReduce框架預設的HashPartitioner類完成Shuffle過程,將相同單詞的所有記錄傳送給同一個Reducer進行處理。

(3)Reduce過程

經過上述兩個過程後,Reduce過程只需將相同key值的所有value值組合成倒排索引檔案所需的格式即可,剩下的事情就可以直接交給MapReduce框架進行處理了。如下圖所示