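[Elasticsearch] Rebuilding an Index Online
Background:
1. Some of the project's older indices were created with unsuitable settings and need to be rebuilt.
2. The production Elasticsearch cluster was scaled out from 1 node to 3, so the existing indices need to be split into shards across the nodes.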
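For example:
The old index index_user was created with 1 primary shard and 0 replica shards, so its data is not highly available. A search against it reports a single shard (response truncated):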
GET index_user/_search
{ "took" : 121, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }
Implementation steps:
1. Create the new index index_user_v2 with the number of primary and replica shards we need.
PUT index_user_v2
{ "settings": { "number_of_replicas": 1, "number_of_shards": 5 } }
2. Define the mapping. The new index uses the same mapping as the old one, so we can copy the old index's field definitions directly.
PUT index_user_v2/t_user/_mappings
{
  "properties": {
    "age": { "type": "integer" },
    "ageScope": { "type": "keyword" },
    "birthday": { "type": "long" },
    "cityId": { "type": "integer" },
    "cityName": { "type": "keyword" },
    "countryCode": { "type": "integer" },
    "countyId": { "type": "integer" },
    "create_time": { "type": "long" },
    "dbId": { "type": "long" },
    "email": { "type": "keyword" },
    "gameIds": { "type": "text", "analyzer": "ik_max_word" },
    "isCreateServer": { "type": "integer" },
    "isDelete": { "type": "boolean" },
    "nickName": { "type": "text", "analyzer": "ik_smart" },
    "nickNamePingYin": { "type": "text", "analyzer": "pinyin" },
    "nnNumber": { "type": "long" },
    "provinceId": { "type": "integer" },
    "provinceName": { "type": "keyword" },
    "sex": { "type": "keyword" },
    "signature": { "type": "keyword" },
    "status": { "type": "keyword" },
    "telNum": { "type": "keyword" },
    "updae_time": { "type": "long" },
    "userId": { "type": "long" },
    "userType": { "type": "keyword" },
    "userUrl": { "type": "keyword" },
    "userUrlNn": { "type": "keyword" },
    "user_id": { "type": "long" }
  }
}
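Copying the field definitions by hand is error-prone with this many fields. The following is a minimal Python sketch, not the method used in this post, of pulling the properties out of a GET index_user/_mapping response so they can be reused as the body for the new index. The helper name extract_properties and the shortened sample response are made up for illustration; the function accepts both the typed layout (mappings -> type -> properties) and the typeless layout (mappings -> properties).

```python
def extract_properties(mapping_response, index):
    """Return the field definitions from a GET <index>/_mapping response."""
    mappings = mapping_response[index]["mappings"]
    if "properties" in mappings:
        # Typeless layout: {"mappings": {"properties": {...}}}
        return mappings["properties"]
    # Typed layout: {"mappings": {"<type>": {"properties": {...}}}}
    for type_body in mappings.values():
        if isinstance(type_body, dict) and "properties" in type_body:
            return type_body["properties"]
    raise ValueError("no field properties found for index %r" % index)


# Hypothetical, shortened response for the old index (only two fields shown).
old_mapping = {
    "index_user": {
        "mappings": {
            "t_user": {
                "properties": {
                    "age": {"type": "integer"},
                    "nickName": {"type": "text", "analyzer": "ik_smart"},
                }
            }
        }
    }
}

# Body that could be sent to the new index's mapping endpoint.
new_mapping_body = {"properties": extract_properties(old_mapping, "index_user")}
print(new_mapping_body)
```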
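3. After steps 1 and 2, Kibana->Monitoring->Node shows that the shards of index_user_v2 have been allocated automatically across the three nodes, as shown in the figure.
Before starting the actual reindex, you can set the replica count of index_user_v2 to 0 to avoid the time spent writing to replicas during the copy. Set it back to 1 once the copy has finished; otherwise the new index has no redundancy either.
PUT index_user_v2/_settings
{ "settings": { "number_of_replicas": 0 } }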
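Because the replica count has to be raised again after the copy, it can help to treat the two settings calls as a pair. Below is a small Python sketch under that assumption; replica_settings and replica_plan are hypothetical helper names, and the code only builds the request bodies (using the documented { "index": { "number_of_replicas": N } } form) without sending anything.

```python
import json


def replica_settings(count):
    """Body for PUT <index>/_settings that changes only the replica count."""
    if count < 0:
        raise ValueError("replica count must be >= 0")
    return {"index": {"number_of_replicas": count}}


def replica_plan(index, target_replicas):
    """Ordered (method, path, body) steps to run around a bulk copy."""
    path = "/%s/_settings" % index
    return [
        ("PUT", path, replica_settings(0)),                # before the copy
        ("PUT", path, replica_settings(target_replicas)),  # after the copy
    ]


for method, path, body in replica_plan("index_user_v2", 1):
    print(method, path, json.dumps(body))
```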
4. Run the reindex to copy the data from index_user into index_user_v2. Setting wait_for_completion=false makes the request return immediately while the reindex runs in the background.
# Reindex
POST /_reindex?wait_for_completion=false
{ "source": { "index": "index_user" }, "dest": { "index": "index_user_v2" } }
The request returns a task ID, for example Mroifc1NSJq2s7mf38XxmA:1679363718. We can use this task ID later to query the task's state, elapsed time, and progress.
GET _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718
{
  "completed" : true,
  "task" : {
    "node" : "Mroifc1NSJq2s7mf38XxmA",
    "id" : 1679363718,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    "status" : {
      "total" : 15480531,
      "updated" : 0,
      "created" : 15480531,
      "deleted" : 0,
      "batches" : 15481,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : { "bulk" : 0, "search" : 0 },
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until_millis" : 0
    },
    "description" : "reindex from [index_user] to [index_user_v2]",
    "start_time_in_millis" : 1623316057822,
    "running_time_in_nanos" : 594661905143,
    "cancellable" : true,
    "headers" : { }
  },
  "response" : {
    "took" : 594661,
    "timed_out" : false,
    "total" : 15480531,
    "updated" : 0,
    "created" : 15480531,
    "deleted" : 0,
    "batches" : 15481,
    "version_conflicts" : 0,
    "noops" : 0,
    "retries" : { "bulk" : 0, "search" : 0 },
    "throttled" : "0s",
    "throttled_millis" : 0,
    "requests_per_second" : -1.0,
    "throttled_until" : "0s",
    "throttled_until_millis" : 0,
    "failures" : [ ]
  }
}
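The task response is easier to act on when it is reduced to a few numbers. Here is a rough Python sketch that turns such a response into a progress percentage and a throughput figure. The function name summarize_reindex_task is made up, and counting created + updated + deleted + noops + version_conflicts as "handled" is my own assumption for estimating progress, not something the API defines. The sample reuses the figures from the response above.

```python
def summarize_reindex_task(task_response):
    """Reduce a GET _tasks/<id> response for a reindex to a few numbers."""
    task = task_response["task"]
    status = task["status"]
    total = status["total"]
    # Assumption: every document counted here has been dealt with already.
    handled = (status["created"] + status["updated"] + status["deleted"]
               + status["noops"] + status["version_conflicts"])
    seconds = task["running_time_in_nanos"] / 1e9
    return {
        "completed": task_response["completed"],
        "percent": 100.0 * handled / total if total else 0.0,
        "seconds": seconds,
        "docs_per_second": handled / seconds if seconds else 0.0,
    }


# Trimmed copy of the response shown above.
sample = {
    "completed": True,
    "task": {
        "status": {
            "total": 15480531, "updated": 0, "created": 15480531,
            "deleted": 0, "noops": 0, "version_conflicts": 0,
        },
        "running_time_in_nanos": 594661905143,
    },
}

summary = summarize_reindex_task(sample)
print("%(percent).1f%% done in %(seconds).0f s, about %(docs_per_second).0f docs/s" % summary)
```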
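5. Once the task has completed:
Remove the alias index_user_latest from the old index index_user.
Add the alias index_user_latest to the new index index_user_v2.
Both actions go into a single _aliases request, which applies them atomically, so there is no moment at which the alias points to no index.
This completes the index rebuild.
# Alias swap
POST _aliases
{ "actions": [ { "add": { "index": "index_user_v2", "alias": "index_user_latest" } }, { "remove": { "index": "index_user", "alias": "index_user_latest" } } ] }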
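If the same swap has to be repeated for other indices, the request body can be generated instead of typed. This is a small Python sketch along those lines: alias_swap_actions builds the same body as the request above, and indices_behind_alias reads a GET _alias/<alias> response to confirm where the alias points afterwards. Both helper names and the sample response are illustrative assumptions.

```python
def alias_swap_actions(alias, old_index, new_index):
    """Body for POST _aliases that moves `alias` in one atomic request."""
    return {
        "actions": [
            {"add": {"index": new_index, "alias": alias}},
            {"remove": {"index": old_index, "alias": alias}},
        ]
    }


def indices_behind_alias(alias_response, alias):
    """Index names that carry `alias` in a GET _alias/<alias> response."""
    return sorted(
        index for index, body in alias_response.items()
        if alias in body.get("aliases", {})
    )


body = alias_swap_actions("index_user_latest", "index_user", "index_user_v2")

# Hypothetical response after the swap has been applied.
after_swap = {"index_user_v2": {"aliases": {"index_user_latest": {}}}}
print(indices_behind_alias(after_swap, "index_user_latest"))
```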
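Afterthoughts:
1. When _reindex runs, it reads the documents that are in the old index index_user at that moment (15480602 documents) and copies that batch into the new index index_user_v2.
In production, however, data keeps being written to index_user. Documents written after the reindex has started are not picked up, so the copied data ends up slightly smaller than the actual data volume.
How to handle it: the data in this index is synced in real time from MySQL to Elasticsearch by StreamSets. We can stop the StreamSets pipeline, record the time at which the copy starts, and run an incremental sync once the copy has finished.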
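In this setup the catch-up is done by StreamSets. Purely to illustrate the idea of a time-based catch-up, here is a Python sketch that selects the rows changed since the recorded start time and renders them as a _bulk payload for the new index. The update_time field name, the sample rows, and both helper names are made up; the optional doc_type argument is only there because older clusters that still use mapping types expect a _type in the action line.

```python
import json


def changed_since(rows, start_ms, time_field):
    """Rows whose modification timestamp is at or after the copy start."""
    return [row for row in rows if row[time_field] >= start_ms]


def to_bulk_ndjson(rows, index, id_field, doc_type=None):
    """Render rows as an NDJSON body of index actions for POST _bulk."""
    lines = []
    for row in rows:
        meta = {"_index": index, "_id": str(row[id_field])}
        if doc_type is not None:
            meta["_type"] = doc_type  # only for clusters that use types
        lines.append(json.dumps({"index": meta}))
        lines.append(json.dumps(row))
    # The bulk API expects a trailing newline after the last line.
    return "\n".join(lines) + "\n" if lines else ""


copy_started_ms = 1623316057822  # start_time_in_millis of the reindex task

# Hypothetical rows as they might come back from MySQL.
rows = [
    {"userId": 1, "nickName": "old", "update_time": 1623316000000},
    {"userId": 2, "nickName": "new", "update_time": 1623316100000},
]

pending = changed_since(rows, copy_started_ms, "update_time")
payload = to_bulk_ndjson(pending, "index_user_v2", "userId", doc_type="t_user")
print(payload, end="")
```

Indexing by document ID makes the catch-up safe to repeat: a row that was already copied is simply overwritten with the same content.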
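2. One possible improvement: because wait_for_completion=false makes _reindex return a task, the task ID can be handed to a scheduled job that polls the task's status and sends a notification as soon as the task finishes.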
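The polling idea could look roughly like the Python sketch below. It is only an outline under a few assumptions: wait_for_task, fetch_task_http, and the notify callback are hypothetical names, the cluster is reachable without authentication, and a real scheduled job would also want a timeout and error handling. The fetch function is passed in so the loop can be tried out without a live cluster.

```python
import json
import time
import urllib.request


def fetch_task_http(base_url, task_id):
    """GET <base_url>/_tasks/<task_id> and decode the JSON body."""
    url = "%s/_tasks/%s" % (base_url, task_id)
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_for_task(fetch_task, task_id, notify, interval_seconds=30.0,
                  sleep=time.sleep):
    """Poll until the task reports completed, then notify once and return."""
    while True:
        response = fetch_task(task_id)
        if response.get("completed"):
            failures = response.get("response", {}).get("failures", [])
            notify(task_id, failures)
            return response
        sleep(interval_seconds)


# Dry run with canned responses instead of a real cluster.
canned = iter([
    {"completed": False},
    {"completed": True, "response": {"failures": []}},
])
messages = []
result = wait_for_task(
    fetch_task=lambda _task_id: next(canned),
    task_id="Mroifc1NSJq2s7mf38XxmA:1679363718",
    notify=lambda tid, failures: messages.append((tid, len(failures))),
    interval_seconds=0,
    sleep=lambda _seconds: None,
)
print(messages)
```

Against a real cluster, fetch_task would be something like lambda tid: fetch_task_http("http://localhost:9200", tid), where the address is a placeholder.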