1. 程式人生 > >[ES]elasticsearch章3 ES寫入過程解析

[ES]elasticsearch章3 ES寫入過程解析

Elasticsearch的寫

Elasticsearch採用多Shard方式,通過配置routing規則將資料分成多個數據子集,每個資料子集提供獨立的索引和搜尋功能。當寫入文件的時候,根據routing規則,將文件傳送給特定Shard中建立索引。這樣就能實現分散式了。

此外,Elasticsearch整體架構上採用了一主多副的方式:

每個Index由多個Shard組成,每個Shard有一個主節點和多個副本節點,副本個數可配。但每次寫入的時候,寫入請求會先根據_routing規則選擇發給哪個Shard,Index Request中可以設定使用哪個Filed的值作為路由引數,如果沒有設定,則使用Mapping中的配置,如果mapping中也沒有配置,則使用_id作為路由引數,然後通過_routing的Hash值選擇出Shard(在OperationRouting類中),最後從叢集的Meta中找出出該Shard的Primary節點。

請求接著會發送給Primary Shard,在Primary Shard上執行成功後,再從Primary Shard上將請求同時傳送給多個Replica Shard,請求在多個Replica Shard上執行成功並返回給Primary Shard後,寫入請求執行成功,返回結果給客戶端。

這種模式下,寫入操作的延時就等於latency = Latency(Primary Write) + Max(Replicas Write)。只要有副本在,寫入延時最小也是兩次單Shard的寫入時延總和,寫入效率會較低,但是這樣的好處也很明顯,避免寫入後,單機或磁碟故障導致資料丟失,在資料重要性和效能方面,一般都是優先選擇資料,除非一些允許丟資料的特殊場景。

採用多個副本後,避免了單機或磁碟故障發生時,對已經持久化後的資料造成損害,但是Elasticsearch裡為了減少磁碟IO保證讀寫效能,一般是每隔一段時間(比如5分鐘)才會把Lucene的Segment寫入磁碟持久化,對於寫入記憶體,但還未Flush到磁碟的Lucene資料,如果發生機器宕機或者掉電,那麼記憶體中的資料也會丟失,這時候如何保證?

對於這種問題,Elasticsearch學習了資料庫中的處理方式:增加CommitLog模組,Elasticsearch中叫TransLog。

在每一個Shard中,寫入流程分為兩部分,先寫入Lucene,再寫入TransLog。

寫入請求到達Shard後,先寫Lucene檔案,建立好索引,此時索引還在記憶體裡面,接著去寫TransLog,寫完TransLog後,重新整理TransLog資料到磁碟上,寫磁碟成功後,請求返回給使用者。這裡有幾個關鍵點,一是和資料庫不同,資料庫是先寫CommitLog,然後再寫記憶體,而Elasticsearch是先寫記憶體,最後才寫TransLog,一種可能的原因是Lucene的記憶體寫入會有很複雜的邏輯,很容易失敗,比如分詞,欄位長度超過限制等,比較重,為了避免TransLog中有大量無效記錄,減少recover的複雜度和提高速度,所以就把寫Lucene放在了最前面。二是寫Lucene記憶體後,並不是可被搜尋的,需要通過Refresh把記憶體的物件轉成完整的Segment後,然後再次reopen後才能被搜尋,一般這個時間設定為1秒鐘,導致寫入Elasticsearch的文件,最快要1秒鐘才可被從搜尋到,所以Elasticsearch在搜尋方面是NRT(Near Real Time)近實時的系統。三是當Elasticsearch作為NoSQL資料庫時,查詢方式是GetById,這種查詢可以直接從TransLog中查詢,這時候就成了RT(Real Time)實時系統。四是每隔一段比較長的時間,比如30分鐘後,Lucene會把記憶體中生成的新Segment重新整理到磁碟上,重新整理後索引檔案已經持久化了,歷史的TransLog就沒用了,會清空掉舊的TransLog。

