
【ElasticSearch】Online Index Rebuild

Project background:

  1. Some legacy indices in the project were configured poorly and need to be rebuilt.

  2. The production ElasticSearch cluster has been scaled out from 1 node to 3, so the existing indices need to be resharded.

  For example:

   The old index index_user was created with 1 primary shard and 0 replica shards, so its data has no high availability:

GET index_user/_search
{
  "took" : 121,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
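The same configuration can also be confirmed without running a search: the index settings expose the shard counts directly (a quick check, not part of the original write-up):

```
GET index_user/_settings
```

The response includes index.number_of_shards and index.number_of_replicas.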

Implementation steps:

  1. Create a new index, index_user_v2, with the desired numbers of primary and replica shards.

PUT index_user_v2
{
  "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 5
  }
}

  2. Set the index mapping. Since the new index uses the same mapping as the old one, we can simply copy the old index's mapping:

PUT index_user_v2/t_user/_mappings
{
  "properties": {
    "age": {
      "type": "integer"
    },
    "ageScope": {
      "type": "keyword"
    },
    "birthday": { "type": "long" },
    "cityId": { "type": "integer" },
    "cityName": { "type": "keyword" },
    "countryCode": { "type": "integer" },
    "countyId": { "type": "integer" },
    "create_time": { "type": "long" },
    "dbId": { "type": "long" },
    "email": { "type": "keyword" },
    "gameIds": { "type": "text", "analyzer": "ik_max_word" },
    "isCreateServer": { "type": "integer" },
    "isDelete": { "type": "boolean" },
    "nickName": { "type": "text", "analyzer": "ik_smart" },
    "nickNamePingYin": { "type": "text", "analyzer": "pinyin" },
    "nnNumber": { "type": "long" },
    "provinceId": { "type": "integer" },
    "provinceName": { "type": "keyword" },
    "sex": { "type": "keyword" },
    "signature": { "type": "keyword" },
    "status": { "type": "keyword" },
    "telNum": { "type": "keyword" },
    "updae_time": { "type": "long" },
    "userId": { "type": "long" },
    "userType": { "type": "keyword" },
    "userUrl": { "type": "keyword" },
    "userUrlNn": { "type": "keyword" },
    "user_id": { "type": "long" }
  }
}

  3. After running steps 1 and 2, you can see in Kibana → Monitoring → Nodes that index_user_v2 has automatically been sharded across the three nodes.
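If you prefer the console to the Kibana UI, the same shard distribution can be checked with the _cat API (an alternative view, not in the original steps):

```
GET _cat/shards/index_user_v2?v
```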

  Before the actual rebuild starts, you can set the replica count of index_user_v2 to 0 to avoid the time cost of writing replicas during the copy:

  PUT index_user_v2/_settings
  {
    "settings": {
      "number_of_replicas": 0
    }
  }

   4. Run the index migration to copy the data from index_user to index_user_v2. Setting wait_for_completion=false makes the reindex request run in the background.

# Index migration
POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "index_user"
  },
  "dest": {
    "index":"index_user_v2"
  }
}
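If the copy puts too much load on the cluster, _reindex also accepts a requests_per_second query parameter to throttle it (an optional variant of the call above; 500 is just an illustrative value):

```
POST /_reindex?wait_for_completion=false&requests_per_second=500
{
  "source": {
    "index": "index_user"
  },
  "dest": {
    "index": "index_user_v2"
  }
}
```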

  After the request is submitted, a taskId is returned, e.g. Mroifc1NSJq2s7mf38XxmA:1679363718. We can later use this taskId to query the migration task's status, elapsed time, progress, and so on:

GET _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718
{
  "completed" : true,
  "task" : {
    "node" : "Mroifc1NSJq2s7mf38XxmA",
    "id" : 1679363718,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    "status" : {
      "total" : 15480531,
      "updated" : 0,
      "created" : 15480531,
      "deleted" : 0,
      "batches" : 15481,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : {
        "bulk" : 0,
        "search" : 0
      },
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until_millis" : 0
    },
    "description" : "reindex from [index_user] to [index_user_v2]",
    "start_time_in_millis" : 1623316057822,
    "running_time_in_nanos" : 594661905143,
    "cancellable" : true,
    "headers" : { }
  },
  "response" : {
    "took" : 594661,
    "timed_out" : false,
    "total" : 15480531,
    "updated" : 0,
    "created" : 15480531,
    "deleted" : 0,
    "batches" : 15481,
    "version_conflicts" : 0,
    "noops" : 0,
    "retries" : {
      "bulk" : 0,
      "search" : 0
    },
    "throttled" : "0s",
    "throttled_millis" : 0,
    "requests_per_second" : -1.0,
    "throttled_until" : "0s",
    "throttled_until_millis" : 0,
    "failures" : [ ]
  }
}
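Since the task reports "cancellable" : true, a migration started by mistake can be stopped through the task management API (substitute the actual taskId):

```
POST _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718/_cancel
```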

    5. After the task completes:

    Remove the alias index_user_latest from the old index index_user.

    Add the alias index_user_latest to the new index index_user_v2.

    This completes the entire index rebuild.

# Alias swap
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "index_user_v2",
        "alias": "index_user_latest"
      }
    },
    {
      "remove": {
        "index": "index_user",
        "alias": "index_user_latest"
      }
    }
  ]
}
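If the replica count of index_user_v2 was lowered to 0 before the migration (step 3), remember to restore it after the alias swap; otherwise the new index still has no high availability:

```
PUT index_user_v2/_settings
{
  "settings": {
    "number_of_replicas": 1
  }
}
```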

Afterthoughts:

  1. When the _reindex migration runs, it reads the 15480602 documents present in the old index index_user at that moment and copies them into the new index index_user_v2.

  However, production keeps writing into the old index index_user while the copy runs, so the amount of data reindex copies can be slightly lower than the actual count.

  Solution: this index is fed by StreamSets, which syncs MySQL data into ElasticSearch in real time. We can stop StreamSets, record the time the copy starts, and run an incremental sync of the remaining data once the copy finishes.
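The incremental sync can also be sketched with _reindex itself, filtering the source by the recorded copy-start time via the create_time field from the mapping (the timestamp below is only a placeholder; note this catches newly created documents but not updates to older ones):

```
POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "index_user",
    "query": {
      "range": {
        "create_time": {
          "gte": 1623316057822
        }
      }
    }
  },
  "dest": {
    "index": "index_user_v2"
  }
}
```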

  2. One possible optimization: since _reindex with wait_for_completion=false creates a background task, the task id can be handed to a scheduled job that polls the task's status and sends a notification as soon as the task finishes.
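For the polling idea, the tasks API can also list all in-flight reindex tasks without knowing a taskId in advance, which is handy for such a scheduled job:

```
GET _tasks?detailed=true&actions=*reindex
```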