1. 程式人生 > >ES讀寫操作詳解

ES讀寫操作詳解

目前的Elasticsearch有兩個明顯的身份,一個是分散式搜尋系統,另一個是分散式NoSQL資料庫,對於這兩種不同的身份,讀寫語義基本類似,但也有一點差異。

werite1

寫操作

  • 實時性:

    • 搜尋系統的Index一般都是NRT(Near Real Time),近實時的,比如Elasticsearch中,Index的實時性是由refresh控制的,預設是1s,最快可到100ms,那麼也就意味著Index doc成功後,需要等待一秒鐘後才可以被搜尋到。
    • NoSQL資料庫的Write基本都是RT(Real Time),實時的,寫入成功後,立即是可見的。Elasticsearch中的Index請求也能保證是實時的,因為Get請求會直接讀記憶體中尚未Flush到儲存介質的TransLog。
  • 可靠性:

    • 搜尋系統對可靠性要求都不高,一般資料的可靠性通過將原始資料儲存在另一個儲存系統來保證,當搜尋系統的資料發生丟失時,再從其他儲存系統導一份資料過來重新rebuild就可以了。在Elasticsearch中,通過設定TransLog的Flush頻率可以控制可靠性,要麼是按請求,每次請求都Flush;要麼是按時間,每隔一段時間Flush一次。一般為了效能考慮,會設定為每隔5秒或者1分鐘Flush一次,Flush間隔時間越長,可靠性就會越低。
    • NoSQL資料庫作為一款資料庫,必須要有很高的可靠性,資料可靠性是生命底線,決不能有閃失。如果把Elasticsearch當做NoSQL資料庫,此時需要設定TransLog的Flush策略為每個請求都要Flush,這樣才能保證當前Shard寫入成功後,資料能儘量持久化下來。

上面簡單介紹了下NoSQL資料庫和搜尋系統的一些異同,我們會在後面有一篇文章,專門用來介紹Elasticsearch作為NoSQL資料庫時的一些侷限和特點。

Lucene資料模型

Lucene中包含了四種基本資料型別,分別是:

  • Index:索引,由很多的Document組成。
  • Document:由很多的Field組成,是Index和Search的最小單位。
  • Field:由很多的Term組成,包括Field Name和Field Value。
  • Term:由很多的位元組組成,可以分詞。
    上述四種類型在Elasticsearch中同樣存在,意思也一樣。

Lucene中儲存的索引主要分為三種類型:

  • Invert Index:倒排索引,或者簡稱Index,通過Term可以查詢到擁有該Term的文件。可以配置為是否分詞,如果分詞可以配置不同的分詞器。索引儲存的時候有多種儲存型別,分別是:

    • DOCS:只儲存DocID。
    • DOCS_AND_FREQS:儲存DocID和詞頻(Term Freq)。
    • DOCS_AND_FREQS_AND_POSITIONS:儲存DocID、詞頻(Term Freq)和位置。
    • DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS:儲存DocID、詞頻(Term Freq)、位置和偏移。
  • DocValues:正排索引,採用列式儲存。通過DocID可以快速讀取到該Doc的特定欄位的值。由於是列式儲存,效能會比較好。一般用於sort,agg等需要高頻讀取Doc欄位值的場景。
  • Store:欄位原始內容儲存,同一篇文章的多個Field的Store會儲存在一起,適用於一次讀取少量且多個欄位記憶體的場景,比如摘要等。

Lucene中提供索引和搜尋的最小組織形式是Segment,Segment中按照索引型別不同,分成了Invert Index,Doc Values和Store這三大類(還有一些輔助類,這裡省略),每一類裡面都是按照Doc為最小單位儲存。Invert Index中儲存的Key是Term,Value是Doc ID的連結串列;Doc Value中Key 是Doc ID和Field Name,Value是Field Value;Store的Key是Doc ID,Value是Filed Name和Filed Value。

由於Lucene中沒有主鍵概念和更新邏輯,所有對Lucene的更新都是Append一個新Doc,類似於一個只能Append的佇列,所有Doc都被同等對等,同樣的處理方式。其中的Doc由眾多Field組成,沒有特殊Field,每個Field也都被同等對待,同樣的處理方式。

從上面介紹來看,Lucene只是提供了一個索引和查詢的最基本的功能,距離一個完全可用的完整搜尋引擎還有一些距離:

Lucene的不足

  • Lucene是一個單機的搜尋庫,如何能以分散式形式支援海量資料?
    Lucene中沒有更新,每次都是Append一個新文件,如何做部分欄位的更新?
  • Lucene中沒有主鍵索引,如何處理同一個Doc的多次寫入?
  • 在稀疏列資料中,如何判斷某些文件是否存在特定欄位?
  • Lucene中生成完整Segment後,該Segment就不能再被更改,此時該Segment才能被搜尋,這種情況下,如何做實時搜尋?

上述幾個問題,對於搜尋而言都是至關重要的功能訴求,我們接下來看看Elasticsearch中是如何來解這些問題的。

Elasticsearch怎麼做

在Elasticsearch中,為了支援分散式,增加了一個系統欄位_routing(路由),通過_routing將Doc分發到不同的Shard,不同的Shard可以位於不同的機器上,這樣就能實現簡單的分散式了。

採用類似的方式,Elasticsearch增加了_id、_version、_source和_seq_no等等多個系統欄位,通過這些Elasticsearch中特有的系統欄位可以有效解決上述的幾個問題,新增的系統欄位主要是下列幾個:

es333

下面我們逐個欄位的剖析下上述系統欄位的作用,先來看第一個_id欄位:

1. _id

Doc的主鍵,在寫入的時候,可以指定該Doc的ID值,如果不指定,則系統自動生成一個唯一的UUID值。

Lucene中沒有主鍵索引,要保證系統中同一個Doc不會重複,Elasticsearch引入了_id欄位來實現主鍵。每次寫入的時候都會先查詢id,如果有,則說明已經有相同Doc存在了。

通過_id值(ES內部轉換成_uid)可以唯一在Elasticsearch中確定一個Doc。

Elasticsearch中,_id只是一個使用者級別的虛擬欄位,在Elasticsearch中並不會對映到Lucene中,所以也就不會儲存該欄位的值。

_id的值可以由_uid解析而來(_uid =type + '#' + id),Elasticsearch中會儲存_uid。

2. _uid

_uid的格式是:type + '#' + id。

_uid會儲存在Lucene中,在Lucene中的對映關係如下:dex下可能存在多個id值相同的Doc,而6.0.0之後只支援單Type,同Index下id值是唯一的。

uid會儲存在Lucene中,在Lucene中的對映關係如下:
8
_uid 只是儲存了倒排Index和原文store:倒排Index的目的是可以通過_id快速查詢到文件;原文store用來在返回的Response裡面填充完整的_id值。