Lucene中不支援部分欄位的Update,所以需要在Elasticsearch中實現該功能,具體流程如下:

  1. 收到Update請求後,從Segment或者TransLog中讀取同id的完整Doc,記錄版本號為V1。
  2. 將版本V1的全量Doc和請求中的部分欄位Doc合併為一個完整的Doc,同時更新記憶體中的VersionMap。獲取到完整Doc後,Update請求就變成了Index請求。
  3. 加鎖。
  4. 再次從versionMap中讀取該id的最大版本號V2,如果versionMap中沒有,則從Segment或者TransLog中讀取,這裡基本都會從versionMap中獲取到。
  5. 檢查版本是否衝突(V1==V2),如果衝突,則回退到開始的“Update doc”階段,重新執行。如果不衝突,則執行最新的Add請求。
  6. 在Index Doc階段,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然後再增加新Doc。寫入Lucene成功後,將當前V3更新到versionMap中。
  7. 釋放鎖,部分更新的流程就結束了。
  8. 介紹完部分更新的流程後,大家應該從整體架構上對Elasticsearch的寫入有了一個初步的映象,接下來我們詳細剖析下寫入的詳細步驟。

寫入流程各角色分工如下:

Client Node

Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。

1. Ingest Pipeline

在這一步可以對原始文件做一些處理,比如HTML解析,自定義的處理,具體處理邏輯可以通過外掛來實現。在Elasticsearch中,由於Ingest Pipeline會比較耗費CPU等資源,可以設定專門的Ingest Node,專門用來處理Ingest Pipeline邏輯。

如果當前Node不能執行Ingest Pipeline,則會將請求發給另一臺可以執行Ingest Pipeline的Node。

2. Auto Create Index

判斷當前Index是否存在,如果不存在,則需要自動建立Index,這裡需要和Master互動。也可以通過配置關閉自動建立Index的功能。

3. Set Routing

設定路由條件,如果Request中指定了路由條件,則直接使用Request中的Routing,否則使用Mapping中配置的,如果Mapping中無配置,則使用預設的_id欄位值。

在這一步中,如果沒有指定id欄位,則會自動生成一個唯一的_id欄位,目前使用的是UUID。

4. Construct BulkShardRequest

由於Bulk Request中會包括多個(Index/Update/Delete)請求,這些請求根據routing可能會落在多個Shard上執行,這一步會按Shard挑揀Single Write Request,同一個Shard中的請求聚集在一起,構建BulkShardRequest,每個BulkShardRequest對應一個Shard。

5. Send Request To Primary

這一步會將每一個BulkShardRequest請求傳送給相應Shard的Primary Node。

 

Primary Node

Primary 請求的入口是在PrimaryOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Update or Delete

迴圈執行每個Single Write Request,對於每個Request,根據操作型別(CREATE/INDEX/UPDATE/DELETE)選擇不同的處理邏輯。

其中,Create/Index是直接新增Doc,Delete是直接根據_id刪除Doc,Update會稍微複雜些,我們下面就以Update為例來介紹。

2. Translate Update To Index or Delete

這一步是Update操作的特有步驟,在這裡,會將Update請求轉換為Index或者Delete請求。首先,會通過GetRequest查詢到已經存在的同_id Doc(如果有)的完整欄位和值(依賴_source欄位),然後和請求中的Doc合併。同時,這裡會獲取到讀到的Doc版本號,記做V1。

3. Parse Doc

這裡會解析Doc中各個欄位。生成ParsedDocument物件,同時會生成uid Term。在Elasticsearch中,_uid = type # _id,對使用者,_Id可見,而Elasticsearch中儲存的是_uid。這一部分生成的ParsedDocument中也有Elasticsearch的系統欄位,大部分會根據當前內容填充,部分未知的會在後面繼續填充ParsedDocument。

4. Update Mapping

Elasticsearch中有個自動更新Mapping的功能,就在這一步生效。會先挑選出Mapping中未包含的新Field,然後判斷是否執行自動更新Mapping,如果允許,則更新Mapping。

5. Get Sequence Id and Version

由於當前是Primary Shard,則會從SequenceNumber Service獲取一個sequenceID和Version。SequenceID在Shard級別每次遞增1,SequenceID在寫入Doc成功後,會用來初始化LocalCheckpoint。Version則是根據當前Doc的最大Version遞增1。

