1. 程式人生 > 實用技巧 >python 查詢es資料

python 查詢es資料

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author:cc
# date: 2020/1/9

import datetime
import time
from elasticsearch import Elasticsearch
import logging
import sys
from collections import Counter
import pymysql

# NOTE: same name as the elasticsearch client library's logger; the log
# format below records it as "script":"elasticsearch".
logger = logging.getLogger("elasticsearch")

# Log destination and line format applied by main() when run as a script.
LOG_FILE = "/Users/panbiao/logging.txt"
LOG_FORMAT = ('{"time":"%(asctime)s","script":"%(name)s","thread":"%(thread)d",'
              '"threadName":"%(threadName)s","loglevel":"%(levelname)s"} - %(message)s')


class Search_es:
    """Run a query_string + @timestamp range search and tally hits by serviceId."""

    def __init__(self, search=None, gte=None, lte=None, index=None,
                 host=None, user=None, passwd=None, port=None):
        """Store the query parameters and connection settings.

        :param search: query_string expression to match.
        :param gte: lower bound for @timestamp, epoch milliseconds.
        :param lte: upper bound for @timestamp, epoch milliseconds.
        :param index: index name or pattern to search.
        :param host: Elasticsearch host URL.
        :param user: HTTP auth user name.
        :param passwd: HTTP auth password.
        :param port: Elasticsearch port.

        Any argument left as None is read from the module-level global of the
        same name (how the original script configured this class); NameError
        is raised when that global does not exist either.
        """
        supplied = {"search": search, "gte": gte, "lte": lte, "index": index,
                    "host": host, "user": user, "passwd": passwd, "port": port}
        for name, value in supplied.items():
            if value is None:
                try:
                    value = globals()[name]
                except KeyError:
                    raise NameError(f"name {name!r} is not defined") from None
            setattr(self, name, value)
        self.es_data = {}                 # raw response of the last search
        self.es_data_servuceId = []       # serviceId of every counted hit
        self.es_data_servuceId_sort = []  # (serviceId, count), most frequent first

    def _build_query(self):
        """Return the search body: query_string AND gte <= @timestamp <= lte."""
        return {
            "size": 10000,  # single page of up to 10000 hits; see truncation check
            "_source": {"excludes": []},
            "docvalue_fields": ["@timestamp"],
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": self.search,
                                "analyze_wildcard": "true",
                                "default_field": "*",
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": self.gte,
                                    "lte": self.lte,
                                    "format": "epoch_millis",
                                }
                            }
                        },
                    ],
                    "filter": [],
                    "should": [],
                    "must_not": [],
                }
            },
        }

    def Get_es(self, client=None):
        """Submit the search, print a per-serviceId tally and return the hit total.

        :param client: optional object with a ``search(index=, body=, params=)``
            method (e.g. an existing Elasticsearch client). When None, a new
            ``Elasticsearch`` client is built from host/user/passwd/port.
        :return: ``hits.total`` exactly as found in the response, or None if the
            request or result processing raised (the error is logged).

        Prints one "serviceId count" line per serviceId, most frequent first.
        Calls ``sys.exit(1)`` when the response reports ``timed_out``.
        """
        if client is None:
            client = Elasticsearch([self.host], http_auth=(self.user, self.passwd),
                                   port=self.port)
        # URL parameter: only fetch the serviceId field of each hit's _source.
        para = {"_source": "serviceId"}
        query = self._build_query()
        try:
            logger.info("開始查詢")
            self.es_data = client.search(index=self.index, body=query, params=para)
            logger.info("開始統計")
            if self.es_data["timed_out"]:
                logger.error("{0}{1}".format("timed_out:", self.es_data["timed_out"]))
                sys.exit(1)  # SystemExit is not an Exception, so it escapes the handler

            hits = self.es_data["hits"]
            total = hits["total"]
            hit_list = hits["hits"]
            logger.info(total)

            # hits.total is an int in older ES versions and {"value": n, ...} in 7.x.
            total_value = total.get("value") if isinstance(total, dict) else total
            if isinstance(total_value, int) and total_value > len(hit_list):
                logger.warning("only %d of %d matching hits were returned; counts are partial",
                               len(hit_list), total_value)

            # Rebuilt on every call so repeated calls do not accumulate old hits.
            service_ids = []
            skipped = 0
            for hit in hit_list:
                service_id = (hit.get("_source") or {}).get("serviceId")
                if service_id is None:
                    skipped += 1
                else:
                    service_ids.append(service_id)
            if skipped:
                logger.warning("%d hits had no serviceId and were not counted", skipped)
            self.es_data_servuceId = service_ids
            logger.info(len(self.es_data_servuceId))

            # most_common() sorts by count descending, ties in first-seen order.
            self.es_data_servuceId_sort = Counter(self.es_data_servuceId).most_common()
            logger.info("計算時間")
            for service_id, count in self.es_data_servuceId_sort:
                print(f"{service_id} {count}")
            logger.info("列印時間")
            return total
        except Exception as e:
            logger.error(f"error:{e}", exc_info=True, stack_info=True)
            return None


def main():
    """Configure logging, search the last 30 minutes and print the hit total."""
    logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format=LOG_FORMAT)
    # Time window as epoch milliseconds; plain arithmetic avoids the DST
    # ambiguity of time.mktime on naive local datetimes.
    lte = int(round(time.time() * 1000))
    gte = lte - 30 * 60 * 1000
    searcher = Search_es(
        search="hostname:\"cc-k8s01\" AND timeDiff_int:[0 TO 1000] AND type:\"*-access\"",
        gte=gte,
        lte=lte,
        index="logstash-xxx-*",
        host="http://xxx",
        user="xxx",
        passwd="xxx",
        port=9200,
    )
    print(searcher.Get_es())


if __name__ == '__main__':
    main()