在Lucene中儲存_uid,而不是_id的原因是,在6.0.0之前版本里面,_uid可以比_id表示更多的資訊,比如Type。在6.0.0版本之後,同一個Index只能有一個Type,這時候Type就沒多大意義了,後面Type應該會消失,那時候_id就會和_uid概念一樣,到時候兩者會合二為一,也能簡化大家的理解。

3. _version

Elasticsearch中每個Doc都會有一個Version,該Version可以由使用者指定,也可以由系統自動生成。如果是系統自動生成,那麼每次Version都是遞增1。

_version是實時的,不受搜尋的近實時性影響,原因是可以通過_uid從記憶體中versionMap或者TransLog中讀取到。

Version在Lucene中也是對映為一個特殊的Field存在。
version
Elasticsearch中Version欄位的主要目的是通過doc_id讀取Version,所以Version只要儲存為DocValues就可以了,類似於KeyValue儲存。

Elasticsearch通過使用version來保證對文件的變更能以正確的順序執行,避免亂序造成的資料丟失:

  • 首次寫入Doc的時候,會為Doc分配一個初始的Version:V0,該值根據VersionType不同而不同。
  • 再次寫入Doc的時候,如果Request中沒有指定Version,則會先加鎖,然後去讀取該Doc的最大版本V1,然後將V1+1後的新版本寫入Lucene中。
  • 再次寫入Doc的時候,如果Request中指定了Version:V1,則繼續會先加鎖,然後去讀該Doc的最大版本V2,判斷V1==V2,如果不相等,則發生版本衝突。否則版本吻合,繼續寫入Lucene。
  • 當做部分更新的時候,會先通過GetRequest讀取當前id的完整Doc和V1,接著和當前Request中的Doc合併為一個完整Doc。然後執行一些邏輯後,加鎖,再次讀取該Doc的最大版本號V2,判斷V1==V2,如果不相等,則在剛才執行其他邏輯時被其他執行緒更改了當前文件,需要報錯後重試。如果相等,則期間沒有其他執行緒修改當前文件,繼續寫入Lucene中。這個過程就是一個典型的read-then-update事務。

4. _source

Elasticsearch中有一個重要的概念是source,儲存原始文件,也可以通過過濾設定只儲存特定Field。

Source在Lucene中也是對映為了一個特殊的Field存在:
store
Elasticsearch中_source欄位的主要目的是通過doc_id讀取該文件的原始內容,所以只需要儲存Store即可。

_source其實是名為_source的虛擬Store Field。

Elasticsearch中使用_source欄位可以實現以下功能:

  • Update:部分更新時,需要從讀取文件儲存在_source欄位中的原文,然後和請求中的部分欄位合併為一個完整文件。如果沒有_source,則不能完成部分欄位的Update操作。
  • Rebuild:最新的版本中新增了rebuild介面,可以通過Rebuild API完成索引重建,過程中不需要從其他系統匯入全量資料,而是從當前文件的_source中讀取。如果沒有_source,則不能使用Rebuild API。
  • Script:不管是Index還是Search的Script,都可能用到儲存在Store中的原始內容,如果禁用了_source,則這部分功能不再可用。
  • Summary:摘要資訊也是來源於_source欄位。

5. _seq_no

嚴格遞增的順序號,每個文件一個,Shard級別嚴格遞增,保證後寫入的Doc的_seq_no大於先寫入的Doc的_seq_no。

任何型別的寫操作,包括index、create、update和Delete,都會生成一個_seq_no。

_seq_no在Primary Node中由SequenceNumbersService生成,但其實真正產生這個值的是LocalCheckpointTracker,每次遞增1:

/**
     * The next available sequence number.
     */
    private volatile long nextSeqNo;
    /**
     * Issue the next sequence number.
     *
     * @return the next assigned sequence number
     */
    synchronized long generateSeqNo() {
        return nextSeqNo++;
    }

每個文件在使用Lucene的document操作介面之前,會獲取到一個_seq_no,這個_seq_no會以系統保留Field的名義儲存到Lucene中,文件寫入Lucene成功後,會標記該seq_no為完成狀態,這時候會使用當前seq_no更新local_checkpoint。

checkpoint分為local_checkpoint和global_checkpoint,主要是用於保證有序性,以及減少Shard恢復時資料拷貝的資料拷貝量,更詳細的介紹可以看這篇文章:Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

_seq_no在Lucene中的對映:
seq_no

Elasticsearch中_seq_no的作用有兩個,一是通過doc_id查詢到該文件的seq_no,二是通過seq_no範圍查詢相關文件,所以也就需要儲存為Index和DocValues(或者Store)。由於是在衝突檢測時才需要讀取文件的_seq_no,而且此時只需要讀取_seq_no,不需要其他欄位,這時候儲存為列式儲存的DocValues比Store在效能上更好一些。

_seq_no是嚴格遞增的,寫入Lucene的順序也是遞增的,所以DocValues儲存型別可以設定為Sorted。

另外,_seq_no的索引應該僅需要支援儲存DocId就可以了,不需要FREQS、POSITIONS和分詞。如果多儲存了這些,對功能也沒影響,就是多佔了一點資源而已。

6. _primary_term

_primary_term也和_seq_no一樣是一個整數,每當Primary Shard發生重新分配時,比如重啟,Primary選舉等,_primary_term會遞增1。

_primary_term主要是用來恢復資料時處理當多個文件的_seq_no一樣時的衝突,避免Primary Shard上的寫入被覆蓋。
primary_term
Elasticsearch中_primary_term只需要通過doc_id讀取到即可,所以只需要儲存為DocValues就可以了.

7. _routing

路由規則,寫入和查詢的routing需要一致,否則會出現寫入的文件沒法被查到情況。

在mapping中,或者Request中可以指定按某個欄位路由。預設是按照_Id值路由。

_routing在Lucene中對映為:
routing

Elasticsearch中文件級別的_routing主要有兩個目的,一是可以查詢到使用某種_routing的文件有哪些,當發生_routing變化時,可以對歷史_routing的文件重新讀取再Index,這個需要倒排Index。另一個是查詢到文件後,在Response裡面展示該文件使用的_routing規則,這裡需要儲存為Store。

8. _field_names

該欄位會索引某個Field的名稱,用來判斷某個Doc中是否存在某個Field,用於exists或者missing請求。

_field_names在Lucene中的對映:

_field_names

Elasticsearch中_field_names的目的是查詢哪些Doc的這個Field是否存在,所以只需要倒排Index即可。

 

讀操作

下一篇《Elasticsearch 查詢流程簡介》中再詳細介紹。

上面大概對比了下搜尋和NoSQL在寫方面的特點,接下來,我們看一下Elasticsearch 6.0.0版本中寫入流程都做了哪些事情,希望能對大家有用。

寫操作的關鍵點