6. Add Doc To Lucene

這一步開始的時候會給特定_uid加鎖,然後判斷該_uid對應的Version是否等於之前Translate Update To Index步驟裡獲取到的Version,如果不相等,則說明剛才讀取Doc後,該Doc發生了變化,出現了版本衝突,這時候會丟擲一個VersionConflict的異常,該異常會在Primary Node最開始處捕獲,重新從“Translate Update To Index or Delete”開始執行。

如果Version相等,則繼續執行,如果已經存在同id的Doc,則會呼叫Lucene的UpdateDocument(uid, doc)介面,先根據uid刪除Doc,然後再Index新Doc。如果是首次寫入,則直接呼叫Lucene的AddDocument介面完成Doc的Index,AddDocument也是通過UpdateDocument實現。

這一步中有個問題是,如何保證Delete-Then-Add的原子性,怎麼避免中間狀態時被Refresh?答案是在開始Delete之前,會加一個Refresh Lock,禁止被Refresh,只有等Add完後釋放了Refresh Lock後才能被Refresh,這樣就保證了Delete-Then-Add的原子性。

Lucene的UpdateDocument介面中就只是處理多個Field,會遍歷每個Field逐個處理,處理順序是invert index,store field,doc values,point dimension,後續會有文章專門介紹Lucene中的寫入。

7. Write Translog

寫完Lucene的Segment後,會以keyvalue的形式寫TransLog,Key是_id,Value是Doc內容。當查詢的時候,如果請求是GetDocByID,則可以直接根據_id從TransLog中讀取到,滿足NoSQL場景下的實時性要去。

需要注意的是,這裡只是寫入到記憶體的TransLog,是否Sync到磁碟的邏輯還在後面。

這一步的最後,會標記當前SequenceID已經成功執行,接著會更新當前Shard的LocalCheckPoint。

8. Renew Bulk Request

這裡會重新構造Bulk Request,原因是前面已經將UpdateRequest翻譯成了Index或Delete請求,則後續所有Replica中只需要執行Index或Delete請求就可以了,不需要再執行Update邏輯,一是保證Replica中邏輯更簡單,效能更好,二是保證同一個請求在Primary和Replica中的執行結果一樣。

9. Flush Translog

這裡會根據TransLog的策略,選擇不同的執行方式,要麼是立即Flush到磁碟,要麼是等到以後再Flush。Flush的頻率越高,可靠性越高,對寫入效能影響越大。

10. Send Requests To Replicas

這裡會將剛才構造的新的Bulk Request並行傳送給多個Replica,然後等待Replica的返回,這裡需要等待所有Replica返回後(可能有成功,也有可能失敗),Primary Node才會返回使用者。如果某個Replica失敗了,則Primary會給Master傳送一個Remove Shard請求,要求Master將該Replica Shard從可用節點中移除。

這裡,同時會將SequenceID,PrimaryTerm,GlobalCheckPoint等傳遞給Replica。

傳送給Replica的請求中,Action Name等於原始ActionName + [R],這裡的R表示Replica。通過這個[R]的不同,可以找到處理Replica請求的Handler。

11. Receive Response From Replicas

Replica中請求都處理完後,會更新Primary Node的LocalCheckPoint。

Replica Node

Replica 請求的入口是在ReplicaOperationTransportHandler的messageReceived,我們來看一下相關的邏輯流程。

1. Index or Delete

根據請求型別是Index還是Delete,選擇不同的執行邏輯。這裡沒有Update,是因為在Primary Node中已經將Update轉換成了Index或Delete請求了。

2. Parse Doc

3. Update Mapping

以上都和Primary Node中邏輯一致。

4. Get Sequence Id and Version

Primary Node中會生成Sequence ID和Version,然後放入ReplicaRequest中,這裡只需要從Request中獲取到就行。

5. Add Doc To Lucene

由於已經在Primary Node中將部分Update請求轉換成了Index或Delete請求,這裡只需要處理Index和Delete兩種請求,不再需要處理Update請求了。比Primary Node會更簡單一些。

6. Write Translog

7. Flush Translog

以上都和Primary Node中邏輯一致。