1. 程式人生 > 實用技巧 >「Elasticsearch」ES重建索引怎麼才能做到資料無縫遷移呢?

「Elasticsearch」ES重建索引怎麼才能做到資料無縫遷移呢?

背景

眾所周知,Elasticsearch是⼀個實時的分散式搜尋引擎,為⽤戶提供搜尋服務。當我們決定儲存某種資料,在建立索引的時候就需要將資料結構,即Mapping確定下來,於此同時索引的設定和很多固定配置將不能改變。

那如果後續業務發生變化,需要改變資料結構或者更換ES更換分詞器怎麼辦呢?為此,Elastic團隊提供了很多通過輔助⼯具來幫助開發⼈員進⾏重建索引的方案。
如果對 reindex API 不熟悉,那麼在遇到重構的時候,必然事倍功半,效率低下。反之,就可以方便地進行索引重構,省時省力。

步驟

假設之前我們已經存在一個blog索引,因為更換分詞器需要對該索引中的資料進行重建索引,以便支援業務使用新的分詞規則搜尋資料,並且儘可能使這個變化對外服務沒有感知,大概分為以下幾個步驟:​

  • 新增⼀個索引blog_lastest,Mapping資料結構與blog索引一致
  • blog資料同步至blog_lastest
  • 刪除blog索引
  • 資料同步後給blog_lastest新增別名blog

新建索引

在這裡推薦一個ES管理工具Kibana,主要針對資料的探索、視覺化和分析。

put /blog_lastest/
{
    "mappings":{
        "properties":{
            "title":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "author":{
                "type":"keyword",
                "fields":{
                    "seg":{
                        "type":"text",
                        "analyzer":"ik_max_word"
                    }
                }
            }
        }
    }
}

將舊索引資料copy到新索引

同步等待

接⼝將會在 reindex 結束後返回

POST /_reindex
{
	"source": {
		"index": "blog"
	},
	"dest": {
		"index": "blog_lastest"
	}
}

kibana 中的使用如下所示

當然高版本(7.1.1)中,ES都有提供對應的Java REST Client,比如

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

為了防止贅述,接下來舉例全部以kibana中請求介紹,如果有需要用Java REST Client,可以自行去ES官網檢視。

非同步執⾏

如果 reindex 時間過⻓,建議加上 wait_for_completion=false 的引數條件,這樣 reindex 將直接返回 taskId

POST /_reindex?wait_for_completion=false
{
	"source": {
		"index": "blog"
	},
	"dest": {
		"index": "blog_lastest"
	}
}

返回:

{
  "task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}

op_type 引數

op_type 引數控制著寫入資料的衝突處理方式,如果把 op_type 設定為 create【預設值】,在 _reindex API 中,表示寫入時只在 dest index中新增不存在的 doucment,如果相同的 document 已經存在,則會報 version confilct 的錯誤,那麼索引操作就會失敗。【這種方式與使用 _create API 時效果一致】

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  }
}

如果這樣設定了,也就不存在更新資料的場景了【衝突資料無法寫入】,我們也可以把 op_type 設定為 index,表示所有的資料全部重新索引建立。

conflicts 配置

預設情況下,當發生 version conflict 的時候,_reindex 會被 abort,任務終止【此時資料還沒有 reindex 完成】,在返回體中的 failures 指標中會包含衝突的資料【有時候資料會非常多】,除非把 conflicts 設定為 proceed

關於 abort 的說明,如果產生了 abort,已經執行的資料【例如更新寫入的】仍然存在於目標索引,此時任務終止,還會有資料沒有被執行,也就是漏數了。換句話說,該執行過程不會回滾,只會終止。如果設定了 proceed,任務在檢測到資料衝突的情況下,不會終止,會跳過沖突資料繼續執行,直到所有資料執行完成,此時不會漏掉正常的資料,只會漏掉有衝突的資料。

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  },
  "conflicts": "proceed"
}

我們可以故意把 op_type 設定為 create,人為製造資料衝突的場景,測試時更容易觀察到衝突現象。

如果把 conflicts 設定為 proceed,在返回體結果中不會再出現 failures 的資訊,但是通過 version_conflicts 指標可以看到具體的數量。

批次大小配置