在考慮或分析一個分散式系統的寫操作時,一般需要從下面幾個方面考慮:

  • 可靠性:或者是永續性,資料寫入系統成功後,資料不會被回滾或丟失。
  • 一致性:資料寫入成功後,再次查詢時必須能保證讀取到最新版本的資料,不能讀取到舊資料。
  • 原子性:一個寫入或者更新操作,要麼完全成功,要麼完全失敗,不允許出現中間狀態。
  • 隔離性:多個寫入操作相互不影響。
  • 實時性:寫入後是否可以立即被查詢到。
  • 效能:寫入效能,吞吐量到底怎麼樣。
    Elasticsearch作為分散式系統,也需要在寫入的時候滿足上述的四個特點,我們在後面的寫流程介紹中會涉及到上述四個方面。

接下來,我們一層一層剖析Elasticsearch內部的寫機制。

Lucene的寫

眾所周知,Elasticsearch內部使用了Lucene完成索引建立和搜尋功能,Lucene中寫操作主要是通過IndexWriter類實現,IndexWriter提供三個介面:

 public long addDocument();
 public long updateDocuments();
 public long deleteDocuments();

通過這三個介面可以完成單個文件的寫入,更新和刪除功能,包括了分詞,倒排建立,正排建立等等所有搜尋相關的流程。只要Doc通過IndesWriter寫入後,後面就可以通過IndexSearcher搜尋了,看起來功能已經完善了,但是仍然有一些問題沒有解:

上述操作是單機的,而不是我們需要的分散式。

  • 文件寫入Lucene後並不是立即可查詢的,需要生成完整的Segment後才可被搜尋,如何保證實時性?
  • Lucene生成的Segment是在記憶體中,如果機器宕機或掉電後,記憶體中的Segment會丟失,如何保證資料可靠性 ?
  • Lucene不支援部分文件更新,但是這又是一個強需求,如何支援部分更新?
  • 上述問題,在Lucene中是沒有解決的,那麼就需要Elasticsearch中解決上述問題。

Elasticsearch在解決上述問題時,除了我們在上一篇《Elasticsearch資料模型簡介》中介紹的幾種系統欄位外,在引擎架構上也引入了多重機制來解決問題。我們再來看Elasticsearch中的寫機制。

Elasticsearch的寫

Elasticsearch採用多Shard方式,通過配置routing規則將資料分成多個數據子集,每個資料子集提供獨立的索引和搜尋功能。當寫入文件的時候,根據routing規則,將文件傳送給特定Shard中建立索引。這樣就能實現分散式了。

此外,Elasticsearch整體架構上採用了一主多副的方式:
Elasticsearch一主多副
每個Index由多個Shard組成,每個Shard有一個主節點和多個副本節點,副本個數可配。但每次寫入的時候,寫入請求會先根據_routing規則選擇發給哪個Shard,Index Request中可以設定使用哪個Filed的值作為路由引數,如果沒有設定,則使用Mapping中的配置,如果mapping中也沒有配置,則使用_id作為路由引數,然後通過_routing的Hash值選擇出Shard(在OperationRouting類中),最後從叢集的Meta中找出出該Shard的Primary節點。

請求接著會發送給Primary Shard,在Primary Shard上執行成功後,再從Primary Shard上將請求同時傳送給多個Replica Shard,請求在多個Replica Shard上執行成功並返回給Primary Shard後,寫入請求執行成功,返回結果給客戶端。

這種模式下,寫入操作的延時就等於latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,寫入延時最小也是兩次單Shard的寫入時延總和,寫入效率會較低,但是這樣的好處也很明顯,避免寫入後,單機或磁碟故障導致資料丟失,在資料重要性和效能方面,一般都是優先選擇資料,除非一些允許丟資料的特殊場景。

採用多個副本後,避免了單機或磁碟故障發生時,對已經持久化後的資料造成損害,但是Elasticsearch裡為了減少磁碟IO保證讀寫效能,一般是每隔一段時間(比如5分鐘)才會把Lucene的Segment寫入磁碟持久化,對於寫入記憶體,但還未Flush到磁碟的Lucene資料,如果發生機器宕機或者掉電,那麼記憶體中的資料也會丟失,這時候如何保證?

對於這種問題,Elasticsearch學習了資料庫中的處理方式:增加CommitLog模組,Elasticsearch中叫TransLog。

Refresh && Flush

在每一個Shard中,寫入流程分為兩部分,先寫入Lucene,再寫入TransLog。

寫入請求到達Shard後,先寫Lucene檔案,建立好索引,此時索引還在記憶體裡面,接著去寫TransLog,寫完TransLog後,重新整理TransLog資料到磁碟上,寫磁碟成功後,請求返回給使用者。這裡有幾個關鍵點,一是和資料庫不同,資料庫是先寫CommitLog,然後再寫記憶體,而Elasticsearch是先寫記憶體,最後才寫TransLog,一種可能的原因是Lucene的記憶體寫入會有很複雜的邏輯,很容易失敗,比如分詞,欄位長度超過限制等,比較重,為了避免TransLog中有大量無效記錄,減少recover的複雜度和提高速度,所以就把寫Lucene放在了最前面。二是寫Lucene記憶體後,並不是可被搜尋的,需要通過Refresh把記憶體的物件轉成完整的Segment後,然後再次reopen後才能被搜尋,一般這個時間設定為1秒鐘,導致寫入Elasticsearch的文件,最快要1秒鐘才可被從搜尋到,所以Elasticsearch在搜尋方面是NRT(Near Real Time)近實時的系統。三是當Elasticsearch作為NoSQL資料庫時,查詢方式是GetById,這種查詢可以直接從TransLog中查詢,這時候就成了RT(Real Time)實時系統。四是每隔一段比較長的時間,比如30分鐘後,Lucene會把記憶體中生成的新Segment重新整理到磁碟上,重新整理後索引檔案已經持久化了,歷史的TransLog就沒用了,會清空掉舊的TransLog。

上面介紹了Elasticsearch在寫入時的兩個關鍵模組,Replica和TransLog,接下來,我們看一下Update流程:
Update
Lucene中不支援部分欄位的Update,所以需要在Elasticsearch中實現該功能,具體流程如下:

  • 收到Update請求後,從Segment或者TransLog中讀取同id的完整Doc,記錄版本號為V1。
  • 將版本V1的全量Doc和請求中的部分欄位Doc合併為一個完整的Doc,同時更新記憶體中的VersionMap。獲取到完整Doc後,Update請求就變成了Index請求。
  • 加鎖。
  • 再次從versionMap中讀取該id的最大版本號V2,如果versionMap中沒有,則從Segment或者TransLog中讀取,這裡基本都會從versionMap中獲取到。
  • 檢查版本是否衝突(V1==V2),如果衝突,則回退到開始的“Update doc”階段,重新執行。如果不衝突,則執行最新的Add請求。
  • 在Index Doc階段,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然後再增加新Doc。寫入Lucene成功後,將當前V3更新到versionMap中。
  • 釋放鎖,部分更新的流程就結束了。

介紹完部分更新的流程後,大家應該從整體架構上對Elasticsearch的寫入有了一個初步的映象,接下來我們詳細剖析下寫入的詳細步驟。

Elasticsearch寫入請求型別

