1. 程式人生 > >Z_Y_J_1_2_3的專欄

Z_Y_J_1_2_3的專欄

這一篇,我們會介紹Elasticsearch查詢流程。
我們先從Elasticsearch的兩個身份:NoSQL和Search領域的查詢區別說起。

讀操作

對於搜尋而言是近實時的,延遲在100ms以上,對於NoSQL則需要是實時的。
一致性指的是寫入成功後,那麼下次讀一定要能讀取到最新的資料。對於搜尋,這個要求會低一些,可以有一些延遲。但是對於NoSQL資料庫,則一般要求最好是強一致性的。
結果匹配上,NoSQL作為資料庫,查詢過程中只有符合不符合兩種情況,而搜尋裡面還有是否相關,類似於NoSQL的結果只能是0或1,而搜尋裡面可能會有0.1,0.5,0.9等部分匹配或者更相關的情況。
結果召回上,搜尋一般只需要召回最滿足條件的Top N結果即可,而NoSQL一般都需要返回滿足條件的所有結果。
搜尋系統一般都是兩階段查詢,第一個階段查詢到對應的Doc ID,也就是PK;第二階段再通過Doc ID去查詢完整文件,而NoSQL資料庫一般是一階段就返回結果。而在Elasticsearch中兩種都支援。
目前NoSQL的查詢,聚合、分析和統計等功能上都是要比搜尋弱的。

Lucene的讀

Elasticsearch使用了Lucene作為搜尋引擎庫,通過Lucene完成特定欄位的搜尋等功能,在Lucene中這個功能是通過IndexSearcher的下列介面實現的:

public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)

第一個search介面實現搜尋功能,返回最滿足Query的N個結果;第二個doc介面實現通過doc id查詢Doc內容的功能;第三個count介面實現通過Query獲取到命中數的功能。

這三個功能是搜尋中的最基本的三個功能點,對於大部分Elasticsearch中的查詢都是比較複雜的,直接用這個介面是無法滿足需求的,比如分散式問題。這些問題都留給了Elasticsearch解決,我們接下來看Elasticsearch中相關讀功能的剖析。

search1.png | center

在上圖中,該Shard有1個Primary和2個Replica Node,當查詢的時候,從三個節點中根據Request中的preference引數選擇一個節點查詢。preference可以設定local,primary,_replica以及其他選項。如果選擇了primary,則每次查詢都是直接查詢Primary,可以保證每次查詢都是最新的。如果設定了其他引數,那麼可能會查詢到R1或者R2,這時候就有可能查詢不到最新的資料。
上述程式碼邏輯在OperationRouting.Java的searchShards方法中。

接下來看一下,Elasticsearch中的查詢是如何支援分散式的。

Elasticsearch中通過分割槽實現分散式,資料寫入的時候根據_routing規則將資料寫入某一個Shard中,這樣就能將海量資料分佈在多個Shard以及多臺機器上,已達到分散式的目標。這樣就導致了,查詢的時候,潛在資料會在當前index的所有的Shard中,所以Elasticsearch查詢的時候需要查詢所有Shard,同一個Shard的Primary和Replica選擇一個即可,查詢請求會分發給所有Shard,每個Shard中都是一個獨立的查詢引擎,比如需要返回Top 10的結果,那麼每個Shard就會查詢並且返回Top 10的結果,然後在Client Node裡面會接收所有Shard的結果,然後通過優先順序佇列二次排序,選擇出Top 10的結果,返回給使用者。
search2.png | center

這裡有一個問題就是請求膨脹,使用者的一個搜尋請求在Elasticsearch內部會變成Shard個請求,這裡有個優化點是,雖然是Shard個請求,但是這個Shard個數不一定要是當前Index中的Shard個數,只要是當前查詢相關的Shard即可,通過這種方式可以優化請求膨脹數。

Elasticsearch中的查詢主要分為兩類,Get請求:通過ID查詢特定Doc;Search請求:通過Query查詢匹配Doc。

search3.png | center

上圖中記憶體中的Segment是指剛Refresh Segment,但是還沒持久化到磁碟的新Segment,而非從磁碟載入到記憶體中的Segment。

對於Search類請求,查詢的時候是一起查詢記憶體和磁碟上的Segment,最後將結果合併後返回。這種查詢是近實時(Near Real Time)的。

