1. 程式人生 > >ElasticSearch倒排索引原理揭祕——基於mapreduce實現自己的倒排索引

ElasticSearch倒排索引原理揭祕——基於mapreduce實現自己的倒排索引

Elasticsearch簡單介紹

Elasticsearch (ES)是一個基於Lucene構建的開源、分散式、RESTful 介面全文搜尋引擎。Elasticsearch 還是一個分散式文件資料庫,其中每個欄位均是被索引的資料且可被搜尋,它能夠擴充套件至數以百計的伺服器儲存以及處理PB級的資料。它可以在很短的時間內在儲、搜尋和分析大量的資料。它通常作為具有複雜搜尋場景情況下的核心發動機。
Elasticsearch就是為高可用和可擴充套件而生的。可以通過購置效能更強的伺服器來完成

Elasticsearch優勢

1.橫向可擴充套件性:只需要增加臺伺服器,做一點兒配置,啟動一下Elasticsearch就可以併入叢集。

2.分片機制提供更好的分佈性:同一個索引分成多個分片(sharding), 這點類似於HDFS的塊機制;分而治之的方式可提升處理效率。

3.高可用:提供複製( replica) 機制,一個分片可以設定多個複製,使得某臺伺服器在宕機的情況下,叢集仍舊可以照常執行,並會把伺服器宕機丟失的資料資訊複製恢復到其他可用節點上。
4.使用簡單:共需一條命令就可以下載檔案,然後很快就能搭建一一個站內搜尋引擎。

Elasticsearch應用場景

大型分散式日誌分析系統ELK  elasticsearch(儲存日誌)+logstash(收集日誌)+kibana(展示資料)

大型電商商品搜尋系統、網盤搜尋引擎等。

什麼是倒排索引

倒排表以字或詞為關鍵字進行索引,表中關鍵字所對應的記錄表項記錄了出現這個字或詞的所有文件,一個表項就是一個字表段,它記錄該文件的ID和字元在該文件中出現的位置情況。
由於每個字或詞對應的文件數量在動態變化,所以倒排表的建立和維護都較為複雜,但是在查詢的時候由於可以一次得到查詢關鍵字所對應的所有文件,所以效率高於正排表。在全文檢索中,檢索的快速響應是一個最為關鍵的效能,而索引建立由於在後臺進行,儘管效率相對低一些,但不會影響整個搜尋引擎的效率。

倒排索引案例分析

文件內容:

序號

文件內容

1

小俊是一家科技公司創始人,開的汽車是奧迪a8l,加速爽。

2

小薇是一家科技公司的前臺,開的汽車是保時捷911

3

小紅買了小薇的保時捷911,加速爽。

4

小明是一家科技公司開發主管,開的汽車是奧迪a6l,加速爽。

5

小軍是一家科技公司開發,開的汽車是比亞迪速銳,加速有點慢

倒排索引會對以上文件內容進行關鍵詞分詞,可以使用關鍵次直接定位到文件內容。

單詞ID

單詞

倒排列表docId

1

1,2,3,4,5

2

一家

1,2,4,5

3

科技公司

1,2,4,5

4

開發

4,5

5

汽車

1,2,4,5

6

奧迪

1,4

7

加速爽

1,3,4

8

保時捷

2,3

9

保時捷911

2

10

比亞迪

5

基於mapreduce純手寫打造自己的倒排索引

需求:有大量的文字文件,如下所示:
a.txt
hello tom
hello jim
hello kitty
hello rose

b.txt
hello jerry
hello jim
hello kitty
hello jack

c.txt
hello jerry
hello java
hello c++
hello c++

需要得到以下結果:
hello  a.txt-->4  b.txt-->4  c.txt-->4
java   c.txt-->1
jerry  b.txt-->1  c.txt-->1
....

思路:

1、先寫一個mr程式:統計出每個單詞在每個檔案中的總次數

hello-a.txt 4

hello-b.txt 4

hello-c.txt 4

java-c.txt 1

jerry-b.txt 1

jerry-c.txt 1

要點1:map方法中,如何獲取所處理的這一行資料所在的檔名?

worker在呼叫map方法時,會傳入一個context,而context中包含了這個worker所讀取的資料切片資訊。而切片資訊又包含這個切片所在的檔案資訊,那麼就可以在map中:

