1. 程式人生 > 其它 >ElasticSearch遷移資料量太大遇到的問題

ElasticSearch遷移資料量太大遇到的問題

技術標籤:ElasticSearch分散式查詢

本文記錄最近遷移ES遇到的問題和解決辦法,遷移的方式是從線上es scroll 匯出到json,再想辦法匯入本地機房,可能遷移有其他的辦法,不一定這種就是最好的,我這裡選擇 scroll 是因為各種限制,這樣是最方便的,這裡主要討論的是scroll之後,遇到的問題。

在這裡插入圖片描述

1 scroll 拉取

就是基本的 scroll ,設定了10w行做一個單獨的json檔案,方便我後續的處理

from utils.esd import ES_CLIENT, ES_CLIENT_LOCAL
from elasticsearch import helpers
import
time import json import csv import elasticsearch from utils.logUtils import logger class GetEs: def __init__(self): self.espro = ES_CLIENT self.es_local = ES_CLIENT_LOCAL self.count = 0 def getvalue(self): query = { "query": { "match_all"
: {} } # , "_source": ["doc", "case_name", "case_id", "judge_date", "court_name", "case_attr"] } scanResp = helpers.scan(client=self.espro, query=query, size=5000, scroll="3m", index=
'precedents', doc_type='_doc', timeout="3m") for k in scanResp: yield k def insert_es(self, k): try: self.es_local.create("precedents", k["_id"].encode('utf-8'), k["_source"]) except (elasticsearch.exceptions.ConflictError, elasticsearch.exceptions.RequestError) as e: logger.info("error_id" + str(k["_id"].encode('utf-8'))) pass def write_file(self, k): # 這裡根據這個 count 以10w行作為一個拆分 file_index = int(self.count / 100000) with open('/vdd/mnt/es_out/alles-{}.json'.format(str(file_index)), 'ab') as f: f.write(json.dumps(k, ensure_ascii=False).encode('utf-8')) f.write(b'\n') def run(self): list_all = self.getvalue() for index, k in enumerate(list_all, 1): self.count += 1 logger.info(self.count) # if self.count <= 64998: # continue self.write_file(k) if __name__ == "__main__": S = GetEs() S.run()

匯出結果:

# 共1000個檔案
alles-655.json   alles-680.json  alles-705.json  alles-730.json  alles-756.json  alles-781.json  alles-806.json  alles-831.json  alles-857.json  alles-882.json  alles-907.json  alles-932.json  alles-958.json  alles-983.json
.........
.........

[[email protected] es_out]# ls -lh alles-757.json
-rw-r--r-- 1 root root 2.3G 12月 10 14:39 alles-757.json

2 如何將 3.3T資料匯入到另一臺機器的es

/dev/vdd       ext4      3.3T  146G    98% /vdd/mnt

因為種種願意資料只能在機器A ,es機器在B ,A無法直接連線B,需要將資料先複製到B機器上。上面由於已經在匯出的時候拆分了,所以不用再spilt ,3.6T磁碟基本也滿了,3.3T檔案也無法split 。

如果直接複製這 3.3T資料 到B機器,需要的時間大約是144KB/s 大約是 144天 ~
先採取壓縮

#!/bin/bash
cd /vdd/mnt/es_out
ls *.json > tar.log

aa=$(cat tar.log|wc -l)

for (( i=2;i<="$aa";i=i+1 ))
do
    bb=$( cat tar.log|awk 'NR=='$i' {print $1}')
    echo $i
    echo $bb
    tar -jcvf  /vdd/mnt/es_tar/tares-${i}.json.tar.bz2  /vdd/mnt/es_out/${bb}  --remove-files
done
echo "successful"

這樣預計可以將3.3T檔案最終壓縮到 220G 左右 ,再進行傳輸就很輕鬆了~
再導完後,傳輸之前,再將頻寬臨時調整到10MB,這樣下載速度大約能到1.5MB 這樣

3 傳輸之後如何匯入es

當資料終於被傳輸到 B 機器,可以進行匯入es了,這裡也有一些道道

  • 1 開始,我選擇的是單條create ,這裡用的是python 指令碼,部分程式碼如下
    def GetValue(self):
        with open("/home/tempData/es_out/vdd/mnt/es_out/alles-0.json", "r") as f:
            a = f.readlines()
            for i in a:
                k = json.loads(i)
                print(k["_id"])
                self.insert_es(k)

    def insert_es(self, k):
        try:
            self.es_local.create("xxxx", k["_id"].encode('utf-8'), k["_source"])
        except (elasticsearch.exceptions.ConflictError, elasticsearch.exceptions.RequestError) as e:
            logger.info("error_id" + str(k["_id"].encode('utf-8')))
            pass

這樣的寫入效率大概是 200多條/s

  • 2 用 bulk api 進行批量傳輸
    def make_new_json(self):
        with open("/home/tempData/es_out/vdd/mnt/es_out/alles-111.json", "r") as f:
            a = f.readlines()
            count = 0
            actions = []
            for i in tqdm(a):
                k = json.loads(i)
                # 拼接插入資料結構
                action = {
                    "_index": "xxxx",
                    "_id": k["_id"],
                    "_source": json.dumps(k["_source"]).encode("utf-8").decode('utf-8')
                }
               
                # 形成一個長度與查詢結果數量相等的列表
                actions.append(action)
                count += 1
                if count >= 5000:
                    helpers.bulk(ES_CLIENT_LOCAL, actions)
                        # print(a)
                        actions = []
                        count = 0
                        
                  

    def run(self):
        self.make_new_json()

這樣批量寫入大概是 1000條/s

  • 3 用 bulk api + curl json 進行批量傳輸

將資料從scroll出來的拼成 bulk 的格式

k = json.loads(i)
new_data = {}
new_data['index'] = {}
new_data['index']['_index'] = "precedents"
new_data['index']['_id'] = str(k["_id"])

# print(new_data)
# exit(1)
temp = json.dumps(new_data).encode("utf-8").decode('unicode_escape')
new_jsonfile.write(temp)
new_jsonfile.write('\n')
temp = json.dumps(k["_source"]).encode("utf-8").decode('utf-8')
new_jsonfile.write(temp)
new_jsonfile.write('\n')
curl -H "Content-Type: application/json"  -XPOST "127.0.0.1:9200/precedents/_bulk" --data-binary @bulk-9.json

這樣批量寫入大概是 5w-10w條/s