Rebuilding research-report search with Elasticsearch in practice
阿新 • Published: 2018-02-13
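Background:
1. System overview: analysts read each research report and manually enter its category, summary, and other metadata. The system then uses that metadata to look up the report's URI.
2. Existing implementation: the old system stores the summaries and other metadata in MSSQL, with different kinds of keywords split into separate columns for searching.
3. Problems:
- Querying is tedious and rigid. For example, to find reports from a particular institution whose title contains "周報" (weekly report), users must tick the corresponding fields and then type a condition into each one.
- Queries are slow: with close to ten million rows, a search takes 4-5 s to respond.
4. Improvement: move search to Elasticsearch and support free-text (fuzzy) search on several keywords at once. The fields are short text rather than long documents, so relevance scoring with _score is not used.
- For example, typing "國泰君安 周報" returns all the related weekly reports from 國泰君安 (Guotai Junan).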
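The matching rule behind this example can be modeled in a few lines. The sketch below is a toy, stdlib-only approximation and not how Elasticsearch actually evaluates queries: it uses plain substring checks where Elasticsearch compares IK-analyzed tokens, and the helper name matches_all_terms and the sample records are hypothetical. A record matches when every whitespace-separated keyword appears in at least one of its searchable fields, which is what lets a single search box replace the old per-field checkboxes.

```python
# Toy model of multi-keyword search across several fields.
# Assumption: substring matching stands in for Elasticsearch's analyzed-token
# matching, so results can differ from a real IK-analyzed index.

SEARCH_FIELDS = ("title", "institution", "grade", "doc_type", "author", "industry")


def matches_all_terms(record, user_input, fields=SEARCH_FIELDS):
    """Return True if every keyword in user_input occurs in some field of record."""
    terms = user_input.split()  # split on any whitespace, e.g. "國泰君安 周報"
    if not terms:
        return False
    values = [str(record.get(f, "")) for f in fields]
    return all(any(term in value for value in values) for term in terms)


# Hypothetical sample records, for illustration only
reports = [
    {"title": "策略周報", "institution": "國泰君安", "author": "張三"},
    {"title": "行業月報", "institution": "國泰君安", "author": "李四"},
    {"title": "宏觀周報", "institution": "中信證券", "author": "王五"},
]

hits = [r["title"] for r in reports if matches_all_terms(r, "國泰君安 周報")]
print(hits)  # ['策略周報']
```

Only the first record matches: the second lacks "周報" in every field and the third lacks "國泰君安".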
1. Create the index
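The mapping uses the IK Chinese analyzer (ik_max_word), which is provided by the elasticsearch-analysis-ik plugin and has to be installed on the cluster first. The doc_test mapping type follows Elasticsearch 5.x/6.x conventions; mapping types were deprecated in 7.x and removed in 8.x. The fields are:
- title: combined report title
- author: author
- institution: issuing institution
- industry: industry
- grade: rating
- doc_type: report category
- time: publish time
- doc_uri: file address
- doc_size: file size
- market: market

```shell
curl -X PUT 'localhost:9200/src_test_1' -H 'Content-Type: application/json' -d '
{
  "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
  "mappings": {
    "doc_test": {
      "properties": {
        "title":       { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "author":      { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "institution": { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "industry":    { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "grade":       { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "doc_type":    { "type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word" },
        "time":        { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" },
        "doc_uri":     { "type": "text", "index": false },
        "doc_size":    { "type": "integer", "index": false },
        "market":      { "type": "byte" }
      }
    }
  }
}'
```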
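Six of the text fields share identical analyzer settings, so the same index body can also be built in Python instead of being typed out by hand. This is a sketch only: build_mapping is a hypothetical helper that reproduces the settings from the curl command above (including the 5.x/6.x-style doc_test mapping type). With the 6.x Python client, the resulting dict could be sent with es.indices.create(index='src_test_1', body=mapping).

```python
import json

# Fields analyzed with IK for both indexing and searching
TEXT_FIELDS = ["title", "author", "institution", "industry", "grade", "doc_type"]
IK = {"type": "text", "analyzer": "ik_max_word", "search_analyzer": "ik_max_word"}


def build_mapping(doc_type="doc_test"):
    """Hypothetical helper: return the index body used for src_test_1."""
    properties = {name: dict(IK) for name in TEXT_FIELDS}  # one copy per field
    properties.update({
        "time": {"type": "date",
                 "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
        "doc_uri": {"type": "text", "index": False},     # kept in _source, not indexed
        "doc_size": {"type": "integer", "index": False},
        "market": {"type": "byte"},                       # 0=CN, 1=HK, 2=World, 99=other
    })
    return {
        "settings": {"number_of_shards": 1, "number_of_replicas": 0},
        "mappings": {doc_type: {"properties": properties}},
    }


mapping = build_mapping()
print(json.dumps(mapping, ensure_ascii=False, indent=2))
```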
2. Import the data (from CSV, in batches)
```python
import pandas as pd
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

es = Elasticsearch()  # defaults to localhost:9200
INDEX = 'src_test_1'
BATCH_SIZE = 100000   # insert 100,000 documents per bulk() call

# Map ExchangeType to a numeric market code; anything else becomes 99
MARKET_CODES = {'CN': 0, 'HK': 1, 'World': 2}


def text_or_empty(value):
    """Return '' for missing CSV cells (NaN), otherwise the value itself."""
    return '' if pd.isna(value) else value


def flush(actions):
    """Send one batch to Elasticsearch with the bulk helper."""
    success, _ = bulk(es, actions, index=INDEX, raise_on_error=True)
    print('Performed %d actions' % success)


# Read the CSV with pandas; if the text comes out garbled, pass the file's
# real encoding, e.g. encoding='gbk' for a GBK-encoded export
src_data = pd.read_csv('ResearchReportEx.csv')

data_will_insert = []
for _, row in src_data.iterrows():
    # Combine primary and secondary category as "Kind|Kind2" when Kind2 exists
    doc_type = row['KindName']
    if not pd.isna(row['Kind2Name']):
        doc_type += '|%s' % row['Kind2Name']

    data_will_insert.append({
        '_index': INDEX,
        '_type': 'doc_test',
        '_source': {
            'title': row['Title'],
            'author': row['AuthorName'],
            # assumes CreateTime is "yyyy-MM-dd HH:mm", so append the seconds
            'time': row['CreateTime'] + ':00',
            'institution': row['InstituteNameCN'],
            'doc_type': doc_type,
            'industry': text_or_empty(row['IndustryName']),
            'grade': text_or_empty(row['GradeName']),
            'doc_uri': row['FileURL'],
            'doc_size': row['Size'],
            'market': MARKET_CODES.get(row['ExchangeType'], 99),
        },
    })

    # Send a full batch, then start a new one
    if len(data_will_insert) >= BATCH_SIZE:
        flush(data_will_insert)
        data_will_insert = []

# Insert whatever is left over in the list
if data_will_insert:
    flush(data_will_insert)
```
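The batching in the import loop can also be written as a generator, which keeps the per-row conversion separate from the bulk calls. The sketch below is stdlib-only so it runs without a cluster: it reads rows with csv.DictReader instead of pandas (empty cells arrive as ''), uses hypothetical helper names (row_to_action, batched), and feeds a made-up two-row CSV in place of ResearchReportEx.csv. The column names and conversion rules mirror the original import code.

```python
import csv
import io
from itertools import islice

MARKET_CODES = {"CN": 0, "HK": 1, "World": 2}  # anything else -> 99


def row_to_action(row, index="src_test_1"):
    """Hypothetical helper: convert one CSV row (dict of strings) into a bulk action."""
    kind, kind2 = row["KindName"], row.get("Kind2Name", "")
    return {
        "_index": index,
        "_type": "doc_test",  # mapping type, Elasticsearch 5.x/6.x only
        "_source": {
            "title": row["Title"],
            "author": row["AuthorName"],
            "time": row["CreateTime"] + ":00",  # assumes "yyyy-MM-dd HH:mm"
            "institution": row["InstituteNameCN"],
            "doc_type": kind + "|" + kind2 if kind2 else kind,
            "industry": row.get("IndustryName", ""),
            "grade": row.get("GradeName", ""),
            "doc_uri": row["FileURL"],
            "doc_size": int(row["Size"]),
            "market": MARKET_CODES.get(row["ExchangeType"], 99),
        },
    }


def batched(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Hypothetical two-row CSV standing in for ResearchReportEx.csv
sample = io.StringIO(
    "Title,AuthorName,CreateTime,InstituteNameCN,KindName,Kind2Name,"
    "IndustryName,GradeName,FileURL,Size,ExchangeType\n"
    "策略周報,張三,2018-02-01 09:30,國泰君安,策略,周報,,,/a.pdf,1024,CN\n"
    "港股月報,李四,2018-02-02 10:00,中信證券,港股,,,,/b.pdf,2048,HK\n"
)
actions = (row_to_action(r) for r in csv.DictReader(sample))
for batch in batched(actions, 1):
    print(len(batch), batch[0]["_source"]["doc_type"])
```

With a real cluster, each batch could go to elasticsearch.helpers.bulk(es, batch), or the generator could be handed to bulk directly. Note that bulk already splits its input into requests of chunk_size actions (500 by default), so large outer batches such as the 100,000-row ones in the import loop mainly bound memory use rather than request size.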
3. Query
```python
import functools
import time

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

# Elasticsearch connection
es = Elasticsearch()


def cal_run_time(func):
    """Decorator that prints how long the wrapped function took."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        res = func(*args, **kwargs)
        print('%s---run time--- %s' % (func.__name__, time.perf_counter() - start_time))
        return res
    return wrapper


@cal_run_time
def query_in_es():
    body = {
        "query": {
            "bool": {
                "must": [
                    {
                        "multi_match": {
                            "query": "國泰 報告",
                            "type": "cross_fields",  # match across fields
                            # search in these 6 fields
                            "fields": ["title", "institution", "grade",
                                       "doc_type", "author", "industry"],
                            # with "and", cross_fields treats the fields as one
                            # combined field: every query term must appear in
                            # at least one of them
                            "operator": "and",
                        }
                    },
                    # default time limit: reports published on or after 2018-02-01
                    {"range": {"time": {"gte": "2018-02-01"}}},
                ]
            }
        }
    }
    # scan() wraps the scroll API and yields every matching hit
    scan_resp = scan(es, body, scroll="10m", index="src_test_1",
                     doc_type="doc_test", timeout="10m")
    row_num = 0
    for resp in scan_resp:
        print(resp['_source'])
        row_num += 1
    print(row_num)


query_in_es()
```
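The hard-coded query string in query_in_es can be generalized so that whatever the user types into one search box becomes the multi_match clause. A minimal sketch, assuming the same six searchable fields; build_query and its since parameter are hypothetical names. Unlike the original, the date range goes into a bool filter clause, which does not contribute to scoring; that suits this use case because _score is not used.

```python
import json

SEARCH_FIELDS = ["title", "institution", "grade", "doc_type", "author", "industry"]


def build_query(user_input, since="2018-02-01"):
    """Hypothetical helper: turn free text such as "國泰君安 周報" into a query body."""
    text = " ".join(user_input.split())  # collapse repeated/leading/trailing spaces
    if not text:
        raise ValueError("empty search input")
    bool_query = {
        "must": [{
            "multi_match": {
                "query": text,
                "type": "cross_fields",
                "fields": SEARCH_FIELDS,
                "operator": "and",  # every term must match in some field
            }
        }]
    }
    if since:  # optional publish-date lower bound, in filter context (no scoring)
        bool_query["filter"] = [{"range": {"time": {"gte": since}}}]
    return {"query": {"bool": bool_query}}


print(json.dumps(build_query("  國泰君安   周報 "), ensure_ascii=False, indent=2))
```

The returned dict can be passed to scan() in place of body.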
※ In testing, search was fast: a multi-keyword query took only a few tenths of a second.