當你發現reindex的速度有些慢的時候,可以在 query 引數的同一層次【即 source 引數中】新增 size 引數,表示 scroll size 的大小【會影響批次的次數,進而影響整體的速度】,如果不顯式設定,預設是一批 1000 條資料,在一開始的簡單示例中也看到了。
如下,設定 scroll size 為 5000:

POST /_reindex?wait_for_completion=false
{
	"source": {
		"index": "blog",
		"size":5000
	},
	"dest": {
		"index": "blog_lastest",
		"op_type": "create"
	},
	"conflicts": "proceed"
}

測試後,速度達到了 30 分鐘 500 萬左右,明顯提升了很多。

根據taskId可以實時檢視任務的執行狀態

一般來說,如果我們的 source index 很大【比如幾百萬資料量】,則可能需要比較長的時間來完成 _reindex 的工作,可能需要幾十分鐘。而在此期間不可能一直等待結果返回,可以去做其它事情,如果中途需要檢視進度,可以通過 _tasks API 進行檢視。

GET /_tasks/{taskId}

返回:

{
  "completed" : false,
  "task" : {
    "node" : "dpBihNSMQfSlboMGlTgCBA",
    "id" : 4704218,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    ……
}

當執行完畢時,completed為true
檢視任務進度以及取消任務,除了根據taskId檢視以外,我們還可以通過檢視所有的任務中篩選本次reindex的任務。

GET _tasks?detailed=true&actions=*reindex

返回結果:

{
  "nodes" : {
    "dpBihNSMQfSlboMGlTgCBA" : {
      "name" : "node-16111-9210",
      "transport_address" : "192.168.XXX.XXX:9310",
      "host" : "192.168.XXX.XXX",
      "ip" : "192.168.16.111:9310",
      "roles" : [
        "ingest",
        "master"
      ],
      "attributes" : {
        "xpack.installed" : "true",
        "transform.node" : "false"
      },
      "tasks" : {
        "dpBihNSMQfSlboMGlTgCBA:6629305" : {
          "node" : "dpBihNSMQfSlboMGlTgCBA",
          "id" : 6629305,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {
            "total" : 8361421,
            "updated" : 0,
            "created" : 254006,
            "deleted" : 0,
            "batches" : 743,
            "version_conflicts" : 3455994,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until_millis" : 0
          },
          "description" : "reindex from [blog] to [blog_lastest][_doc]",
          "start_time_in_millis" : 1609338953464,
          "running_time_in_nanos" : 1276738396689,
          "cancellable" : true,
          "headers" : { }
        }
      }
    }
  }
}

注意觀察裡面的幾個重要指標,例如從 description 中可以看到任務描述,從 tasks 中可以找到任務的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,從 cancellable 可以判斷任務是否支援取消操作。
這個 API 其實就是模糊匹配,同理也可以查詢其它型別的任務資訊,例如使用 GET _tasks?detailed=true&actions=*byquery 檢視查詢請求的狀態。
當叢集的任務太多時我們就可以根據task_id,也就是上面提到GET /_tasks/task_id 方式更加準確地查詢指定任務的狀態,避免叢集的任務過多,不方便檢視。
如果遇到操作失誤的場景,想取消任務,有沒有辦法呢?
當然有啦,雖然覆水難收,通過呼叫
_tasks API

POST _tasks/task_id/_cancel

這裡的 task_id 就是通過上面的查詢任務介面獲取的任務id(任務要支援取消操作,即【cancellable 為 true】時方能收效)。

刪除舊索引

當我們通過 API 查詢發現任務完成後,就可以進行後續操作,我這裡是要刪除舊索引,然後再給新索引起別名,用於替換舊索引,這樣才能保證對外服務沒有任何感知。

DELETE /blog

使用別名

POST /_aliases
{
    "actions":[
        {
            "add":{
                "index":"blog_lastest",
                "alias":"blog"
            }
        }
    ]
}

通過別名訪問新索引

進行過以上操作後,我們可以使用一個簡單的搜尋驗證服務。

POST /blog/_search
{
	"query": {
		"match": {
			"author": "james"
		}
	}
}

如果搜尋結果達到我們的預期目標,至此,資料索引重建遷移完成。

本文可轉載,但需宣告原文出處。 程式設計師小明,一個很少加班的程式設計師。歡迎關注微信公眾號“程式設計師小明”,獲取更多優質文章。