對於Get類請求,查詢的時候是先查詢記憶體中的TransLog,如果找到就立即返回,如果沒找到再查詢磁碟上的TransLog,如果還沒有則再去查詢磁碟上的Segment。這種查詢是實時(Real Time)的。這種查詢順序可以保證查詢到的Doc是最新版本的Doc,這個功能也是為了保證NoSQL場景下的實時性要求。

多階段查詢 | center

所有的搜尋系統一般都是兩階段查詢,第一階段查詢到匹配的DocID,第二階段再查詢DocID對應的完整文件,這種在Elasticsearch中稱為query_then_fetch,還有一種是一階段查詢的時候就返回完整Doc,在Elasticsearch中稱作query_and_fetch,一般第二種適用於只需要查詢一個Shard的請求。

除了一階段,兩階段外,還有一種三階段查詢的情況。搜尋裡面有一種算分邏輯是根據TF(Term Frequency)和DF(Document Frequency),但是Elasticsearch中查詢的時候,是在每個Shard中獨立查詢的,每個Shard中的TF和DF也是獨立的,雖然在寫入的時候通過_routing保證Doc分佈均勻,但是沒法保證TF和DF均勻,那麼就有會導致區域性的TF和DF不準的情況出現,這個時候基於TF、DF的算分就不準。為了解決這個問題,Elasticsearch中引入了DFS查詢,比如DFS_query_then_fetch,會先收集所有Shard中的TF和DF值,然後將這些值帶入請求中,再次執行query_then_fetch,這樣算分的時候TF和DF就是準確的,類似的有DFS_query_and_fetch。這種查詢的優勢是算分更加精準,但是效率會變差。另一種選擇是用BM25代替TF/DF模型。

Elasticsearch查詢流程

Elasticsearch中的大部分查詢,以及核心功能都是Search型別查詢,上面我們瞭解到查詢分為一階段,二階段和三階段,這裡我們就以最常見的的二階段查詢為例來介紹查詢流程。

查詢流程 | center

註冊Action

Elasticsearch中,查詢和寫操作一樣都是在ActionModule.java中註冊入口處理函式的。

registerHandler.accept(new RestSearchAction(settings, restController));
......
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
......

如果請求是Rest請求,則會在RestSearchAction中解析請求,檢查查詢型別,不能設定為dfs_query_and_fetch或者query_and_fetch,這兩個目前只能用於Elasticsearch中的優化場景,然後發給後面的TransportSearchAction.class處理。然後構造SearchRequest,將請求傳送給TransportSearchAction中處理。

search5.png | center
search6.png | center

如果是第一階段的Query Phase請求,則會呼叫SearchService的executeQueryPhase方法

如果是第二階段的Fetch Phase請求,則會呼叫SearchService的executeFetchPhase方法。

Client Node

Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。

1. Get Remove Cluster Shard

判斷是否需要跨叢集訪問,如果需要,則獲取到要訪問的Shard列表。

2. Get Search Shard Iterator

獲取當前Cluster中要訪問的Shard,和上一步中的Remove Cluster Shard合併,構建出最終要訪問的完整Shard列表。
這一步中,會在Primary Node和多個Replica Node中選擇出一個要訪問的Shard。

3. For Every Shard:Perform

遍歷每個Shard,對每個Shard執行後面邏輯。

4. Send Request To Query Shard

將查詢階段請求傳送給相應的Shard。

5. Merge Docs

上一步將請求傳送給多個Shard後,這一步就是非同步等待返回結果,然後對結果合併。這裡的合併策略是維護一個Top N大小的優先順序佇列,每當收到一個shard的返回,就把結果放入優先順序佇列做一次排序,直到所有的Shard都返回。
翻頁邏輯也是在這裡,如果需要取Top 30~ Top 40的結果,這個的意思是所有Shard查詢結果中的第30到40的結果,那麼在每個Shard中無法確定最終的結果,每個Shard需要返回Top 40的結果給Client Node,然後Client Node中在merge docs的時候,計算出Top 40的結果,最後再去除掉Top 30,剩餘的10個結果就是需要的Top 30~ Top 40的結果。
上述翻頁邏輯有一個明顯的缺點就是每次Shard返回的資料中包括了已經翻過的歷史結果,如果翻頁很深,則在這裡需要排序的Docs會很多,比如Shard有1000,取第9990到10000的結果,那麼這次查詢,Shard總共需要返回1000 * 10000,也就是一千萬Doc,這種情況很容易導致OOM。
另一種翻頁方式是使用search_after,這種方式會更輕量級,如果每次只需要返回10條結構,則每個Shard只需要返回search_after之後的10個結果即可,返回的總資料量只是和Shard個數以及本次需要的個數有關,和歷史已讀取的個數無關。這種方式更安全一些,推薦使用這種。
如果有aggregate,也會在這裡做聚合,但是不同的aggregate型別的merge策略不一樣,具體的可以在後面的aggregate文章中再介紹。

