1. 程式人生 > 實用技巧 >python操作es方法

python操作es方法

將如下程式碼中按需做下替換即可

$ip  替換成es的ip;
$port 替換成es的埠;
$user 替換成es的使用者;
$password 替換成es的密碼;

# coding: gbk
# 模型資料匹配檔案
import datetime
import os
from elasticsearch import Elasticsearch, helpers
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
def conn_es():
    '''
    es連線方法,帶有使用者名稱密碼連線
    :return:

    
''' es=Elasticsearch(hosts="$ip", port=$port, sniff_on_start=True, sniff_on_connection_fail=True, sniff_timeout=60, http_auth=('$user', '$password'), timeout=120) print('當前es環境 $ip:$port
') return es def get_data_from_es(es, index, doc_type="doc", body={"query": {"match_all": {}}}, scroll='5m', timeout='1m', size=1000): ''' es7分頁方法 :param es: :param index: :param doc_type: :param body: :param scroll: :param timeout: :param size: :return:
''' query_data = es.search( index=index, scroll=scroll, timeout=timeout, size=size, body=body ) result_data = query_data.get("hits").get("hits") if not result_data: return [] scroll_id = query_data["_scroll_id"] total = query_data["hits"]["total"]["value"] for i in range(int(total / 1000)): res = es.scroll(scroll_id=scroll_id, scroll='5m') result_data += res["hits"]["hits"] l = [i["_source"] for i in result_data] return l def copy_index_es(from_index,doc_type): ''' es表複製備份方法 :param from_index: 需要備份表名字 :param to_index:新表名字 :return: ''' es=conn_es() body = { "query": { "match_all": { } } } data = get_data_from_es(es=es, index=from_index, doc_type=doc_type, body=body) data_key_tuple = [tuple(k) for k in data][0] data_value_list_all = [] for i in data: data_value = [] for k, v in i.items(): data_value.append(v) data_value_list_all.append(tuple(data_value)) crdt = datetime.datetime.now() crdt_str = crdt.strftime("%Y%m%d%H%M%S") to_index=from_index+'_bak'+crdt_str actions = [{'_op_type': 'index', '_index': to_index, '_type': doc_type, '_source': dict(zip(data_key_tuple, item))} for item in data_value_list_all] print(u'開始備份表',from_index,u'===>',to_index,u'當前時間',crdt) helpers.bulk(es, actions) crdt_after = datetime.datetime.now() crdt_after_str = crdt_after.strftime("%Y%m%d%H%M%S") print(u'備份表', from_index, u'===>', to_index,u'完成,', u'當前時間', crdt_after_str) def delete_data_index(index_name): ''' es表清空資料的方法 :param index_name: 需要清空資料的index :return: ''' es = conn_es() body = { "query": { "match_all": { } } } data = get_data_from_es(es=es, index=index_name, doc_type="doc", body=body) crdt = datetime.datetime.now() crdt_str = crdt.strftime("%Y%m%d%H%M%S") print(u'開始清空表', index_name, u'當前時間', crdt) es.delete_by_query(index=index_name, body=body) crdt_after = datetime.datetime.now() crdt_after_str = crdt_after.strftime("%Y%m%d%H%M%S") print(u'清空表', index_name, u'完成,', u'當前時間', crdt_after_str) if __name__ == '__main__': print(u'以下是python操作es的各種方法') print(u'1.以當前時間備份index') #copy_index_es('piccvou_piccappdata','doc') print(u'2.清空index資料,不刪除表結構') #delete_data_index('piccvou_piccappdata_bak20200921134456')