Elasticsearch中的寫入請求型別,主要包括下列幾個:Index(Create),Update,Delete和Bulk,其中前3個是單文件操作,後一個Bulk是多文件操作,其中Bulk中可以包括Index(Create),Update和Delete。

在6.0.0及其之後的版本中,前3個單文件操作的實現基本都和Bulk操作一致,甚至有些就是通過呼叫Bulk的介面實現的。估計接下來幾個版本後,Index(Create),Update,Delete都會被當做Bulk的一種特例化操作被處理。這樣,程式碼和邏輯都會更清晰一些。

下面,我們就以Bulk請求為例來介紹寫入流程。

Elasticsearch寫入流程圖

1

  • 紅色:Client Node。
  • 綠色:Primary Node。
  • 藍色:Replica Node。

註冊Action

在Elasticsearch中,所有action的入口處理方法都是註冊在ActionModule.java中,比如Bulk Request有兩個註冊入口,分別是Rest和Transport入口:
2
3
如果請求是Rest請求,則會在RestBulkAction中Parse Request,構造出BulkRequest,然後發給後面的TransportAction處理。

TransportShardBulkAction的基類TransportReplicationAction中註冊了對Primary,Replica等的不同處理入口:
4

這裡對原始請求,Primary Node請求和Replica Node請求各自注冊了一個handler處理入口。

Client Node

Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。

1. Ingest Pipeline

在這一步可以對原始文件做一些處理,比如HTML解析,自定義的處理,具體處理邏輯可以通過外掛來實現。在Elasticsearch中,由於Ingest Pipeline會比較耗費CPU等資源,可以設定專門的Ingest Node,專門用來處理Ingest Pipeline邏輯。

如果當前Node不能執行Ingest Pipeline,則會將請求發給另一臺可以執行Ingest Pipeline的Node。

2. Auto Create Index

判斷當前Index是否存在,如果不存在,則需要自動建立Index,這裡需要和Master互動。也可以通過配置關閉自動建立Index的功能。

3. Set Routing

設定路由條件,如果Request中指定了路由條件,則直接使用Request中的Routing,否則使用Mapping中配置的,如果Mapping中無配置,則使用預設的_id欄位值。

在這一步中,如果沒有指定id欄位,則會自動生成一個唯一的_id欄位,目前使用的是UUID。

4. Construct BulkShardRequest

由於Bulk Request中會包括多個(Index/Update/Delete)請求,這些請求根據routing可能會落在多個Shard上執行,這一步會按Shard挑揀Single Write Request,同一個Shard中的請求聚集在一起,構建BulkShardRequest,每個BulkShardRequest對應一個Shard。

5. Send Request To Primary

這一步會將每一個BulkShardRequest請求傳送給相應Shard的Primary Node。

Primary Node

Primary 請求的入口是在PrimaryOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Update or Delete

迴圈執行每個Single Write Request,對於每個Request,根據操作型別(CREATE/INDEX/UPDATE/DELETE)選擇不同的處理邏輯。

其中,Create/Index是直接新增Doc,Delete是直接根據_id刪除Doc,Update會稍微複雜些,我們下面就以Update為例來介紹。

2. Translate Update To Index or Delete

這一步是Update操作的特有步驟,在這裡,會將Update請求轉換為Index或者Delete請求。首先,會通過GetRequest查詢到已經存在的同_id Doc(如果有)的完整欄位和值(依賴_source欄位),然後和請求中的Doc合併。同時,這裡會獲取到讀到的Doc版本號,記做V1。

3. Parse Doc

這裡會解析Doc中各個欄位。生成ParsedDocument物件,同時會生成uid Term。在Elasticsearch中,_uid = type # _id,對使用者,_Id可見,而Elasticsearch中儲存的是_uid。這一部分生成的ParsedDocument中也有Elasticsearch的系統欄位,大部分會根據當前內容填充,部分未知的會在後面繼續填充ParsedDocument。

4. Update Mapping

Elasticsearch中有個自動更新Mapping的功能,就在這一步生效。會先挑選出Mapping中未包含的新Field,然後判斷是否執行自動更新Mapping,如果允許,則更新Mapping。

5. Get Sequence Id and Version

由於當前是Primary Shard,則會從SequenceNumber Service獲取一個sequenceID和Version。SequenceID在Shard級別每次遞增1,SequenceID在寫入Doc成功後,會用來初始化LocalCheckpoint。Version則是根據當前Doc的最大Version遞增1。

6. Add Doc To Lucene

這一步開始的時候會給特定_uid加鎖,然後判斷該_uid對應的Version是否等於之前Translate Update To Index步驟裡獲取到的Version,如果不相等,則說明剛才讀取Doc後,該Doc發生了變化,出現了版本衝突,這時候會丟擲一個VersionConflict的異常,該異常會在Primary Node最開始處捕獲,重新從“Translate Update To Index or Delete”開始執行。

如果Version相等,則繼續執行,如果已經存在同id的Doc,則會呼叫Lucene的UpdateDocument(uid, doc)介面,先根據uid刪除Doc,然後再Index新Doc。如果是首次寫入,則直接呼叫Lucene的AddDocument介面完成Doc的Index,AddDocument也是通過UpdateDocument實現。

這一步中有個問題是,如何保證Delete-Then-Add的原子性,怎麼避免中間狀態時被Refresh?答案是在開始Delete之前,會加一個Refresh Lock,禁止被Refresh,只有等Add完後釋放了Refresh Lock後才能被Refresh,這樣就保證了Delete-Then-Add的原子性。

Lucene的UpdateDocument介面中就只是處理多個Field,會遍歷每個Field逐個處理,處理順序是invert index,store field,doc values,point dimension,後續會有文章專門介紹Lucene中的寫入。

7. Write Translog

寫完Lucene的Segment後,會以keyvalue的形式寫TransLog,Key是_id,Value是Doc內容。當查詢的時候,如果請求是GetDocByID,則可以直接根據_id從TransLog中讀取到,滿足NoSQL場景下的實時性要去。

需要注意的是,這裡只是寫入到記憶體的TransLog,是否Sync到磁碟的邏輯還在後面。

這一步的最後,會標記當前SequenceID已經成功執行,接著會更新當前Shard的LocalCheckPoint。

8. Renew Bulk Request

這裡會重新構造Bulk Request,原因是前面已經將UpdateRequest翻譯成了Index或Delete請求,則後續所有Replica中只需要執行Index或Delete請求就可以了,不需要再執行Update邏輯,一是保證Replica中邏輯更簡單,效能更好,二是保證同一個請求在Primary和Replica中的執行結果一樣。

9. Flush Translog

這裡會根據TransLog的策略,選擇不同的執行方式,要麼是立即Flush到磁碟,要麼是等到以後再Flush。Flush的頻率越高,可靠性越高,對寫入效能影響越大。

10. Send Requests To Replicas