6. Send Request To Fetch Shard

選出Top N個Doc ID後傳送Fetch Shard給這些Doc ID所在的Shard,最後會返回Top N的Doc的內容。

Query Phase

接下來我們看第一階段查詢的步驟:

1. Create Search Context

建立Search Context,之後Search過程中的所有中間狀態都會存在Context中,這些狀態總共有50多個,具體可以檢視DefaultSearchContext或者其他SearchContext的子類。

2. Parse Query

解析Query的Source,將結果存入Search Context。這裡會根據請求中Query型別的不同建立不同的Query物件,比如TermQuery、FuzzyQuery等。

這裡包括了dfsPhase、queryPhase和fetchPhase三個階段的preProcess部分,只有queryPhase的preProcess中有執行邏輯,其他兩個都是空邏輯,執行完preProcess後,所有需要的引數都會設定完成。

由於Elasticsearch中有些請求之間是相互關聯的,並非獨立的,比如scroll請求,所以這裡同時會設定Context的生命週期。

同時會設定lowLevelCancellation是否開啟,這個引數是叢集級別配置,同時也能動態開關,開啟後會在後面執行時做更多的檢測,檢測是否需要停止後續邏輯直接返回。

3. Get From Cache

判斷請求是否允許被Cache,如果允許,則檢查Cache中是否已經有結果,如果有則直接讀取Cache,如果沒有則繼續執行後續步驟,執行完後,再將結果加入Cache。

4. Add Collectors

Collector主要目標是收集查詢結果,實現排序,自定義結果集過濾和收集等。這一步會增加多個Collectors,多個Collector組成一個List。

  1. FilteredCollector:先判斷請求中是否有Post Filter,Post Filter用於Search,Agg等結束後再次對結果做Filter,希望Filter不影響Agg結果。如果有Post Filter則建立一個FilteredCollector,加入Collector List中。
  2. PluginInMultiCollector:判斷請求中是否制定了自定義的一些Collector,如果有,則加入Collector List。
  3. MinimumScoreCollector:判斷請求中是否制定了最小分數閾值,如果指定了,則建立MinimumScoreCollector加入Collector List中,在後續收集結果時,會過濾掉得分小於最小分數的Doc。
  4. EarlyTerminatingCollector:判斷請求中是否提前結束Doc的Seek,如果是則建立EarlyTerminatingCollector,加入Collector List中。在後續Seek和收集Doc的過程中,當Seek的Doc數達到Early Terminating後會停止Seek後續倒排鏈。
  5. CancellableCollector:判斷當前操作是否可以被中斷結束,比如是否已經超時等,如果是會丟擲一個TaskCancelledException異常。該功能一般用來提前結束較長的查詢請求,可以用來保護系統。

    1.EarlyTerminatingSortingCollector:如果Index是排序的,那麼可以提前結束對倒排鏈的Seek,相當於在一個排序遞減連結串列上返回最大的N個值,只需要直接返回前N個值就可以了。這個Collector會加到Collector List的頭部。EarlyTerminatingSorting和EarlyTerminating的區別是,EarlyTerminatingSorting是一種對結果無損傷的優化,而EarlyTerminating是有損的,人為掐斷執行的優化。
  6. TopDocsCollector:這個是最核心的Top N結果選擇器,會加入到Collector List的頭部。TopScoreDocCollector和TopFieldCollector都是TopDocsCollector的子類,TopScoreDocCollector會按照固定的方式算分,排序會按照分數+doc id的方式排列,如果多個doc的分數一樣,先選擇doc id小的文件。而TopFieldCollector則是根據使用者指定的Field的值排序。

5. lucene::search

