1. 程式人生 > >ElasticSearch改造研報查詢實踐

ElasticSearch改造研報查詢實踐

auth 條件查詢 num 需要 千萬 背景 curl 不同 獲取

背景:

  1,系統簡介:通過人工解讀研報然後獲取並錄入研報分類及摘要等信息,系統通過摘要等信息來獲得該研報的URI

  2,現有實現:老系統使用MSSQL存儲摘要等信息,並將不同的關鍵字分解為不同字段來提供搜索查詢

  3,存在問題:

    -查詢操作繁瑣,死板:例如要查某個機構,標題含有周報的研報,現有系統需要勾選相應字段再輸入條件

    -查詢速度緩慢,近千萬級別數據響應時間4-5s

  4,改進:使用es優化,添加多個關鍵字模糊查詢(非長文本數據,因此未使用_score進行評分查詢)

    -例如:輸入“國泰君安 周報”就可查詢到所有相關的國泰君安的周報

1,新建Index

# Create the index "src_test_1" with a single shard and no replicas, and
# define the mapping for the "doc_test" document type.
#
# Field notes (kept out of the JSON body, which must not contain "#" comments):
#   title       - combined report title
#   author      - author
#   institution - institution
#   industry    - industry
#   grade       - rating
#   doc_type    - report category
#   time        - publish time
#   doc_uri     - file address (stored but not indexed)
#   doc_size    - file size (stored but not indexed)
#   market      - market
curl -X PUT "localhost:9200/src_test_1" -H 'Content-Type: application/json' -d '
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "doc_test": {
      "properties": {
        "title": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "author": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "institution": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "industry": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "grade": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "doc_type": {
          "type": "text",
          "analyzer": "ik_max_word",
          "search_analyzer": "ik_max_word"
        },
        "time": {
          "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "doc_uri": {
          "type": "text",
          "index": false
        },
        "doc_size": {
          "type": "integer",
          "index": false
        },
        "market": {
          "type": "byte"
        }
      }
    }
  }
}'

2,數據導入(CSV分批)

import pandas as pd
import numpy as np
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch()

# Number of documents sent to Elasticsearch per bulk request.
BATCH_SIZE = 100000

# Maps the CSV "ExchangeType" value to the numeric market code stored in the
# index; any other value is stored as 99.
MARKET_CODES = {"CN": 0, "HK": 1, "World": 2}

data_will_insert = []

# Read the CSV with pandas; if the text comes out garbled, add
# encoding="ISO-8859-1" to the call.
src_data = pd.read_csv("ResearchReportEx.csv")

for index, i in src_data.iterrows():
    market = MARKET_CODES.get(i["ExchangeType"], 99)

    # pd.isna() is used instead of an identity check against NaN: a missing
    # CSV cell is not guaranteed to be the very same object as np.nan.
    if pd.isna(i["Kind2Name"]):
        doc_type = i["KindName"]
    else:
        doc_type = i["KindName"] + "|%s" % i["Kind2Name"]

    data_will_insert.append({
        "_index": "src_test_1",
        "_type": "doc_test",
        "_source": {
            "title": i["Title"],
            "author": i["AuthorName"],
            # Append ":00" to CreateTime before indexing; presumably the CSV
            # value lacks seconds and the mapping expects
            # "yyyy-MM-dd HH:mm:ss" -- confirm against the data.
            "time": i["CreateTime"] + ":00",
            "institution": i["InstituteNameCN"],
            "doc_type": doc_type,
            "industry": "" if pd.isna(i["IndustryName"]) else i["IndustryName"],
            "grade": "" if pd.isna(i["GradeName"]) else i["GradeName"],
            "doc_uri": i["FileURL"],
            "doc_size": i["Size"],
            "market": market,
        },
    })

    # Flush a full batch to Elasticsearch and start a new one.
    if len(data_will_insert) >= BATCH_SIZE:
        success, _ = bulk(es, data_will_insert, index="src_test_1", raise_on_error=True)
        print("Performed %d actions" % success)
        data_will_insert = []

# Insert whatever is left over in the list after the loop.
if data_will_insert:
    success, _ = bulk(es, data_will_insert, index="src_test_1", raise_on_error=True)
    print("Performed %d actions" % success)

3,查詢

import time
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

# Elasticsearch client connection (no arguments, so the client library's defaults apply)
es = Elasticsearch()


# Decorator that measures and prints a function's run time
def cal_run_time(func):
    """Wrap *func* so that each call prints how long it took.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        *func*, prints ``str(func)`` followed by the elapsed seconds, and
        returns whatever *func* returned.
    """
    # Imported locally so this block does not depend on a file-level import.
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # perf_counter is a monotonic clock intended for measuring intervals.
        start_time = time.perf_counter()
        res = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(str(func) + "---run time--- %s" % str(end_time - start_time))
        return res

    return wrapper


@cal_run_time
def query_in_es():
    """Run the sample multi-keyword query and print every matching document.

    Scrolls through all hits in index "src_test_1" (type "doc_test") that
    match the query body below, prints each hit's ``_source``, and finally
    prints the number of hits.
    """
    body = {
        "query": {
            "bool": {
                "must": [
                    {
                        # Equivalent to: every keyword in "query" must appear
                        # in the text formed by concatenating all "fields".
                        "multi_match": {
                            "query": "國泰 報告",
                            "type": "cross_fields",  # match across fields
                            # Search within these 6 fields.
                            "fields": [
                                "title",
                                "institution",
                                "grade",
                                "doc_type",
                                "author",
                                "industry",
                            ],
                            "operator": "and",
                        }
                    },
                    {
                        "range": {
                            "time": {
                                "gte": "2018-02-01"  # default lower time bound for queries
                            }
                        }
                    },
                ],
            }
        }
    }

    # Query according to the conditions in body.
    scanResp = scan(
        es,
        query=body,
        scroll="10m",
        index="src_test_1",
        doc_type="doc_test",
        timeout="10m",
    )
    row_num = 0

    for resp in scanResp:
        print(resp["_source"])
        row_num += 1

    print(row_num)


query_in_es()

※測試結果速度相當快:多關鍵字查詢只需零點幾秒

ElasticSearch改造研報查詢實踐