1. 程式人生 > 實用技巧 >elasticsearch-dsl

elasticsearch-dsl

elasticsearch-dsl

一、簡介

elasticsearch-dsl是基於elasticsearch-py封裝實現的,提供了更簡便的操作elasticsearch的方法。

二、具體使用

elasticsearch的官方文件介紹一共包括六個部分,分別是:configuration、search dsl、persistence、update by query、API document。

2.1 Configuration

有許多方式可以配置連線,最簡單且有效的方式是設定預設連線,該預設連線可以被未傳遞其他連線的API呼叫使用。

2.1.1 Default connection

預設連線的實現需要使用到connections.create_connection()方法。

from elasticsearch_dsl import connections

connections.create_connection(hosts=['localhost'], timeout=20)

同時還可以通過alias給連線設定別名,後續可以通過別名來引用該連線,預設別名為default

from elasticsearch_dsl import connections

connections.create_connection(alias='my_new_connection', hosts=['localhost'], timeout=60)

2.1.2 Multiple clusters

可以通過configure定義多個指向不同叢集的連線。

from elasticsearch_dsl import connections

connections.configure(
    default={'hosts': 'localhost'},
    dev={
        'hosts': ['esdev1.example.com:9200'],
        'sniff_on_start': True
    }
)

還可以通過add_connection手動新增連線。

2.1.2.4 Using aliases

下面的例子展示瞭如何使用連線別名。

s = Search(using='
qa')

2.1.3 Manual

如果你不想提供一個全域性的連線,你可以通過使用using引數傳遞一個elasticsearch.Elasticsearch的例項做為連線,如下:

s = Search(using=Elasticsearch('localhost'))

你還可以通過下面的方式來覆蓋已經關聯的連線。

s = s.using(Elasticsearch('otherhost:9200'))

2.2 Search DSL

2.2.1 The search object

search物件代表整個搜尋請求,包括:queries、filters、aggregations、sort、pagination、additional parameters、associated client。

API被設定為可連結的。search物件是不可變的,除了聚合,對物件的所有更改都將導致建立包含該更改的淺表副本。

當初始化Search物件時,你可以傳遞low-level elasticsearch客戶端作為引數。

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch()

s = Search(using=client)

注意

所有的方法都返回一個該物件的拷貝,這樣可以保證它被傳遞給外部程式碼時是安全的。

該API是可以連結的,允許你組合多個方法呼叫在一個語句中:

s = Search().using(client).query("match", title="python")

執行execute方法將請求傳送給elasticsearch:

response = s.execute()

如果僅僅是想要遍歷返回結果提示,可以通過遍歷Search物件(前提是執行過execute方法):

for hit in s:
    print(hit.title)

可以通過to_dict()方法將Search物件序列化為一個dict物件,這樣可以方便除錯。

print(s.to_dict())

2.2.1.1 Delete By Query

可以通過呼叫Search物件上的delete方法而不是execute來實現刪除匹配查詢的文件,如下:

s = Search(index='i').query("match", title="python")
response = s.delete()

2.2.1.2 Queries

該庫為所有的Elasticsearch查詢型別都提供了類。以關鍵字引數傳遞所有的引數,最終會把引數序列化後傳遞給Elasticsearch,這意味著在原始查詢和它對應的dsl之間有這一個清理的一對一的對映。

from elasticsearch_dsl.query import MultiMatch, Match

# {"multi_match": {"query": "python django", "fields": ["title", "body"]}}
MultiMatch(query='python django', fields=['title', 'body'])

# {"match": {"title": {"query": "web framework", "type": "phrase"}}}
Match(title={"query": "web framework", "type": "phrase"})

你可以使用快捷方式Q通過命名引數或者原始dict型別資料來構建一個查詢例項:

from elasticsearch_dsl import Q

Q("multi_match", query='python django', fields=['title', 'body'])
Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})

通過.query()方法將查詢新增到Search物件中:

q = Q("multi_match", query='python django', fields=['title', 'body'])
s = s.query(q)

該方法還可以接收所有Q的引數作為引數。