FileSplit split=context.getInputSplit();

String fileName=split.getPath().getName();

要點二:setup方法

worker在正式處理資料之前,會先呼叫一次setup方法,所以,常利用這個機制來做一些初始化操作

2、然後在寫一個mr程式,讀取上述結果資料:

map: 根據-切,以單詞做key,後面一段作為value

reduce: 拼接values裡面的每一段,以單詞做key,拼接結果做value,輸出即可

程式碼實現

public class IndexStepOne {
    public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        /**
         * 產生: <單詞-檔名,1><單詞-檔名,1>
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            /**
             * 如果map task讀的是檔案:劃分範圍是:《檔案路徑,偏移量範圍》
             * 如果map task讀的是資料庫的資料,劃分的任務範圍是:《庫名.表名,行範圍》
             * 所以給抽象的getInputSplit
             */
            //每個map task所處理的資料任務範圍
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();
            String[] words = value.toString().split(" ");
            for(String w:words){
                //單詞-檔名 1
                context.write(new Text(w+"-"+fileName),new IntWritable(1));
            }
        }
    }
    public static class IndexStepOneReduce extends Reducer<Text,IntWritable,Text,IntWritable>{

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count=0;
            for(IntWritable value:values){
                count+=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //動態獲取jar包在哪裡
        job.setJarByClass(IndexStepOne.class);
        //2.封裝引數:本次job所要呼叫的mapper實現類
        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReduce.class);
        //3.封裝引數:本次job的Mapper實現類產生的資料key,value的型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //4.封裝引數:本次Reduce返回的key,value資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //6.封裝引數:想要啟動的reduce task的數量
        job.setNumReduceTasks(3);
        FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\input"));
        FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out1"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0:-1);
    }
}

執行輸出

part-r-000000    part-r-000001    part-r-0000002

hello-c.txt	4
jack-b.txt	1
java-c.txt	1
jerry-b.txt	1
kitty-a.txt	1
rose-a.txt	1
c++-c.txt	2
hello-a.txt	4
jerry-c.txt	1
jim-a.txt	1
kitty-b.txt	1
tom-a.txt	1
hello-b.txt	4
jim-b.txt	1
public class IndexStepOne2 {
    public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,Text>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().split("-");
            context.write(new Text(split[0]),
                    new Text(split[1].
                            replaceAll("\t","-->")));
        }
    }
    public static class IndexStepOneReduce extends Reducer<Text,Text,Text,Text>{
       //reduce階段對相同的key進行處理,相同key發給同一個reduce task處理
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //StringBuffer是執行緒安全的,StringBuild是執行緒不安全的
            //這裡沒有多執行緒併發,用StringBuild更快
            StringBuilder sb = new StringBuilder();
            /**
             * <hello a.txt-->4> <hello b.txt-->4> <hello c.txt-->4>
             *  <java c.txt-->1>
             *  <jetty b.txt-->1><jetty c.tex-->1>
             */
            /**
             * hello  a.txt-->4  b.txt-->4  c.txt-->4
             * java   c.txt-->1
             * jerry  b.txt-->1  c.txt-->1
             */
            for(Text value:values){
                sb.append(value.toString()).append("\t");
            }
            context.write(key,new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //動態獲取jar包在哪裡
        job.setJarByClass(IndexStepOne2.class);
        //2.封裝引數:本次job所要呼叫的mapper實現類
        job.setMapperClass(IndexStepOneMapper.class);
        job.setReducerClass(IndexStepOneReduce.class);
        //3.封裝引數:本次job的Mapper實現類產生的資料key,value的型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //4.封裝引數:本次Reduce返回的key,value資料型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //6.封裝引數:想要啟動的reduce task的數量
        job.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\out1"));
        FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out2"));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0:-1);
    }
}

執行輸出

c++	c.txt-->2	
hello	a.txt-->4	b.txt-->4	c.txt-->4	
jack	b.txt-->1	
java	c.txt-->1	
jerry	b.txt-->1	c.txt-->1	
jim	a.txt-->1	b.txt-->1	
kitty	b.txt-->1	a.txt-->1	
rose	a.txt-->1	
tom	a.txt-->1	

版權@須臾之餘https://my.oschina.net/u/399512