「Elasticsearch」ES重建索引怎麼才能做到資料無縫遷移呢?
背景
眾所周知,Elasticsearch是⼀個實時的分散式搜尋引擎,為⽤戶提供搜尋服務。當我們決定儲存某種資料,在建立索引的時候就需要將資料結構,即Mapping確定下來,於此同時索引的設定和很多固定配置將不能改變。
那如果後續業務發生變化,需要改變資料結構或者更換ES更換分詞器怎麼辦呢?為此,Elastic團隊提供了很多通過輔助⼯具來幫助開發⼈員進⾏重建索引的方案。
如果對 reindex
API 不熟悉,那麼在遇到重構的時候,必然事倍功半,效率低下。反之,就可以方便地進行索引重構,省時省力。
步驟
假設之前我們已經存在一個blog索引,因為更換分詞器需要對該索引中的資料進行重建索引,以便支援業務使用新的分詞規則搜尋資料,並且儘可能使這個變化對外服務沒有感知,大概分為以下幾個步驟:
- 新增⼀個索引
blog_lastest
,Mapping資料結構與blog
索引一致 - 將
blog
資料同步至blog_lastest
- 刪除
blog
索引 - 資料同步後給
blog_lastest
新增別名blog
新建索引
在這裡推薦一個ES管理工具Kibana,主要針對資料的探索、視覺化和分析。
put /blog_lastest/ { "mappings":{ "properties":{ "title":{ "type":"text", "analyzer":"ik_max_word" }, "author":{ "type":"keyword", "fields":{ "seg":{ "type":"text", "analyzer":"ik_max_word" } } } } } }
將舊索引資料copy到新索引
同步等待
接⼝將會在 reindex 結束後返回
POST /_reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
在 kibana
中的使用如下所示
當然高版本(7.1.1)中,ES都有提供對應的Java REST Client
,比如
ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest"); TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
為了防止贅述,接下來舉例全部以kibana
中請求介紹,如果有需要用Java REST Client
,可以自行去ES官網檢視。
非同步執⾏
如果 reindex 時間過⻓,建議加上 wait_for_completion=false
的引數條件,這樣 reindex 將直接返回 taskId
。
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest"
}
}
返回:
{
"task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}
op_type 引數
op_type
引數控制著寫入資料的衝突處理方式,如果把 op_type
設定為 create
【預設值】,在 _reindex
API 中,表示寫入時只在 dest
index
中新增不存在的 doucment,如果相同的 document 已經存在,則會報 version confilct
的錯誤,那麼索引操作就會失敗。【這種方式與使用 _create API 時效果一致】
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
}
}
如果這樣設定了,也就不存在更新資料的場景了【衝突資料無法寫入】,我們也可以把 op_type
設定為 index
,表示所有的資料全部重新索引建立。
conflicts 配置
預設情況下,當發生 version conflict
的時候,_reindex
會被 abort
,任務終止【此時資料還沒有 reindex
完成】,在返回體中的 failures
指標中會包含衝突的資料【有時候資料會非常多】,除非把 conflicts
設定為 proceed
。
關於 abort
的說明,如果產生了 abort
,已經執行的資料【例如更新寫入的】仍然存在於目標索引,此時任務終止,還會有資料沒有被執行,也就是漏數了。換句話說,該執行過程不會回滾,只會終止。如果設定了 proceed
,任務在檢測到資料衝突的情況下,不會終止,會跳過沖突資料繼續執行,直到所有資料執行完成,此時不會漏掉正常的資料,只會漏掉有衝突的資料。
POST _reindex
{
"source": {
"index": "blog"
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
我們可以故意把 op_type
設定為 create
,人為製造資料衝突的場景,測試時更容易觀察到衝突現象。
如果把 conflicts
設定為 proceed
,在返回體結果中不會再出現 failures
的資訊,但是通過 version_conflicts
指標可以看到具體的數量。
批次大小配置
當你發現reindex
的速度有些慢的時候,可以在 query
引數的同一層次【即 source
引數中】新增 size
引數,表示 scroll size
的大小【會影響批次的次數,進而影響整體的速度】,如果不顯式設定,預設是一批 1000 條資料,在一開始的簡單示例中也看到了。
如下,設定 scroll size
為 5000:
POST /_reindex?wait_for_completion=false
{
"source": {
"index": "blog",
"size":5000
},
"dest": {
"index": "blog_lastest",
"op_type": "create"
},
"conflicts": "proceed"
}
測試後,速度達到了 30 分鐘 500 萬左右,明顯提升了很多。
根據taskId可以實時檢視任務的執行狀態
一般來說,如果我們的 source index
很大【比如幾百萬資料量】,則可能需要比較長的時間來完成 _reindex
的工作,可能需要幾十分鐘。而在此期間不可能一直等待結果返回,可以去做其它事情,如果中途需要檢視進度,可以通過 _tasks
API 進行檢視。
GET /_tasks/{taskId}
返回:
{
"completed" : false,
"task" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 4704218,
"type" : "transport",
"action" : "indices:data/write/reindex",
……
}
當執行完畢時,completed
為true
檢視任務進度以及取消任務,除了根據taskId檢視以外,我們還可以通過檢視所有的任務中篩選本次reindex
的任務。
GET _tasks?detailed=true&actions=*reindex
返回結果:
{
"nodes" : {
"dpBihNSMQfSlboMGlTgCBA" : {
"name" : "node-16111-9210",
"transport_address" : "192.168.XXX.XXX:9310",
"host" : "192.168.XXX.XXX",
"ip" : "192.168.16.111:9310",
"roles" : [
"ingest",
"master"
],
"attributes" : {
"xpack.installed" : "true",
"transform.node" : "false"
},
"tasks" : {
"dpBihNSMQfSlboMGlTgCBA:6629305" : {
"node" : "dpBihNSMQfSlboMGlTgCBA",
"id" : 6629305,
"type" : "transport",
"action" : "indices:data/write/reindex",
"status" : {
"total" : 8361421,
"updated" : 0,
"created" : 254006,
"deleted" : 0,
"batches" : 743,
"version_conflicts" : 3455994,
"noops" : 0,
"retries" : {
"bulk" : 0,
"search" : 0
},
"throttled_millis" : 0,
"requests_per_second" : -1.0,
"throttled_until_millis" : 0
},
"description" : "reindex from [blog] to [blog_lastest][_doc]",
"start_time_in_millis" : 1609338953464,
"running_time_in_nanos" : 1276738396689,
"cancellable" : true,
"headers" : { }
}
}
}
}
}
注意觀察裡面的幾個重要指標,例如從 description
中可以看到任務描述,從 tasks 中可以找到任務的 id
【例如 dpBihNSMQfSlboMGlTgCBA:6629305
】,從 cancellable
可以判斷任務是否支援取消操作。
這個 API 其實就是模糊匹配,同理也可以查詢其它型別的任務資訊,例如使用 GET _tasks?detailed=true&actions=*byquery
檢視查詢請求的狀態。
當叢集的任務太多時我們就可以根據task_id
,也就是上面提到GET /_tasks/task_id
方式更加準確地查詢指定任務的狀態,避免叢集的任務過多,不方便檢視。
如果遇到操作失誤的場景,想取消任務,有沒有辦法呢?
當然有啦,雖然覆水難收,通過呼叫
_tasks API
:
POST _tasks/task_id/_cancel
這裡的 task_id
就是通過上面的查詢任務介面獲取的任務id(任務要支援取消操作,即【cancellable 為 true】時方能收效)。
刪除舊索引
當我們通過 API 查詢發現任務完成後,就可以進行後續操作,我這裡是要刪除舊索引,然後再給新索引起別名,用於替換舊索引,這樣才能保證對外服務沒有任何感知。
DELETE /blog
使用別名
POST /_aliases
{
"actions":[
{
"add":{
"index":"blog_lastest",
"alias":"blog"
}
}
]
}
通過別名訪問新索引
進行過以上操作後,我們可以使用一個簡單的搜尋驗證服務。
POST /blog/_search
{
"query": {
"match": {
"author": "james"
}
}
}
如果搜尋結果達到我們的預期目標,至此,資料索引重建遷移完成。
本文可轉載,但需宣告原文出處。 程式設計師小明,一個很少加班的程式設計師。歡迎關注微信公眾號“程式設計師小明”,獲取更多優質文章。