s = s.query("multi_match", query='python django', fields=['title', 'body'])
2.2.1.2.1 Dotted fields

有時候你想要引用一個在其他欄位中的欄位,例如多欄位(title.keyword)或者在一個json文件中的address.city。為了方便,Q允許你使用雙下劃線‘__’代替關鍵詞引數中的‘.’

s = Search()
s = s.filter('term', category__keyword='Python')
s = s.query('match', address__city='prague')

除此之外,如果你願意,也可以隨時使用python的kwarg解壓縮功能。

s = Search()
s = s.filter('term', **{'category.keyword': 'Python'})
s = s.query('match', **{'address.city': 'prague'})
2.2.1.2.2 Query combination

查詢物件可以通過邏輯運算子組合起來:

Q("match", title='python') | Q("match", title='django')
# {"bool": {"should": [...]}}

Q("match", title='python') & Q("match", title='django')
# {"bool": {"must": [...]}}

~Q("match", title="python")
# {"bool": {"must_not": [...]}}

當呼叫.query()方法多次時,內部會使用&操作符:

s = s.query().query()
print(s.to_dict())
# {"query": {"bool": {...}}}

如果你想要精確控制查詢的格式,可以通過Q直接構造組合查詢:

q = Q('bool',
    must=[Q('match', title='python')],
    should=[Q(...), Q(...)],
    minimum_should_match=1
)
s = Search().query(q)

2.2.1.3 Filters

如果你想要在過濾上下文中新增查詢,可以使用filter()函式來使之變的簡單。

s = Search()
s = s.filter('terms', tags=['search', 'python'])

在背後,這會產生一個bool查詢,並將指定的條件查詢放入其filter分支,等價與下面的操作:

s = Search()
s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])

如果你想要使用post_filter元素進行多面導航,請使用.post_filter()方法,你還可以使用exculde()方法從查詢中排除專案:

s = Search()
s = s.exclude('terms', tags=['search', 'python'])

2.2.1.4 Aggregations

你可以是使用A快捷方式來定義一個聚合。

from elasticsearch_dsl import A

A('terms', field='tags')
# {"terms": {"field": "tags"}}

為了實現聚合巢狀,你可以使用.bucket()、.metirc()以及.pipeline()方法。

a = A('terms', field='category')
# {'terms': {'field': 'category'}}

a.metric('clicks_per_category', 'sum', field='clicks')\
    .bucket('tags_per_category', 'terms', field='tags')
# {
#   'terms': {'field': 'category'},
#   'aggs': {
#     'clicks_per_category': {'sum': {'field': 'clicks'}},
#     'tags_per_category': {'terms': {'field': 'tags'}}
#   }
# }

為了將聚合新增到Search物件中,使用.aggs屬性,它是作為一個top-level聚合的。

s = Search()
a = A('terms', field='category')
s.aggs.bucket('category_terms', a)
# {
#   'aggs': {
#     'category_terms': {
#       'terms': {
#         'field': 'category'
#       }
#     }
#   }
# }

或者:

s = Search()
s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day')\
    .metric('clicks_per_day', 'sum', field='clicks')\
    .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day')\
    .bucket('tags_per_day', 'terms', field='tags')

s.to_dict()
# {
#   "aggs": {
#     "articles_per_day": {
#       "date_histogram": { "interval": "day", "field": "publish_date" },
#       "aggs": {
#         "clicks_per_day": { "sum": { "field": "clicks" } },
#         "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
#         "tags_per_day": { "terms": { "field": "tags" } }
#       }
#     }
#   }
# }

你可以通過名字來訪問一個存在的桶。

s = Search()

s.aggs.bucket('per_category', 'terms', field='category')
s.aggs['per_category'].metric('clicks_per_category', 'sum', field='clicks')
s.aggs['per_category'].bucket('tags_per_category', 'terms', field='tags')

2.2.1.5 Sorting

要指定排序順序,可以使用.order()方法。

s = Search().sort(
    'category',
    '-title',
    {"lines" : {"order" : "asc", "mode" : "avg"}}
)

可以通過不傳任何引數呼叫sort()函式來重置排序。