這一步會呼叫Lucene中IndexSearch的search介面,執行真正的搜尋邏輯。每個Shard中會有多個Segment,每個Segment對應一個LeafReaderContext,這裡會遍歷每個Segment,到每個Segment中去Search結果,然後計算分數。

搜尋裡面一般有兩階段算分,第一階段是在這裡算的,會對每個Seek到的Doc都計算分數,一般是算一個基本分數。這一階段完成後,會有個排序。然後再第二階段,再對Top 的結果做一次二階段算分,在二階段算分的時候會考慮更多的因子。二階段算分在下一步中。

Lucene中詳細的查詢流程,後面會有專門文章介紹。

6. rescore

根據Request中是否包含rescore配置決定是否進行二階段排序,如果有則執行二階段算分邏輯,會考慮更多的算分因子。二階段算分也是一種計算機中常見的多層設計,是一種資源消耗和效率的折中。
Elasticsearch中支援配置多個Rescore,這些rescore邏輯會順序遍歷執行。每個rescore內部會先按照請求引數window選擇出Top window的doc,然後對這些doc排序,排完後再合併回原有的Top 結果順序中。

7. suggest::execute()

如果有推薦請求,則在這裡執行推薦請求。如果請求中只包含了推薦的部分,則很多地方可以優化。推薦不是今天的重點,這裡就不介紹了,後面有機會再介紹。

8. aggregation::execute()

如果含有聚合統計請求,則在這裡執行。Elasticsearch中的aggregate的處理邏輯也類似於Search,通過多個Collector來實現。在Client  Node中也需要對aggregation做合併。aggregate邏輯更復雜一些,就不在這裡贅述了,後面有需要就再單獨開文章介紹。
上述邏輯都執行完成後,如果當前查詢請求只需要查詢一個Shard,那麼會直接在當前Node執行Fetch Phase。

Fetch Phase

Elasticsearch作為搜尋系統時,或者任何搜尋系統中,除了Query階段外,還會有一個Fetch階段,這個Fetch階段在資料庫類系統中是沒有的,是搜尋系統中額外增加的階段。搜尋系統中額外增加Fetch階段的原因是搜尋系統中資料分佈導致的,在搜尋中,資料通過routing分Shard的時候,只能根據一個主欄位值來決定,但是查詢的時候可能會根據其他非主欄位查詢,那麼這個時候所有Shard中都可能會存在相同非主欄位值的Doc,所以需要查詢所有Shard才能不會出現結果遺漏。同時如果查詢主欄位,那麼這個時候就能直接定位到Shard,就只需要查詢特定Shard即可,這個時候就類似於資料庫系統了。另外,資料庫中的二級索引又是另外一種情況,但類似於查主欄位的情況,這裡就不多說了。
基於上述原因,第一階段查詢的時候並不知道最終結果會在哪個Shard上,所以每個Shard中管都需要查詢完整結果,比如需要Top 10,那麼每個Shard都需要查詢當前Shard的所有資料,找出當前Shard的Top 10,然後返回給Client Node。如果有100個Shard,那麼就需要返回100 * 10 = 1000個結果,而Fetch Doc內容的操作比較耗費IO和CPU,如果在第一階段就Fetch Doc,那麼這個資源開銷就會非常大。所以,一般是當Client Node選擇出最終Top N的結果後,再對最終的Top N讀取Doc內容。通過增加一點網路開銷而避免大量IO和CPU操作,這個折中是非常划算的。
Fetch階段的目的是通過DocID獲取到使用者需要的完整Doc內容。這些內容包括了DocValues,Store,Source,Script和Highlight等,具體的功能點是在SearchModule中註冊的,系統預設註冊的有:

  • ExplainFetchSubPhase
  • DocValueFieldsFetchSubPhase
  • ScriptFieldsFetchSubPhase
  • FetchSourceSubPhase
  • VersionFetchSubPhase
  • MatchedQueriesFetchSubPhase
  • HighlightPhase
  • ParentFieldSubFetchPhase

除了系統預設的8種外,還有通過外掛的形式註冊自定義的功能,這些SubPhase中最重要的是Source和Highlight,Source是載入原文,Highlight是計算高亮顯示的內容片斷。
上述多個SubPhase會針對每個Doc順序執行,可能會產生多次的隨機IO,這裡會有一些優化方案,但是都是針對特定場景的,不具有通用性。
Fetch Phase執行完後,整個查詢流程就結束了。