【13】把 Elasticsearch 當資料庫使:Join
把 Elasticsearch 當資料庫使 系列:
推銷Elasticsearch
用SQL查詢Elasticsearch
-
完全不join,把關聯表的欄位融合到一張表裡。當然這會造成資料的冗餘
-
錄入的時候join:使用 nested documents(nested document和主文件是同segment儲存的,對於一個symbol,幾千萬個quote這樣的場景就不適合了)
-
錄入的時候join:使用 siren
-
查詢時join:使用 parent/child (這個是elasticsearch的特性,要求parent/child同shard存在)
-
查詢時join:使用 siren-joins(就是一個在服務端求值的filter,然後把結果釋出給每個shard去做二次match)
-
查詢時join:在客戶端拼裝第二個查詢(和siren-joins差不多,但是多了一次客戶端到伺服器的來回)
我個人喜歡的是siren-joins和客戶端拼裝這兩種方案。這兩種方案都是先做了一次查詢,把查詢結果再次分發到每個分散式節點上再次去做分散式的聚合。相比在coordinate節點上去做join合併更scalable。
客戶端求值
首先我來看如何在客戶端完成結果集的求值
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
SAVE RESULT AS finance_symbols;
EOF
這裡引入的 SAVE RESULT AS 就是用於觸發前面的SQL的求值,並把結果集命名為 finance_symbols。如果因為一些中間結果我們不需要,我們也可以用REMOVE 命令把求值結果刪除
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
SAVE RESULT AS finance_symbols;
REMOVE RESULT finance_symbols;
EOF
甚至我們可以使用任意的python程式碼來修改result_map。
$ cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 1000;
SAVE RESULT AS finance_symbols;
result_map['finance_symbols'] = result_map['finance_symbols'][1:-1];
EOF
客戶端Join
在客戶端求值的基礎上,我們可以利用客戶端保留的結果集來發第二個請求。
cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5;
SAVE RESULT AS finance_symbols;
SELECT MAX(adj_close) FROM quote
JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
REMOVE RESULT finance_symbols;
EOF
這個產生的Elaticsearch請求是這樣的兩條:
{
"query": {
"term": {
"sector": "Finance"
}
},
"size": 5
}
然後根據其返回,產生了第二個請求
{
"query": {
"bool": {
"filter": [
{},
{
"terms": {
"symbol": [
"TFSC",
"TFSCR",
"TFSCU",
"TFSCW",
"PIH"
]
}
}
]
}
},
"aggs": {
"MAX(adj_close)": {
"max": {
"field": "adj_close"
}
}
},
"size": 0
}
可以看到,所謂客戶端join,就是用前一次的查詢結果拼出了第二次查詢的條件(terms filter)。
服務端Join
cat << EOF | python2.6 es_query.py http://127.0.0.1:9200
WITH finance_symbols AS (SELECT symbol FROM symbol WHERE sector='Finance' LIMIT 5);
SELECT MAX(adj_close) FROM quote
JOIN finance_symbols ON quote.symbol = finance_symbols.symbol;
EOF
前面第一個查詢是用SAVE RESULT AS求值並命名為finance_symbols,這裡我們並沒有求值而是給其取了一個名字(WITH AS),然後就可以引用了。
{
"query": {
"bool": {
"filter": [
{},
{
"filterjoin": {
"symbol": {
"indices": "symbol*",
"path": "symbol",
"query": {
"term": {
"sector": "Finance"
}
}
}
}
}
]
}
},
"aggs": {
"MAX(adj_close)": {
"max": {
"field": "adj_close"
}
}
},
"size": 0
}
可見產生的filterjoin把兩步合為一步了。注意對於filterjoin查詢,需要POST _coordinate_search 而不是_search這個URL。
Profile
[
{
"query": [
{
"query_type": "BoostQuery",
"lucene": "ConstantScore(BytesFieldDataTermsQuery::[size=8272])^0.0",
"time": "29.32334300ms",
"breakdown": {
"score": 0,
"create_weight": 360426,
"next_doc": 137906,
"match": 0,
"build_scorer": 15027540,
"advance": 0
},
"children": [
{
"query_type": "BytesFieldDataTermsQuery",
"lucene": "BytesFieldDataTermsQuery::[size=8272]",
"time": "13.79747100ms",
"breakdown": {
"score": 0,
"create_weight": 14903,
"next_doc": 168010,
"match": 0,
"build_scorer": 13614558,
"advance": 0
}
}
]
}
],
"rewrite_time": 30804,
"collector": [
{
"name": "MultiCollector",
"reason": "search_multi",
"time": "1.529236000ms",
"children": [
{
"name": "TotalHitCountCollector",
"reason": "search_count",
"time": "0.08967800000ms"
},
{
"name": "MaxAggregator: [MAX(adj_close)]",
"reason": "aggregation",
"time": "0.1675550000ms"
}
]
}
]
}
]
從profile的結果來看,其原理也是 terms filter(BytesFieldDataTermsQuery)。所以這也就決定了這種join只是偽join。真正的join不僅僅可以用第一個表去filter第二個表,而且要能夠在第二個查詢的計算階段引用第一個階段的結果。這個是僅僅用terms filter無法完成的。當然所有這些join的努力僅僅是讓資料維護變得更加容易而已,如果我們真的要求Elasticsearch的join和傳統SQL一樣強大,那麼我們也無法指望那麼複雜的join可以快到哪裡去,也就失去了使用Elasticsearch的意義了。有了上面兩種Join方式,我們可以在極度快速和極度靈活之間獲得一定的選擇權利。