2.2.1.6 Pagination

要指定from、size,使用slicing API:

s = s[10:20]
# {"from": 10, "size": 10}

要訪問匹配的所有文件,可以使用scan()函式,scan()函式使用scan、scroll elasticsearch API:

for hit in s.scan():
    print(hit.title)

需要注意的是這種情況下結果是不會被排序的。

2.2.1.7 Highlighting

要指定高亮的通用屬性,可以使用highlight_options()方法:

s = s.highlight_options(order='score')

可以通過highlight()方法來為了每個單獨的欄位設定高亮:

s = s.highlight('title')
# or, including parameters:
s = s.highlight('title', fragment_size=50)

然後,響應中的分段將在每個結果物件上以.meta.highlight.FIELD形式提供,其中將包含分段列表:

response = s.execute()
for hit in response:
    for fragment in hit.meta.highlight.title:
        print(fragment)

2.2.1.8 Suggestions

要指定一個suggest請求在你的search物件上,可以使用suggest()方法:

# check for correct spelling
s = s.suggest('my_suggestion', 'pyhton', term={'field': 'title'})

2.2.1.9 Extra properties and parameters

要為search物件設定額外的屬性,可以使用.extra()方法。可以用來定義body中的key,那些不能通過指定API方法來設定的,例如explain、search_filter。

s = s.extra(explain=True)

要設定查詢引數,可以使用.params()方法:

s = s.params(routing="42")

如果要限制elasticsearch返回的欄位,可以使用source()方法:

# only return the selected fields
s = s.source(['title', 'body'])
# don't return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(includes=["title"], excludes=["user.*"])
# reset the field selection
s = s.source(None)

2.2.1.10 Serialization and Deserialization

查詢物件可以通過使用.to_dict()方法被序列化為一個字典。

你也可以使用類方法from_dict從一個dict建立一個Search物件。這會建立一個新的Search物件並使用字典中的資料填充它。

s = Search.from_dict({"query": {"match": {"title": "python"}}})

如果你希望修改現有的Search物件,並覆蓋其屬性,則可以使用update_from_dict()方法就地更改例項。

s = Search(index='i')
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})

2.2.2 Response

你可以通過呼叫execute方法來執行你的搜尋,它會返回一個Response物件,Response物件允許你通過屬性的方式訪問返回結果字典中的任何key。

print(response.success())
# True

print(response.took)
# 12

print(response.hits.total.relation)
# eq
print(response.hits.total.value)
# 142

print(response.suggest.my_suggestions)

如果想要檢查response物件的內容,可以通過to_dict方法訪問原始資料。

2.2.2.1 Hits

可以通過hits屬性訪問返回的匹配結果,或者遍歷Response物件。

response = s.execute()
print('Total %d hits found.' % response.hits.total)
for h in response:
    print(h.title, h.body)

2.2.2.2 Result

每個匹配項被封裝到一個類中,可以方便通過類屬性來訪問返回結果字典中的key,所有的元資料儲存在meta屬性中。

response = s.execute()
h = response.hits[0]
print('/%s/%s/%s returned with score %f' % (
    h.meta.index, h.meta.doc_type, h.meta.id, h.meta.score))

2.2.2.3 Aggregations

可以通過aggregations屬性來訪問聚合結果:

for tag in response.aggregations.per_tag.buckets:
    print(tag.key, tag.max_lines.value)

2.2.3 MultiSearch

可以通過MultiSearch類同時執行多個搜尋,它將會使用_msearch API:

from elasticsearch_dsl import MultiSearch, Search

ms = MultiSearch(index='blogs')

ms = ms.add(Search().filter('term', tags='python'))
ms = ms.add(Search().filter('term', tags='elasticsearch'))

responses = ms.execute()

for response in responses:
    print("Results for query %r." % response.search.query)
    for hit in response:
        print(hit.title)

2.3 Persistence

你可以使用dsl庫來定義你的mappings和一個基本的持久化層為你的應用程式。

2.3.1 Document

如果你要為你的文件建立一個model-like的封裝,可以使用Document類。它可以被用作建立在elasticsearch中所有需要的mappings和settings。