這裡會將剛才構造的新的Bulk Request並行傳送給多個Replica,然後等待Replica的返回,這裡需要等待多個Replica成功返回後,Primary Node才會返回使用者,具體需要多少Replica Node返回,可以通過配置或者請求引數設定。

這裡,同時會將SequenceID,PrimaryTerm,GlobalCheckPoint等傳遞給Replica。

傳送給Replica的請求中,Action Name等於原始ActionName + [R],這裡的R表示Replica。通過這個[R]的不同,可以找到處理Replica請求的Handler。

11. Receive Response From Replicas

Replica中請求都處理完後,且滿足最小Replica返回數後,會更新Primary Node的LocalCheckPoint。

Replica Node

Replica 請求的入口是在ReplicaOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Delete

根據請求型別是Index還是Delete,選擇不同的執行邏輯。這裡沒有Update,是因為在Primary Node中已經將Update轉換成了Index或Delete請求了。

2. Parse Doc

3. Update Mapping

以上都和Primary Node中邏輯一致

4. Get Sequence Id and Version

Primary Node中會生成Sequence ID和Version,然後放入ReplicaRequest中,這裡只需要從Request中獲取到就行。

5. Add Doc To Lucene

由於已經在Primary Node中將部分Update請求轉換成了Index或Delete請求,這裡只需要處理Index和Delete兩種請求,不再需要處理Update請求了。比Primary Node會更簡單一些。

6. Write Translog

7. Flush Translog

以上都和Primary Node中邏輯一致。

最後

上面詳細介紹了Elasticsearch的寫入流程及其各個流程的工作機制,我們在這裡再次總結下之前提出的分散式系統中的六大特性:

 ====================================================================================

  • 可靠性:由於Lucene的設計中不考慮可靠性,在Elasticsearch中通過Replica和TransLog兩套機制保證資料的可靠性。
  • 一致性:Lucene中的Flush鎖只保證Update接口裡面Delete和Add中間不會Flush,但是Add完成後仍然有可能立即發生Flush,導致Segment可讀。這樣就沒法保證Primary和所有其他Replica可以同一時間Flush,就會出現查詢不穩定的情況,這裡只能實現最終一致性。
  • 原子性:Add和Delete都是直接呼叫Lucene的介面,是原子的。當部分更新時,使用Version和鎖保證更新是原子的。
  • 隔離性:仍然採用Version和區域性鎖來保證更新的是特定版本的資料。
  • 實時性:使用定期Refresh Segment到記憶體,並且Reopen Segment方式保證搜尋可以在較短時間(比如1秒)內被搜尋到。通過將未重新整理到磁碟資料記入TransLog,保證對未提交資料可以通過ID實時訪問到。
  • 效能:效能是一個系統性工程,所有環節都要考慮對效能的影響,在Elasticsearch中,在很多地方的設計都考慮到了效能,一是不需要所有Replica都返回後才能返回給使用者,只需要返回特定數目的就行;二是生成的Segment現在記憶體中提供服務,等一段時間後才重新整理到磁碟,Segment在記憶體這段時間的可靠性由TransLog保證;三是TransLog可以配置為週期性的Flush,但這個會給可靠性帶來傷害;四是每個執行緒持有一個Segment,多執行緒時相互不影響,相互獨立,效能更好;五是系統的寫入流程對版本依賴較重,讀取頻率較高,因此採用了versionMap,減少熱點資料的多次磁碟IO開銷。Lucene中針對性能做了大量的優化。後面我們也會有文章專門介紹Lucene中的優化思路。

讀操作

對於搜尋而言是近實時的,延遲在100ms以上,對於NoSQL則需要是實時的。
一致性指的是寫入成功後,那麼下次讀一定要能讀取到最新的資料。對於搜尋,這個要求會低一些,可以有一些延遲。但是對於NoSQL資料庫,則一般要求最好是強一致性的。
結果匹配上,NoSQL作為資料庫,查詢過程中只有符合不符合兩種情況,而搜尋裡面還有是否相關,類似於NoSQL的結果只能是0或1,而搜尋裡面可能會有0.1,0.5,0.9等部分匹配或者更相關的情況。
結果召回上,搜尋一般只需要召回最滿足條件的Top N結果即可,而NoSQL一般都需要返回滿足條件的所有結果。
搜尋系統一般都是兩階段查詢,第一個階段查詢到對應的Doc ID,也就是PK;第二階段再通過Doc ID去查詢完整文件,而NoSQL資料庫一般是一階段就返回結果。而在Elasticsearch中兩種都支援。
目前NoSQL的查詢,聚合、分析和統計等功能上都是要比搜尋弱的。

Lucene的讀

Elasticsearch使用了Lucene作為搜尋引擎庫,通過Lucene完成特定欄位的搜尋等功能,在Lucene中這個功能是通過IndexSearcher的下列介面實現的:

public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)

第一個search介面實現搜尋功能,返回最滿足Query的N個結果;第二個doc介面實現通過doc id查詢Doc內容的功能;第三個count介面實現通過Query獲取到命中數的功能。

這三個功能是搜尋中的最基本的三個功能點,對於大部分Elasticsearch中的查詢都是比較複雜的,直接用這個介面是無法滿足需求的,比如分散式問題。這些問題都留給了Elasticsearch解決,我們接下來看Elasticsearch中相關讀功能的剖析。

search1.png | center

在上圖中,該Shard有1個Primary和2個Replica Node,當查詢的時候,從三個節點中根據Request中的preference引數選擇一個節點查詢。preference可以設定local,primary,_replica以及其他選項。如果選擇了primary,則每次查詢都是直接查詢Primary,可以保證每次查詢都是最新的。如果設定了其他引數,那麼可能會查詢到R1或者R2,這時候就有可能查詢不到最新的資料。
上述程式碼邏輯在OperationRouting.Java的searchShards方法中。

接下來看一下,Elasticsearch中的查詢是如何支援分散式的。

Elasticsearch中通過分割槽實現分散式,資料寫入的時候根據_routing規則將資料寫入某一個Shard中,這樣就能將海量資料分佈在多個Shard以及多臺機器上,已達到分散式的目標。這樣就導致了,查詢的時候,潛在資料會在當前index的所有的Shard中,所以Elasticsearch查詢的時候需要查詢所有Shard,同一個Shard的Primary和Replica選擇一個即可,查詢請求會分發給所有Shard,每個Shard中都是一個獨立的查詢引擎,比如需要返回Top 10的結果,那麼每個Shard就會查詢並且返回Top 10的結果,然後在Client Node裡面會接收所有Shard的結果,然後通過優先順序佇列二次排序,選擇出Top 10的結果,返回給使用者。
search2.png | center

這裡有一個問題就是請求膨脹,使用者的一個搜尋請求在Elasticsearch內部會變成Shard個請求,這裡有個優化點是,雖然是Shard個請求,但是這個Shard個數不一定要是當前Index中的Shard個數,只要是當前查詢相關的Shard即可,通過這種方式可以優化請求膨脹數。

