1. 程式人生 > >es數據遷移腳本(python)

es數據遷移腳本(python)

數據遷移 同步 read tar def ons 系統 customer replica

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## ---------------------------------------------------------------------------
## Configuration
## ---------------------------------------------------------------------------
## Old (source) cluster host, as "ip:port"
oldClusterHost = "192.168.1.85:9200"
## Old cluster user name; may be empty (no Authorization header is sent then)
oldClusterUserName = "elastic"
## Old cluster password; may be empty
oldClusterPassword = "elastic"
## New (target) cluster host, as "ip:port"
newClusterHost = "192.168.1.118:9200"
## New cluster user name; may be empty
newClusterUser = ""
## New cluster password; may be empty
newClusterPassword = ""
## Replica count written into every index created on the new cluster
DEFAULT_REPLICAS = 0


def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send one HTTP request and return the raw response body.

    Args:
        method: HTTP verb, e.g. "GET", "POST" or "PUT".
        host: Target host as "ip:port".
        endpoint: Request path, e.g. "/_cat/indices".
        params: Request body; only sent for non-GET requests.
        username: Basic-auth user name; when empty no Authorization
            header is added.
        password: Basic-auth password.

    Returns:
        The response body as returned by ``response.read()``. The HTTP
        status code is not inspected.
    """
    headers = {}
    if username:
        # b64encode never inserts line breaks, so no newline stripping is
        # needed before placing the value in a header.
        credentials = "%s:%s" % (username, password)
        headers["Authorization"] = "Basic %s" % base64.b64encode(credentials)

    conn = httplib.HTTPConnection(host)
    try:
        if "GET" == method:
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            conn.request(method=method, url=endpoint, body=params, headers=headers)
        return conn.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()


def httpGet(host, endpoint, username="", password=""):
    """Issue a GET request via httpRequest and return the response body."""
    return httpRequest("GET", host, endpoint, "", username, password)


def httpPost(host, endpoint, params, username="", password=""):
    """Issue a POST request with body ``params`` and return the response body."""
    return httpRequest("POST", host, endpoint, params, username, password)


def httpPut(host, endpoint, params, username="", password=""):
    """Issue a PUT request with body ``params`` and return the response body."""
    return httpRequest("PUT", host, endpoint, params, username, password)


def getIndices(host, username="", password=""):
    """Return the names of the open indices reported by ``/_cat/indices``.

    Args:
        host: Cluster host as "ip:port".
        username: Basic-auth user name, may be empty.
        password: Basic-auth password, may be empty.

    Returns:
        A list of index names, in the order the cluster listed them.
    """
    endpoint = "/_cat/indices"
    # Query the cluster that was passed in rather than always the old one.
    indicesResult = httpGet(host, endpoint, username, password)
    indexList = []
    for line in indicesResult.split("\n"):
        fields = line.split()
        # Assumes the default column order "health status index ..." (the
        # index name is taken from the third column). Checking the status
        # column itself avoids matching lines that merely contain "open"
        # somewhere, e.g. inside an index name.
        if len(fields) >= 3 and fields[1] == "open":
            indexList.append(fields[2])
    return indexList


def getSettings(index, host, username="", password=""):
    """Fetch the settings of ``index`` and build the "settings" JSON fragment.

    The shard count is copied from the existing index; the replica count is
    always DEFAULT_REPLICAS.

    Returns:
        A string of the form ``"settings": {...}`` (without enclosing braces).
    """
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print(index + " 原始settings如下:\n" + indexSettings)
    settingsDict = json.loads(indexSettings)
    ## Shard count defaults to the value used by the old cluster's index
    number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
    ## Replica count defaults to DEFAULT_REPLICAS
    number_of_replicas = DEFAULT_REPLICAS
    newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
    return newSetting


def getMapping(index, host, username="", password=""):
    """Fetch the mapping of ``index`` and build the "mappings" JSON fragment.

    Returns:
        A string of the form ``"mappings" : {...}`` (without enclosing braces).
    """
    endpoint = "/" + index + "/_mapping"
    indexMapping = httpGet(host, endpoint, username, password)
    print(index + " 原始mapping如下:\n" + indexMapping)
    mappingDict = json.loads(indexMapping)
    mappings = json.dumps(mappingDict[index]["mappings"])
    newMapping = "\"mappings\" : " + mappings
    return newMapping


def createIndexStatement(oldIndexName):
    """Build the create-index request body from the old cluster's index.

    Combines the fragments from getSettings and getMapping, both read from
    the old cluster, into one JSON object string.
    """
    settingStr = getSettings(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    mappingStr = getMapping(oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
    return createstatement


def createIndex(oldIndexName, newIndexName=""):
    """Create ``newIndexName`` on the new cluster, modelled on ``oldIndexName``.

    Args:
        oldIndexName: Index on the old cluster to copy settings/mapping from.
        newIndexName: Name to create on the new cluster; defaults to
            ``oldIndexName`` when empty.
    """
    if newIndexName == "":
        newIndexName = oldIndexName
    createstatement = createIndexStatement(oldIndexName)
    print("新索引 " + newIndexName + " 的setting和mapping如下:\n" + createstatement)
    endpoint = "/" + newIndexName
    createResult = httpPut(newClusterHost, endpoint, createstatement, newClusterUser, newClusterPassword)
    # The response is printed as-is; a failed creation is reported, not raised.
    print("新索引 " + newIndexName + " 創建結果:" + createResult)


def main():
    """Recreate every open, non-system index of the old cluster on the new one.

    Indices whose name starts with "." are skipped and listed at the end so
    they can be handled manually.
    """
    indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
    systemIndex = []
    for index in indexList:
        if index.startswith("."):
            systemIndex.append(index)
        else:
            createIndex(index, index)
    for index in systemIndex:
        print(index + " 或許是系統索引,不會重新創建,如有需要,請單獨處理~")


## main
if __name__ == "__main__":
    main()

以上同步的時候設置的副本數是0,目的是加快同步速度,同步完成後需要設置副本數,如下:
curl -H "Content-Type: application/json" -XPUT 'http://192.168.1.118:9200/db_customer/_settings' -d '{
"number_of_replicas" : 1
}'

es數據遷移腳本(python)