from datetime import datetime
from elasticsearch_dsl import Document, Date, Nested, Boolean, \
    analyzer, InnerDoc, Completion, Keyword, Text

html_strip = analyzer('html_strip',
    tokenizer="standard",
    filter=["standard", "lowercase", "stop", "snowball"],
    char_filter=["html_strip"]
)

class Comment(InnerDoc):
    author = Text(fields={'raw': Keyword()})
    content = Text(analyzer='snowball')
    created_at = Date()

    def age(self):
        return datetime.now() - self.created_at

class Post(Document):
    title = Text()
    title_suggest = Completion()
    created_at = Date()
    published = Boolean()
    category = Text(
        analyzer=html_strip,
        fields={'raw': Keyword()}
    )

    comments = Nested(Comment)

     class Index:
        name = 'blog'

    def add_comment(self, author, content):
        self.comments.append(
          Comment(author=author, content=content, created_at=datetime.now()))

    def save(self, ** kwargs):
        self.created_at = datetime.now()
        return super().save(** kwargs)

2.3.1.1 Data types

定義Document例項時,除了可以使用python型別,還可以使用InnerDoc、Range等型別來表示非簡單型別的資料。

from elasticsearch_dsl import Document, DateRange, Keyword, Range

class RoomBooking(Document):
    room = Keyword()
    dates = DateRange()


rb = RoomBooking(
  room='Conference Room II',
  dates=Range(
    gte=datetime(2018, 11, 17, 9, 0, 0),
    lt=datetime(2018, 11, 17, 10, 0, 0)
  )
)

# Range supports the in operator correctly:
datetime(2018, 11, 17, 9, 30, 0) in rb.dates # True

# you can also get the limits and whether they are inclusive or exclusive:
rb.dates.lower # datetime(2018, 11, 17, 9, 0, 0), True
rb.dates.upper # datetime(2018, 11, 17, 10, 0, 0), False

# empty range is unbounded
Range().lower # None, False

2.3.1.2 Note on dates

當例項化一個Date欄位時,可以通過設定default_timezone引數來明確指定時區。

class Post(Document):
    created_at = Date(default_timezone='UTC')

2.3.1.3 Document life cycle

在你第一次使用Post文件型別前,你需要在elasticsearch中建立mappings。可以通過Index物件或者呼叫init()方法直接建立mappings。

# create the mappings in Elasticsearch
Post.init()

所有metadata欄位,可以通過meta屬性訪問。

post = Post(meta={'id': 42})

# prints 42
print(post.meta.id)

# override default index
post.meta.index = 'my-blog'

可以通過get()方法來檢索一個存在的文件:

# retrieve the document
first = Post.get(id=42)
# now we can call methods, change fields, ...
first.add_comment('me', 'This is nice!')
# and save the changes into the cluster again
first.save()

要刪除一個文件,直接呼叫delete()方法即可:

first = Post.get(id=42)
first.delete()

2.3.1.4 Analysis

要為text欄位指定analyzer,你只需要使用analyze的名字,使用已有的analyze或者自己定義。

2.3.1.5 Search

為了在該文件型別上搜索,使用search方法即可。

# by calling .search we get back a standard Search object
s = Post.search()
# the search is already limited to the index and doc_type of our document
s = s.filter('term', published=True).query('match', title='first')


results = s.execute()

# when you execute the search the results are wrapped in your document class (Post)
for post in results:
    print(post.meta.score, post.title)

2.3.1.6 class Meta options

在Meta類中定義了多個你可以為你的文件定義的metadata,例如mapping。

2.3.1.7 class Index options

Index類中定義了該索引的資訊,它的名字、settings和其他屬性。

2.3.1.8 Document Inheritance

2.3.2 Index

在典型情況下,在Document類上使用Index類足夠處理任何操作的。在少量case下,直接操作Index物件可能更有用。

Index是一個類,負責儲存一個索引在elasticsearch中的所有關聯元資料,例如mapping和settings。由於它允許方便的同時建立多個mapping,所以當定義mapping的時候它是最有用的。當在遷移elasticsearch物件的時候是特別有用的。