Elasticsearch中的查詢主要分為兩類,Get請求:通過ID查詢特定Doc;Search請求:通過Query查詢匹配Doc。

search3.png | center

上圖中記憶體中的Segment是指剛Refresh Segment,但是還沒持久化到磁碟的新Segment,而非從磁碟載入到記憶體中的Segment。

對於Search類請求,查詢的時候是一起查詢記憶體和磁碟上的Segment,最後將結果合併後返回。這種查詢是近實時(Near Real Time)的。

對於Get類請求,查詢的時候是先查詢記憶體中的TransLog,如果找到就立即返回,如果沒找到再查詢磁碟上的TransLog,如果還沒有則再去查詢磁碟上的Segment。這種查詢是實時(Real Time)的。這種查詢順序可以保證查詢到的Doc是最新版本的Doc,這個功能也是為了保證NoSQL場景下的實時性要求。

多階段查詢 | center

所有的搜尋系統一般都是兩階段查詢,第一階段查詢到匹配的DocID,第二階段再查詢DocID對應的完整文件,這種在Elasticsearch中稱為query_then_fetch,還有一種是一階段查詢的時候就返回完整Doc,在Elasticsearch中稱作query_and_fetch,一般第二種適用於只需要查詢一個Shard的請求。

除了一階段,兩階段外,還有一種三階段查詢的情況。搜尋裡面有一種算分邏輯是根據TF(Term Frequency)和DF(Document Frequency),但是Elasticsearch中查詢的時候,是在每個Shard中獨立查詢的,每個Shard中的TF和DF也是獨立的,雖然在寫入的時候通過_routing保證Doc分佈均勻,但是沒法保證TF和DF均勻,那麼就有會導致區域性的TF和DF不準的情況出現,這個時候基於TF、DF的算分就不準。為了解決這個問題,Elasticsearch中引入了DFS查詢,比如DFS_query_then_fetch,會先收集所有Shard中的TF和DF值,然後將這些值帶入請求中,再次執行query_then_fetch,這樣算分的時候TF和DF就是準確的,類似的有DFS_query_and_fetch。這種查詢的優勢是算分更加精準,但是效率會變差。另一種選擇是用BM25代替TF/DF模型。

Elasticsearch查詢流程

Elasticsearch中的大部分查詢,以及核心功能都是Search型別查詢,上面我們瞭解到查詢分為一階段,二階段和三階段,這裡我們就以最常見的的二階段查詢為例來介紹查詢流程。

查詢流程 | center

註冊Action

Elasticsearch中,查詢和寫操作一樣都是在ActionModule.java中註冊入口處理函式的。

registerHandler.accept(new RestSearchAction(settings, restController));
......
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
......

如果請求是Rest請求,則會在RestSearchAction中解析請求,檢查查詢型別,不能設定為dfs_query_and_fetch或者query_and_fetch,這兩個目前只能用於Elasticsearch中的優化場景,然後發給後面的TransportSearchAction.class處理。然後構造SearchRequest,將請求傳送給TransportSearchAction中處理。

search5.png | center
search6.png | center

如果是第一階段的Query Phase請求,則會呼叫SearchService的executeQueryPhase方法

如果是第二階段的Fetch Phase請求,則會呼叫SearchService的executeFetchPhase方法。

Client Node

Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。

1. Get Remove Cluster Shard

判斷是否需要跨叢集訪問,如果需要,則獲取到要訪問的Shard列表。

2. Get Search Shard Iterator

獲取當前Cluster中要訪問的Shard,和上一步中的Remove Cluster Shard合併,構建出最終要訪問的完整Shard列表。
這一步中,會在Primary Node和多個Replica Node中選擇出一個要訪問的Shard。

3. For Every Shard:Perform

遍歷每個Shard,對每個Shard執行後面邏輯。

4. Send Request To Query Shard

將查詢階段請求傳送給相應的Shard。

5. Merge Docs

上一步將請求傳送給多個Shard後,這一步就是非同步等待返回結果,然後對結果合併。這裡的合併策略是維護一個Top N大小的優先順序佇列,每當收到一個shard的返回,就把結果放入優先順序佇列做一次排序,直到所有的Shard都返回。
翻頁邏輯也是在這裡,如果需要取Top 30~ Top 40的結果,這個的意思是所有Shard查詢結果中的第30到40的結果,那麼在每個Shard中無法確定最終的結果,每個Shard需要返回Top 40的結果給Client Node,然後Client Node中在merge docs的時候,計算出Top 40的結果,最後再去除掉Top 30,剩餘的10個結果就是需要的Top 30~ Top 40的結果。
上述翻頁邏輯有一個明顯的缺點就是每次Shard返回的資料中包括了已經翻過的歷史結果,如果翻頁很深,則在這裡需要排序的Docs會很多,比如Shard有1000,取第9990到10000的結果,那麼這次查詢,Shard總共需要返回1000 * 10000,也就是一千萬Doc,這種情況很容易導致OOM。
另一種翻頁方式是使用search_after,這種方式會更輕量級,如果每次只需要返回10條結構,則每個Shard只需要返回search_after之後的10個結果即可,返回的總資料量只是和Shard個數以及本次需要的個數有關,和歷史已讀取的個數無關。這種方式更安全一些,推薦使用這種。
如果有aggregate,也會在這裡做聚合,但是不同的aggregate型別的merge策略不一樣,具體的可以在後面的aggregate文章中再介紹。

6. Send Request To Fetch Shard

選出Top N個Doc ID後傳送Fetch Shard給這些Doc ID所在的Shard,最後會返回Top N的Doc的內容。

Query Phase

接下來我們看第一階段查詢的步驟:

1. Create Search Context

建立Search Context,之後Search過程中的所有中間狀態都會存在Context中,這些狀態總共有50多個,具體可以檢視DefaultSearchContext或者其他SearchContext的子類。

2. Parse Query

解析Query的Source,將結果存入Search Context。這裡會根據請求中Query型別的不同建立不同的Query物件,比如TermQuery、FuzzyQuery等。

這裡包括了dfsPhase、queryPhase和fetchPhase三個階段的preProcess部分,只有queryPhase的preProcess中有執行邏輯,其他兩個都是空邏輯,執行完preProcess後,所有需要的引數都會設定完成。

由於Elasticsearch中有些請求之間是相互關聯的,並非獨立的,比如scroll請求,所以這裡同時會設定Context的生命週期。

同時會設定lowLevelCancellation是否開啟,這個引數是叢集級別配置,同時也能動態開關,開啟後會在後面執行時做更多的檢測,檢測是否需要停止後續邏輯直接返回。

3. Get From Cache

判斷請求是否允許被Cache,如果允許,則檢查Cache中是否已經有結果,如果有則直接讀取Cache,如果沒有則繼續執行後續步驟,執行完後,再將結果加入Cache。

4. Add Collectors

