ElasticSearch遷移資料量太大遇到的問題
阿新 • • 發佈:2020-12-18
技術標籤:ElasticSearch分散式查詢
本文記錄最近遷移ES遇到的問題和解決辦法,遷移的方式是從線上es scroll 匯出到json,再想辦法匯入本地機房,可能遷移有其他的辦法,不一定這種就是最好的,我這裡選擇 scroll 是因為各種限制,這樣是最方便的,這裡主要討論的是scroll之後,遇到的問題。
1 scroll 拉取
就是基本的 scroll ,設定了10w行做一個單獨的json檔案,方便我後續的處理
from utils.esd import ES_CLIENT, ES_CLIENT_LOCAL
from elasticsearch import helpers
import time
import json
import csv
import elasticsearch
from utils.logUtils import logger
class GetEs:
def __init__(self):
self.espro = ES_CLIENT
self.es_local = ES_CLIENT_LOCAL
self.count = 0
def getvalue(self):
query = {
"query": {
"match_all" : {}
}
# , "_source": ["doc", "case_name", "case_id", "judge_date", "court_name", "case_attr"]
}
scanResp = helpers.scan(client=self.espro, query=query, size=5000, scroll="3m", index= 'precedents',
doc_type='_doc',
timeout="3m")
for k in scanResp:
yield k
def insert_es(self, k):
try:
self.es_local.create("precedents", k["_id"].encode('utf-8'), k["_source"])
except (elasticsearch.exceptions.ConflictError, elasticsearch.exceptions.RequestError) as e:
logger.info("error_id" + str(k["_id"].encode('utf-8')))
pass
def write_file(self, k):
# 這裡根據這個 count 以10w行作為一個拆分
file_index = int(self.count / 100000)
with open('/vdd/mnt/es_out/alles-{}.json'.format(str(file_index)), 'ab') as f:
f.write(json.dumps(k, ensure_ascii=False).encode('utf-8'))
f.write(b'\n')
def run(self):
list_all = self.getvalue()
for index, k in enumerate(list_all, 1):
self.count += 1
logger.info(self.count)
# if self.count <= 64998:
# continue
self.write_file(k)
if __name__ == "__main__":
S = GetEs()
S.run()
匯出結果:
# 共1000個檔案
alles-655.json alles-680.json alles-705.json alles-730.json alles-756.json alles-781.json alles-806.json alles-831.json alles-857.json alles-882.json alles-907.json alles-932.json alles-958.json alles-983.json
.........
.........
[[email protected] es_out]# ls -lh alles-757.json
-rw-r--r-- 1 root root 2.3G 12月 10 14:39 alles-757.json
2 如何將 3.3T資料匯入到另一臺機器的es
/dev/vdd ext4 3.3T 146G 98% /vdd/mnt
因為種種願意資料只能在機器A ,es機器在B ,A無法直接連線B,需要將資料先複製到B機器上。上面由於已經在匯出的時候拆分了,所以不用再spilt ,3.6T磁碟基本也滿了,3.3T檔案也無法split 。
如果直接複製這 3.3T資料 到B機器,需要的時間大約是144KB/s 大約是 144天 ~
先採取壓縮
#!/bin/bash
cd /vdd/mnt/es_out
ls *.json > tar.log
aa=$(cat tar.log|wc -l)
for (( i=2;i<="$aa";i=i+1 ))
do
bb=$( cat tar.log|awk 'NR=='$i' {print $1}')
echo $i
echo $bb
tar -jcvf /vdd/mnt/es_tar/tares-${i}.json.tar.bz2 /vdd/mnt/es_out/${bb} --remove-files
done
echo "successful"
這樣預計可以將3.3T檔案最終壓縮到 220G 左右 ,再進行傳輸就很輕鬆了~
再導完後,傳輸之前,再將頻寬臨時調整到10MB,這樣下載速度大約能到1.5MB 這樣
3 傳輸之後如何匯入es
當資料終於被傳輸到 B 機器,可以進行匯入es了,這裡也有一些道道
- 1 開始,我選擇的是單條create ,這裡用的是python 指令碼,部分程式碼如下
def GetValue(self):
with open("/home/tempData/es_out/vdd/mnt/es_out/alles-0.json", "r") as f:
a = f.readlines()
for i in a:
k = json.loads(i)
print(k["_id"])
self.insert_es(k)
def insert_es(self, k):
try:
self.es_local.create("xxxx", k["_id"].encode('utf-8'), k["_source"])
except (elasticsearch.exceptions.ConflictError, elasticsearch.exceptions.RequestError) as e:
logger.info("error_id" + str(k["_id"].encode('utf-8')))
pass
這樣的寫入效率大概是 200多條/s
- 2 用 bulk api 進行批量傳輸
def make_new_json(self):
with open("/home/tempData/es_out/vdd/mnt/es_out/alles-111.json", "r") as f:
a = f.readlines()
count = 0
actions = []
for i in tqdm(a):
k = json.loads(i)
# 拼接插入資料結構
action = {
"_index": "xxxx",
"_id": k["_id"],
"_source": json.dumps(k["_source"]).encode("utf-8").decode('utf-8')
}
# 形成一個長度與查詢結果數量相等的列表
actions.append(action)
count += 1
if count >= 5000:
helpers.bulk(ES_CLIENT_LOCAL, actions)
# print(a)
actions = []
count = 0
def run(self):
self.make_new_json()
這樣批量寫入大概是 1000條/s
- 3 用 bulk api + curl json 進行批量傳輸
將資料從scroll出來的拼成 bulk 的格式
k = json.loads(i)
new_data = {}
new_data['index'] = {}
new_data['index']['_index'] = "precedents"
new_data['index']['_id'] = str(k["_id"])
# print(new_data)
# exit(1)
temp = json.dumps(new_data).encode("utf-8").decode('unicode_escape')
new_jsonfile.write(temp)
new_jsonfile.write('\n')
temp = json.dumps(k["_source"]).encode("utf-8").decode('utf-8')
new_jsonfile.write(temp)
new_jsonfile.write('\n')
curl -H "Content-Type: application/json" -XPOST "127.0.0.1:9200/precedents/_bulk" --data-binary @bulk-9.json
這樣批量寫入大概是 5w-10w條/s