from elasticsearch_dsl import Index, Document, Text, analyzer

blogs = Index('blogs')

# define custom settings
blogs.settings(
    number_of_shards=1,
    number_of_replicas=0
)

# define aliases
blogs.aliases(
    old_blogs={}
)

# register a document with the index
blogs.document(Post)

# can also be used as class decorator when defining the Document
@blogs.document
class Post(Document):
    title = Text()

# You can attach custom analyzers to the index

html_strip = analyzer('html_strip',
    tokenizer="standard",
    filter=["standard", "lowercase", "stop", "snowball"],
    char_filter=["html_strip"]
)

blogs.analyzer(html_strip)

# delete the index, ignore if it doesn't exist
blogs.delete(ignore=404)

# create the index in elasticsearch
blogs.create()

你可以為你的索引設定模板,並使用clone()方法建立一個指定的拷貝:

blogs = Index('blogs', using='production')
blogs.settings(number_of_shards=2)
blogs.document(Post)

# create a copy of the index with different name
company_blogs = blogs.clone('company-blogs')

# create a different copy on different cluster
dev_blogs = blogs.clone('blogs', using='dev')
# and change its settings
dev_blogs.setting(number_of_shards=1)

2.3.2.1 Index Template

elasticsearch-dsl還提供了使用IndexTemplate類在elasticsearch中來管理索引模板的選項,該類與Index的API非常相似。

一旦一個索引模板被儲存到elasticsearch,他的內容將會自動應用到匹配模式的新索引上(已存在的索引不會受影響),即使索引是當索引一個文件時自動建立的。

from datetime import datetime

from elasticsearch_dsl import Document, Date, Text


class Log(Document):
    content = Text()
    timestamp = Date()

    class Index:
        name = "logs-*"
        settings = {
          "number_of_shards": 2
        }

    def save(self, **kwargs):
        # assign now if no timestamp given
        if not self.timestamp:
            self.timestamp = datetime.now()

        # override the index to go to the proper timeslot
        kwargs['index'] = self.timestamp.strftime('logs-%Y%m%d')
        return super().save(**kwargs)

# once, as part of application setup, during deploy/migrations:
logs = Log._index.as_template('logs', order=0)
logs.save()

# to perform search across all logs:
search = Log.search()

2.4 Faceted Search

該API是實驗性的,並且也沒有用到,所以先跳過。

2.5 Update By Query

2.5.1 The Update By Query object

Update By Query物件允許使用_update_by_query實現在一個匹配過程中更新一個文件。

2.5.1.1 Serialization and Deserialization

該查詢物件可以通過.to_dict()方法序列化為一個字典,也可以通過類方法from_dict()從一個字典構建一個物件。

ubq = UpdateByQuery.from_dict({"query": {"match": {"title": "python"}}})

2.5.1.2 Extra properties and parameters

可以通過.extra()方法設定額外的屬性:

ubq = ubq.extra(explain=True)

可以通過.params()方法設定查詢引數:

ubq = ubq.params(routing="42")

2.5.2 Response

你可以呼叫.execute()方法執行查詢,它會返回一個Response物件。Response物件允許通過屬性訪問結果字典中的任何key。

response = ubq.execute()

print(response.success())
# True

print(response.took)
# 12

如果需要檢視response物件的內容,使用to_dic()方法獲取它的原始資料即可。

2.6 API Documentation

API Documention詳細介紹了elasticsearch-dsl庫中的公共類和方法的用法,具體使用的時候直接翻閱參考即可。

三、總結

1、elasticsearch-dsl相比於elasticsearch來說,提供了更簡便的方法來操作elasticsearch,減少了生成DSL查詢語言的複雜性,推薦使用。

2、elasticsearch-dsl的方法其實還是和elasticsearch的restful API對應的,所以它的API文件有些地方寫的並不清晰,例如例項構造可以傳遞哪些引數?它的說明時可以接收任何關鍵字引數並會直接把引數傳遞給elasticsearch,所以要確定哪些引數生效,還是需要我們去檢視elasticsearch的restful API文件。