Collector主要目標是收集查詢結果,實現排序,自定義結果集過濾和收集等。這一步會增加多個Collectors,多個Collector組成一個List。

  1. FilteredCollector:先判斷請求中是否有Post Filter,Post Filter用於Search,Agg等結束後再次對結果做Filter,希望Filter不影響Agg結果。如果有Post Filter則建立一個FilteredCollector,加入Collector List中。
  2. PluginInMultiCollector:判斷請求中是否制定了自定義的一些Collector,如果有,則加入Collector List。
  3. MinimumScoreCollector:判斷請求中是否制定了最小分數閾值,如果指定了,則建立MinimumScoreCollector加入Collector List中,在後續收集結果時,會過濾掉得分小於最小分數的Doc。
  4. EarlyTerminatingCollector:判斷請求中是否提前結束Doc的Seek,如果是則建立EarlyTerminatingCollector,加入Collector List中。在後續Seek和收集Doc的過程中,當Seek的Doc數達到Early Terminating後會停止Seek後續倒排鏈。
  5. CancellableCollector:判斷當前操作是否可以被中斷結束,比如是否已經超時等,如果是會丟擲一個TaskCancelledException異常。該功能一般用來提前結束較長的查詢請求,可以用來保護系統。

    1.EarlyTerminatingSortingCollector:如果Index是排序的,那麼可以提前結束對倒排鏈的Seek,相當於在一個排序遞減連結串列上返回最大的N個值,只需要直接返回前N個值就可以了。這個Collector會加到Collector List的頭部。EarlyTerminatingSorting和EarlyTerminating的區別是,EarlyTerminatingSorting是一種對結果無損傷的優化,而EarlyTerminating是有損的,人為掐斷執行的優化。
  6. TopDocsCollector:這個是最核心的Top N結果選擇器,會加入到Collector List的頭部。TopScoreDocCollector和TopFieldCollector都是TopDocsCollector的子類,TopScoreDocCollector會按照固定的方式算分,排序會按照分數+doc id的方式排列,如果多個doc的分數一樣,先選擇doc id小的文件。而TopFieldCollector則是根據使用者指定的Field的值排序。

5. lucene::search

這一步會呼叫Lucene中IndexSearch的search介面,執行真正的搜尋邏輯。每個Shard中會有多個Segment,每個Segment對應一個LeafReaderContext,這裡會遍歷每個Segment,到每個Segment中去Search結果,然後計算分數。

搜尋裡面一般有兩階段算分,第一階段是在這裡算的,會對每個Seek到的Doc都計算分數,一般是算一個基本分數。這一階段完成後,會有個排序。然後再第二階段,再對Top 的結果做一次二階段算分,在二階段算分的時候會考慮更多的因子。二階段算分在下一步中。

Lucene中詳細的查詢流程,後面會有專門文章介紹。

6. rescore

根據Request中是否包含rescore配置決定是否進行二階段排序,如果有則執行二階段算分邏輯,會考慮更多的算分因子。二階段算分也是一種計算機中常見的多層設計,是一種資源消耗和效率的折中。
Elasticsearch中支援配置多個Rescore,這些rescore邏輯會順序遍歷執行。每個rescore內部會先按照請求引數window選擇出Top window的doc,然後對這些doc排序,排完後再合併回原有的Top 結果順序中。

7. suggest::execute()

如果有推薦請求,則在這裡執行推薦請求。如果請求中只包含了推薦的部分,則很多地方可以優化。推薦不是今天的重點,這裡就不介紹了,後面有機會再介紹。

8. aggregation::execute()

如果含有聚合統計請求,則在這裡執行。Elasticsearch中的aggregate的處理邏輯也類似於Search,通過多個Collector來實現。在Client  Node中也需要對aggregation做合併。aggregate邏輯更復雜一些,就不在這裡贅述了,後面有需要就再單獨開文章介紹。
上述邏輯都執行完成後,如果當前查詢請求只需要查詢一個Shard,那麼會直接在當前Node執行Fetch Phase。

Fetch Phase

Elasticsearch作為搜尋系統時,或者任何搜尋系統中,除了Query階段外,還會有一個Fetch階段,這個Fetch階段在資料庫類系統中是沒有的,是搜尋系統中額外增加的階段。搜尋系統中額外增加Fetch階段的原因是搜尋系統中資料分佈導致的,在搜尋中,資料通過routing分Shard的時候,只能根據一個主欄位值來決定,但是查詢的時候可能會根據其他非主欄位查詢,那麼這個時候所有Shard中都可能會存在相同非主欄位值的Doc,所以需要查詢所有Shard才能不會出現結果遺漏。同時如果查詢主欄位,那麼這個時候就能直接定位到Shard,就只需要查詢特定Shard即可,這個時候就類似於資料庫系統了。另外,資料庫中的二級索引又是另外一種情況,但類似於查主欄位的情況,這裡就不多說了。
基於上述原因,第一階段查詢的時候並不知道最終結果會在哪個Shard上,所以每個Shard中管都需要查詢完整結果,比如需要Top 10,那麼每個Shard都需要查詢當前Shard的所有資料,找出當前Shard的Top 10,然後返回給Client Node。如果有100個Shard,那麼就需要返回100 * 10 = 1000個結果,而Fetch Doc內容的操作比較耗費IO和CPU,如果在第一階段就Fetch Doc,那麼這個資源開銷就會非常大。所以,一般是當Client Node選擇出最終Top N的結果後,再對最終的Top N讀取Doc內容。通過增加一點網路開銷而避免大量IO和CPU操作,這個折中是非常划算的。
Fetch階段的目的是通過DocID獲取到使用者需要的完整Doc內容。這些內容包括了DocValues,Store,Source,Script和Highlight等,具體的功能點是在SearchModule中註冊的,系統預設註冊的有:

  • ExplainFetchSubPhase
  • DocValueFieldsFetchSubPhase
  • ScriptFieldsFetchSubPhase
  • FetchSourceSubPhase
  • VersionFetchSubPhase
  • MatchedQueriesFetchSubPhase
  • HighlightPhase
  • ParentFieldSubFetchPhase

除了系統預設的8種外,還有通過外掛的形式註冊自定義的功能,這些SubPhase中最重要的是Source和Highlight,Source是載入原文,Highlight是計算高亮顯示的內容片斷。
上述多個SubPhase會針對每個Doc順序執行,可能會產生多次的隨機IO,這裡會有一些優化方案,但是都是針對特定場景的,不具有通用性。
Fetch Phase執行完後,整個查詢流程就結束了。

概念解析

CURD 操作

CURD 操作都是針對具體的某個或某些文件的操作,每個文件的 routing 都是確認的,所以其所在分片也是可以事先確定的。該過程對應 ES 的 Document API。

  • 新建(C): 指對某個文件進行索引操作的過程。
  • 檢索(R): 指從 ES 中獲取某個或多個特定文件的過程。
  • 刪除(D): 指從 ES 中刪除某個文件讓其不再可被搜尋。
  • 更新(U): 指在 ES 中更新某個文件的過程,其實質是刪除+新建的過程。

搜尋

