ElasticSearch倒排索引原理揭祕——基於mapreduce實現自己的倒排索引
Elasticsearch簡單介紹
Elasticsearch (ES)是一個基於Lucene構建的開源、分散式、RESTful 介面全文搜尋引擎。Elasticsearch 還是一個分散式文件資料庫,其中每個欄位均是被索引的資料且可被搜尋,它能夠擴充套件至數以百計的伺服器儲存以及處理PB級的資料。它可以在很短的時間內在儲、搜尋和分析大量的資料。它通常作為具有複雜搜尋場景情況下的核心發動機。
Elasticsearch就是為高可用和可擴充套件而生的。可以通過購置效能更強的伺服器來完成
Elasticsearch優勢
1.橫向可擴充套件性:只需要增加臺伺服器,做一點兒配置,啟動一下Elasticsearch就可以併入叢集。 2.分片機制提供更好的分佈性:同一個索引分成多個分片(sharding), 這點類似於HDFS的塊機制;分而治之的方式可提升處理效率。 3.高可用:提供複製( replica) 機制,一個分片可以設定多個複製,使得某臺伺服器在宕機的情況下,叢集仍舊可以照常執行,並會把伺服器宕機丟失的資料資訊複製恢復到其他可用節點上。 4.使用簡單:共需一條命令就可以下載檔案,然後很快就能搭建一一個站內搜尋引擎。
Elasticsearch應用場景
大型分散式日誌分析系統ELK elasticsearch(儲存日誌)+logstash(收集日誌)+kibana(展示資料)
大型電商商品搜尋系統、網盤搜尋引擎等。
什麼是倒排索引
倒排表以字或詞為關鍵字進行索引,表中關鍵字所對應的記錄表項記錄了出現這個字或詞的所有文件,一個表項就是一個字表段,它記錄該文件的ID和字元在該文件中出現的位置情況。 由於每個字或詞對應的文件數量在動態變化,所以倒排表的建立和維護都較為複雜,但是在查詢的時候由於可以一次得到查詢關鍵字所對應的所有文件,所以效率高於正排表。在全文檢索中,檢索的快速響應是一個最為關鍵的效能,而索引建立由於在後臺進行,儘管效率相對低一些,但不會影響整個搜尋引擎的效率。
倒排索引案例分析
文件內容:
序號
文件內容
1
小俊是一家科技公司創始人,開的汽車是奧迪a8l,加速爽。
2
小薇是一家科技公司的前臺,開的汽車是保時捷911
3
小紅買了小薇的保時捷911,加速爽。
4
小明是一家科技公司開發主管,開的汽車是奧迪a6l,加速爽。
5
小軍是一家科技公司開發,開的汽車是比亞迪速銳,加速有點慢
倒排索引會對以上文件內容進行關鍵詞分詞,可以使用關鍵次直接定位到文件內容。
單詞ID
單詞
倒排列表docId
1
小
1,2,3,4,5
2
一家
1,2,4,5
3
科技公司
1,2,4,5
4
開發
4,5
5
汽車
1,2,4,5
6
奧迪
1,4
7
加速爽
1,3,4
8
保時捷
2,3
9
保時捷911
2
10
比亞迪
5
基於mapreduce純手寫打造自己的倒排索引
需求:有大量的文字文件,如下所示:
a.txt
hello tom
hello jim
hello kitty
hello rose
b.txt
hello jerry
hello jim
hello kitty
hello jack
c.txt
hello jerry
hello java
hello c++
hello c++
需要得到以下結果:
hello a.txt-->4 b.txt-->4 c.txt-->4
java c.txt-->1
jerry b.txt-->1 c.txt-->1
....
思路:
1、先寫一個mr程式:統計出每個單詞在每個檔案中的總次數
hello-a.txt 4
hello-b.txt 4
hello-c.txt 4
java-c.txt 1
jerry-b.txt 1
jerry-c.txt 1
要點1:map方法中,如何獲取所處理的這一行資料所在的檔名?
worker在呼叫map方法時,會傳入一個context,而context中包含了這個worker所讀取的資料切片資訊。而切片資訊又包含這個切片所在的檔案資訊,那麼就可以在map中:
FileSplit split=context.getInputSplit();
String fileName=split.getPath().getName();
要點二:setup方法
worker在正式處理資料之前,會先呼叫一次setup方法,所以,常利用這個機制來做一些初始化操作
2、然後在寫一個mr程式,讀取上述結果資料:
map: 根據-切,以單詞做key,後面一段作為value
reduce: 拼接values裡面的每一段,以單詞做key,拼接結果做value,輸出即可
程式碼實現
public class IndexStepOne {
public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
/**
* 產生: <單詞-檔名,1><單詞-檔名,1>
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/**
* 如果map task讀的是檔案:劃分範圍是:《檔案路徑,偏移量範圍》
* 如果map task讀的是資料庫的資料,劃分的任務範圍是:《庫名.表名,行範圍》
* 所以給抽象的getInputSplit
*/
//每個map task所處理的資料任務範圍
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String fileName = inputSplit.getPath().getName();
String[] words = value.toString().split(" ");
for(String w:words){
//單詞-檔名 1
context.write(new Text(w+"-"+fileName),new IntWritable(1));
}
}
}
public static class IndexStepOneReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count=0;
for(IntWritable value:values){
count+=value.get();
}
context.write(key,new IntWritable(count));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//動態獲取jar包在哪裡
job.setJarByClass(IndexStepOne.class);
//2.封裝引數:本次job所要呼叫的mapper實現類
job.setMapperClass(IndexStepOneMapper.class);
job.setReducerClass(IndexStepOneReduce.class);
//3.封裝引數:本次job的Mapper實現類產生的資料key,value的型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//4.封裝引數:本次Reduce返回的key,value資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.封裝引數:想要啟動的reduce task的數量
job.setNumReduceTasks(3);
FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\input"));
FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out1"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0:-1);
}
}
執行輸出
part-r-000000 part-r-000001 part-r-0000002
hello-c.txt 4
jack-b.txt 1
java-c.txt 1
jerry-b.txt 1
kitty-a.txt 1
rose-a.txt 1
c++-c.txt 2
hello-a.txt 4
jerry-c.txt 1
jim-a.txt 1
kitty-b.txt 1
tom-a.txt 1
hello-b.txt 4
jim-b.txt 1
public class IndexStepOne2 {
public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("-");
context.write(new Text(split[0]),
new Text(split[1].
replaceAll("\t","-->")));
}
}
public static class IndexStepOneReduce extends Reducer<Text,Text,Text,Text>{
//reduce階段對相同的key進行處理,相同key發給同一個reduce task處理
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//StringBuffer是執行緒安全的,StringBuild是執行緒不安全的
//這裡沒有多執行緒併發,用StringBuild更快
StringBuilder sb = new StringBuilder();
/**
* <hello a.txt-->4> <hello b.txt-->4> <hello c.txt-->4>
* <java c.txt-->1>
* <jetty b.txt-->1><jetty c.tex-->1>
*/
/**
* hello a.txt-->4 b.txt-->4 c.txt-->4
* java c.txt-->1
* jerry b.txt-->1 c.txt-->1
*/
for(Text value:values){
sb.append(value.toString()).append("\t");
}
context.write(key,new Text(sb.toString()));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//動態獲取jar包在哪裡
job.setJarByClass(IndexStepOne2.class);
//2.封裝引數:本次job所要呼叫的mapper實現類
job.setMapperClass(IndexStepOneMapper.class);
job.setReducerClass(IndexStepOneReduce.class);
//3.封裝引數:本次job的Mapper實現類產生的資料key,value的型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//4.封裝引數:本次Reduce返回的key,value資料型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//6.封裝引數:想要啟動的reduce task的數量
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\out1"));
FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out2"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0:-1);
}
}
執行輸出
c++ c.txt-->2
hello a.txt-->4 b.txt-->4 c.txt-->4
jack b.txt-->1
java c.txt-->1
jerry b.txt-->1 c.txt-->1
jim a.txt-->1 b.txt-->1
kitty b.txt-->1 a.txt-->1
rose a.txt-->1
tom a.txt-->1