ElasticSearch增刪改查之python sort、scroll、scan
阿新 • • 發佈:2019-02-15
1、用python操作elasticsearch有兩個庫可以呼叫
# ElasticSearch不支援scroll(分頁查詢)查詢 from pyelasticsearch import ElasticSearch # Elasticsearch支援scroll查詢,一般建議使用這個庫 from elasticsearch import helpers,Elasticsearch """ 注意:以上兩個庫各自在查詢或更新傳遞的引數是不同的 """ # ElasticSearch查詢使用方式 ES = ElasticSearch(URL) res = ES.search( query, index=index, size=size ) # Elasticsearch查詢使用方式 ES = ElasticSearch(URL) res = ES.search( body=query, index=index, size=size )
- ES中的高效能的部分大部分在helpers中實現
- 如果要批量查詢大量的資料,建議使用helpers.scan,search查詢最大隻能返回10000條資料
- 是有效能限制的
2、Elasticsearch中search scroll使用
- scroll的優勢:支援分頁查詢,自動排序,並把查詢結果返回
- scroll使用方式:每次查詢獲取下一次查詢需要使用的scroll_id,查詢時傳遞引數scroll='2m',後臺ES即可以將查詢的結果儲存2分鐘
- 查詢時常用技巧
1、將必須包含欄位新增到 must中
2、將必須不包含欄位新增到 must_not中
3、單一條件匹配選用 term,多個單一條件任何一個匹配選用 terms
4、from 指定從結果資料中的第多少條開始返回,from的最大值超不過2000,所以在使用大資料查詢基本使用不上
5、size 指定結果資料中共返回多少條資料
# 在使用時一定要注意Elasticsearch與ElasticSearch還是有一定的區別的,傳遞引數不一樣 from elasticsearch import Elasticsearch ES_SEARCH_HOSTURL = 'http://domain:9000/' ES = Elasticsearch(ES_SEARCH_HOSTURL) query = { "query": { "bool": { "must": [], "must_not": [] } } } # index可以為索引的列表或者單個索引,如果是索引的列表,則使用search時不能傳遞doc_type,也就是如果同時查詢多個索引,不能指定文件的型別 def scroll_search(index, query, size, page): """ 使用scroll查詢ES,實現分頁查詢 :param index: type of list or str :param query: type of dict,查詢條件 :param size: type of int(1-100),指定返回資料中每頁的資料條數 :param page: type of int(>0),指定返回第幾頁資料 :return: 查詢結果總數和某頁的資料 """ try: res = ES.search( index=index, scroll='5m', # 查詢一次資料在ES中快取5分鐘再銷燬 size=size, body=query, sort="modified:desc", # sort增加排序功能,多個欄位排序可以以逗號隔開 # sort="modified:desc,_score:desc", # 指定某個欄位按照升序或者降序排列,modifie為資料欄位 # sort="_doc", # ES會計算一個最優的排序方案 # search_type='scan', # 如果不關注排序的話,可以增加該欄位,查詢速度十分高效,效能比較好 ) except Exception as e: raise e else: sid = res['_scroll_id'] # 獲得查詢下一條資料的scroll_id total = res['hits']['total'] # 獲取查詢結果中總資料的條數 hits = res["hits"]["hits"] # 首次查詢返回第一頁的結果資料 results = [hit["_source"] for hit in hits] first_page = 1 while page > first_page: try: res = ES.scroll(scroll_id=sid, scroll='2m') except Exception as e: raise e else: sid = res['_scroll_id'] hits = res["hits"]["hits"] results = [hit["_source"] for hit in hits] first_page += 1 return total, results # terms使用,其中categories為list型別,含義為categories中任何一個滿足條件即可 temp = {"terms": {"categories": categories}} query["query"]["bool"]["must"].append(temp)
3、Elasticsearch中update區域性更新
""" 功能:從多個索引中查詢需要更新的對應資料的id,再更新此資料 """
from elasticsearch import Elasticsearch
ES_SEARCH_HOSTURL = 'http://domain:9000/'
ES = Elasticsearch(ES_SEARCH_HOSTURL)
indexs = [index1, index2]
query = {
"query": {
"bool": {
"must": []
}
}
}
for index in indexs:
try:
res = ES.search(body=query, index=index, doc_type='info')
except Exception as e:
print(e)
# logger.error("Request search_indicator function error. Error: %s" % e)
message = "Internal server error"
results = data_formatter(message=message)
return Response(results, status=500)
else:
if res["hits"]["total"] > 0:
hits = res["hits"]["hits"][0]
update_id = hits["_id"]
try:
# 注意,如果ES為pyelasticsearch的物件,則需要更新的引數傳遞形式應該為doc= {"revoked": revoked}
ES.update(index=index, doc_type='indicator_info', id=update_id, body={"doc": {"revoked": revoked}})
except Exception as e:
print(e)
message = "Update {} failed".format(id)
results = data_formatter(message=message)
return Response(results, status=500)
else:
results = data_formatter()
return Response(results, status=200)