搜尋操作是指通過查詢條件從 ES 中獲取匹配的文件的過程,搜尋前不知道哪個文件會匹配查詢。該過程對應 ES 的 Search API。

 

路由和分片

分片

  • 文件在索引的時候,需要確定文件存放到哪個分片上去。(通過把 _id 作為 routing 來計算 shard)
  • 文件在檢索的時候,需要確定文件處在具體哪個分片上。(通過把 _id 作為 routing 來計算 shard)

路由

分片的確定,都是由路由來完成的,具體計算公式如下:

shard = hash(routing) % number_of_primary_shards
  • routing 值是一個任意字串,它預設是 _id 但也可以自定義。
  • routing 字串通過雜湊函式生成一個數字,然後除以主切片的數量得到一個餘數(remainder),餘數的範圍永遠是 0 到 number_of_primary_shards - 1 ,這個數字就是特定文件所
    在的分片。
  • 這也解釋了為什麼主分片的數量只能在建立索引時定義且不能修改:如果主分片的數量在未來改變了,所有先前的路由值就失效了,文件也就永遠找不到了。

 

文件的新建、索引和刪除

流程

新建、索引和刪除請求都是寫(write)操作,它們必須在主分片上成功完成才能複製到相關的複製分片上。並且要等所有複製分片完成後才向請求節點返回。

image

在主分片和複製分片上成功新建、索引或刪除一個文件必要的順序步驟:

  • 1. 客戶端給 Node 1 傳送新建、索引或刪除請求,Node 1 作為協調節點。
  • 2. 協調節點使用文件的 _id 確定文件屬於分片 0 (通過把 _id 作為 routing 來計算 shard)。它轉發請求到 Node 3 (主分片位於這個節點上)。
  • 3. Node 3 在主分片上執行請求,如果成功,它轉發請求到相應的位於 Node 1 和 Node 2 的複製節點上。當所有的複製節點報告成功, Node 3 報告成功給協調節點。
  • 4. 協調節點返回結果給客戶端。

客戶端接收到成功響應的時候,文件的修改已經被應用於主分片和所有的複製分片。

由於要主分片和複製分片都成功後才返回成功,所以寫操作是比較耗時的。

優化

replication:

replication 預設為 sync。也就是要等所有複製分片都操作完後才返回。

設定為 async 執行在主分片操作完成後即返回。

 

文件的檢索

流程

檢索文件為讀(read)操作,請求只需分片的任意一個副本返回操作結果即完成。

image

在主分片或複製分片上檢索一個文件必要的順序步驟:

  • 1. 客戶端給 Node1(主節點) 傳送 get 請求,Node 1 作為協調節點。
  • 2. 協調節點使用文件的 _id 確定文件屬於分片 0(通過把 _id 作為 routing 來計算 shard) 。分片 0 對應的複製分片在三個節點上都有。此時,它轉發請求到 Node 2 。
  • 3. Node 2 返回執行結果給協調節點。
  • 4. 協調節點返回結果給客戶端。

對於讀請求,為了平衡負載,協調節點會為每個分片的請求選擇不同的副本——它會迴圈所有分片副本。

 

文件的更新

流程

更新過程整體流程就是 “讀” + “寫” 操作。

image

執行更新必要的順序步驟:

  • 1. 客戶端給 Node 1 傳送更新請求,Node 1 作為協調節點。
  • 2. 協調節點轉發請求到主分片所在節點 Node 3(主分片) 。
  • 3. Node 3 從主分片檢索出文檔,修改 _source 欄位的JSON,然後在主分片上重新索引。如果有其他程序修改了文件,它以 retry_on_conflict 設定的次數重複步驟3,都未成功則放棄。
  • 4. 如果 Node 3 成功更新文件,它同時轉發文件的新版本到 Node 1 和 Node 2 上的複製分片以重新索引。當所有節點報告成功, Node 3 返回成功給協調節點。
  • 5. 協調節點返回結果給客戶端。

 

批量文件操作

批量檢索(mget)和批量新建、索引、更新、刪除(bulk)操作和單個文件的操作過程類似。

區別在於協調節點知道所有文件所在的分片,並將請求的文件根據所在的分片來分組。然後同時請求需要的節點。

一旦收到所有節點的響應,協調節點再將這多個節點的響應組合成一個響應結果返回給客戶端。

流程

mget 操作

image

mget 操作過程基本步驟:

  • 1. 客戶端傳送請求到 Node 1,Node 1 作為協調節點。
  • 2. 協調節點確認每一個操作請求的目標分片,並根據需要請求的目標分片重新分組。
  • 2. 協調節點同時轉發每組請求到目標主分片或複製分片(檢索操作任意分片都可以)。
  • 3. 一旦所有請求的分片都返回,協調節點整理結果,並返回給客戶端。

bulk操作

 bulk 操作過程基本步驟:

  • 1. 客戶端傳送請求到 Node 1,Node 1 作為協調節點。
  • 2. 協調節點確認每一個操作請求的目標主分片,並根據目標主分片重新分組。
  • 2. 協調節點同時請求包含這些主分片的節點(步驟2)。
  • 3. 每一個主分片一個接一個的處理每一個文件請求——某個文件在主分片上請求成功了,主分片將請求傳送給它所有的複製分片,然後就接著處理下一個文件請求。
  • 4. 當所有的複製分片請求成功後,主分片所在節點就向協調節點報告成功。
  • 5. 協調節點整理所有文件的結果,並返回給客戶端。

 

搜尋

文件的 CRUD 操作一次只處理一個單獨的文件(批量操作也是單個執行)。CRUD 操作中,文件的唯一性由 _index , _type 和 routing (通常預設是該文件的 _id )的組合來確定。這意味著我們可以準確知道叢集中的哪個分片有這個文件。

搜尋過程,由於不知道哪個文件會匹配查詢(文件可能存放在叢集中的任意分片上),所以搜尋需要一個更復雜的模型。搜尋通過查詢每一個我們感興趣的索引的分片副本,來看是否含有任何匹配的文件。

搜尋的執行過程分兩個階段,稱為查詢然後取回(query then fetch)。

查詢階段

GET /_search
{    "from": 90,
     "size": 10
}

image

1. 客戶端傳送一個 search(搜尋) 請求給 Node 3 , Node 3 建立了一個長度為 from+size 的空優先順序佇列。

2. Node 3 轉發這個搜尋請求到索引中每個分片的原本或副本。搜尋請求可以被每個分片的原本或任意副本處理。

(並非所有副本都處理同樣的請求,而是輪詢處理不同的請求,所以多副本能夠提高吞吐)

3. 每個分片在本地執行這個查詢並且結果將結果到一個大小為 from+size 的有序本地優先佇列裡去。

4. 每個分片返回document的ID和它優先佇列裡的所有document的排序值給協調節點 Node 3 。 Node 3 把這些值合併到自己的優先佇列裡產生全域性排序結果。

5. 對於多(multiple)或全部(all)索引的搜尋的工作機制和這完全一致——僅僅是