1. 程式人生 > 程式設計 >百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

百億級實時查詢優化實戰,讓你的Elasticsearch飛起來!

最近的一個專案是風控過程資料實時統計分析和聚合的一個 OLAP 分析監控平臺,日流量峰值在 10 到 12 億上下,每年資料約 4000 億條,佔用空間大概 200T。

面對這樣一個資料量級的需求,我們的資料如何儲存和實現實時查詢將是一個嚴峻的挑戰。

經過對 Elasticsearch 多方調研和超過幾百億條資料的插入和聚合查詢的驗證之後,我們總結出以下幾種能夠有效提升效能和解決這一問題的方案:

  1. 叢集規劃
  2. 儲存策略
  3. 索引拆分
  4. 壓縮
  5. 冷熱分割槽等

本文所使用的 Elasticsearch 版本為 5.3.3。

什麼是時序索引?其主要特點體現在如下兩個方面:

**存,**以時間為軸,資料只有增加,沒有變更,並且必須包含 timestamp(日期時間,名稱隨意)欄位。

其作用和意義要大於資料的 id 欄位,常見的資料比如我們通常要記錄的操作日誌、使用者行為日誌、或股市行情資料、伺服器 CPU、記憶體、網路的使用率等。

**取,**一定是以時間範圍為第一過濾條件,然後是其他查詢條件,比如近一天、一週、本月等等,然後在這個範圍內進行二次過濾。

比如性別或地域等,查詢結果中比較關注的是每條資料和 timestamp 欄位具體發生的時間點,而非 id。

此類資料一般用於 OLAP、監控分析等場景。

一、叢集部署規劃

我們都知道在 Elasticsearch(下稱 ES)叢集中有兩個主要角色:Master Node 和 Data Node,其他如 Tribe Node 等節點可根據業務需要另行設立。

為了讓叢集有更好的效能表現,我們應該對這兩個角色有一個更好的規劃,在 Nodes 之間做讀取分離,保證叢集的穩定性和快速響應,在大規模的資料儲存和查詢的壓力之下能夠坦然面對,各自愉快的協作。

1、Master Nodes

Master Node,整個叢集的管理者,負有對 index 的管理、shards 的分配,以及整個叢集拓撲資訊的管理等功能。

眾所周知,Master Node 可以通過 Data Node 兼任,但是,如果對群集規模和穩定要求很高的話,就要職責分離,Master Node 推薦獨立,它的狀態關乎整個叢集的存活。

Master 的配置:

        node.master: true
node.data: false node.ingest: false 複製程式碼

這樣 Master 不參與 I、O,從資料的搜尋和索引操作中解脫出來,專門負責叢集的管理工作,因此 Master Node 的節點配置可以相對低一些。

另外防止 ES 叢集 split brain(腦裂),合理配置 discovery.zen.minimum_master_nodes 引數,官方推薦 master-eligible nodes / 2 + 1 向下取整的個數。

這個引數決定選舉 Master 的 Node 個數,太小容易發生“腦裂”,可能會出現多個 Master,太大 Master 將無法選舉。

2、Data Nodes

Data Node 是資料的承載者,對索引的資料儲存、查詢、聚合等操作提供支援。

這些操作嚴重消耗系統的 CPU、記憶體、IO 等資源,因此,應該把最好的資源分配給 Data Node,因為它們是真正幹累活的角色,同樣 Data Node 也不兼任 Master 的功能。

Data 的配置:

        node.master: false

        node.data: true

        node.ingest: false
複製程式碼

3、Coordinating Only Nodes

ES 本身是一個分散式的計算叢集,每個 Node 都可以響應使用者的請求,包括 Master Node、Data Node,它們都有完整的 Cluster State 資訊。

正如我們知道的一樣,在某個 Node 收到使用者請求的時候,會將請求轉發到叢集中所有索引相關的 Node 上,之後將每個 Node 的計算結果合併後返回給請求方。

我們暫且將這個 Node 稱為查詢節點,整個過程跟分散式資料庫原理類似。那問題來了,這個查詢節點如果在併發和資料量比較大的情況下,由於資料的聚合可能會讓記憶體和網路出現瓶頸。

因此,在職責分離指導思想的前提下,這些操作我們也應該從這些角色中剝離出來,官方稱它是 Coordinating Nodes,只負責路由使用者的請求,包括讀、寫等操作,對記憶體、網路和 CPU 要求比較高。

本質上,Coordinating Only Nodes 可以籠統的理解為是一個負載均衡器,或者反向代理,只負責讀,本身不寫資料。

它的配置是:

        node.master: false

        node.data: false

        node.ingest: false

        search.remote.connect: false
複製程式碼

增加 Coordinating Nodes 的數量可以提高 API 請求響應的效能,我們也可以針對不同量級的 Index 分配獨立的 Coordinating Nodes 來滿足請求效能。

那是不是越多越好呢?在一定範圍內是肯定的,但凡事有個度,過了負作用就會突顯,太多的話會給叢集增加負擔。

在做 Master 選舉的時候會先確保所有 Node 的 Cluster State 是一致的,同步的時候會等待每個 Node 的 Acknowledgement 確認,所以適量分配可以讓叢集暢快的工作。

search.remote.connect 是禁用跨叢集查詢,防止在進行叢集之間查詢時發生二次路由:

二、Routing

類似於分散式資料庫中的分片原則,將符合規則的資料儲存到同一分片。ES 通過雜湊演演算法來決定資料儲存於哪個 Shard:

shard_num = hash(_routing) % num_primary_shards
複製程式碼

其中 hash(_routing) 得出一個數字,然後除以主 Shards 的數量得到一個餘數,餘數的範圍是 0 到 number_of_primary_shards - 1,這個數字就是檔案所在的 Shard。

Routing 預設是 id 值,當然可以自定義,合理指定 Routing 能夠大幅提升查詢效率,Routing 支援 GET、Delete、Update、Post、Put 等操作。

如:

PUT my_index/my_type/1?routing=user1

{

"title": "This is a document"

}

GET my_index/my_type/1?routing=user1
複製程式碼

不指定 Routing 的查詢過程:

簡單的來說,一個查詢請求過來以後會查詢每個 Shard,然後做結果聚合,總的時間大概就是所有 Shard 查詢所消耗的時間之和。

指定 Routing 以後:

會根據 Routing 查詢特定的一個或多個 Shard,這樣就大大減少了查詢時間,提高了查詢效率。

當然,如何設定 Routing 是一個難點,需要一點技巧,要根據業務特點合理組合 Routing 的值,來劃分 Shard 的儲存,最終保持資料量相對均衡。

可以組合幾個維度做為 Routing ,有點類似於 Hbase Key,例如不同的業務線加不同的類別,不同的城市和不同的型別等等,如:

        _search?routing=beijing:按城市。

        _search?routing=beijing_user123:按城市和使用者。

        _search?routing=beijing_android,shanghai_android:按城市和手機型別等。
複製程式碼

資料不均衡?假如你的業務在北京、上海的資料遠遠大於其他二三線城市的資料。

再例如我們的業務場景,A 業務線的資料量級遠遠大於 B 業務線,有時候很難通過 Routing 指定一個值保證資料在所有 Shards 上均勻分佈,會讓部分 Shard 變的越來越大,影響查詢效能,怎麼辦?

一種解決辦法是單獨為這些資料量大的渠道建立獨立的 Index

這樣可以根據需要在不同 Index 之間查詢,然而每個 Index 中 Shards 的資料可以做到相對均衡。

另一種辦法是指定 Index 引數 index.routing_partition_size,來解決最終可能產生群集不均衡的問題,指定這個引數後新的演演算法如下:

   shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
複製程式碼

index.routing_partition_size 應具有大於 1 且小於 index.number_of_shards 的值。

最終資料會在 routing_partition_size 幾個 Shard 上均勻儲存,是哪個 Shard 取決於 hash(_id) % routing_partition_size 的計算結果。

指定引數 index.routing_partition_size 後,索引中的 Mappings 必須指定 _routing 為 "required": true,另外 Mappings 不支援 parent-child 父子關係。

很多情況下,指定 Routing 後會大幅提升查詢效能,畢竟查詢的 Shard 只有那麼幾個,但是如何設定 Routing 是個難題,可根據業務特性巧妙組合。

三、索引拆分

Index 通過橫向擴充套件 Shards 實現分散式儲存,這樣可以解決 Index 大資料儲存的問題。

但在一個 Index 變的越來越大,單個 Shard 也越來越大,查詢和儲存的速度也越來越慢。

更重要的是一個 Index 其實是有儲存上限的(除非你設定足夠多的 Shards 和機器),如官方宣告單個 Shard 的檔案數不能超過 20 億(受限於 Lucene index,每個 Shard 是一個 Lucene index)。

考慮到 I、O,針對 Index 每個 Node 的 Shards 數最好不超過 3 個,那面對這樣一個龐大的 Index,我們是採用更多的 Shards,還是更多的 Index,我們如何選擇?

Index 的 Shards 總量也不宜太多,更多的 Shards 會帶來更多的 I、O 開銷,其實答案就已經很明確,除非你能接受長時間的查詢等待。

Index 拆分的思路很簡單,時序索引有一個好處就是隻有增加,沒有變更,按時間累積,天然對索引的拆分友好支援,可以按照時間和資料量做任意時間段的拆分。

ES 提供的 Rollover Api + Index Template 可以非常便捷和友好的實現 Index 的拆分工作,把單個 index docs 數量控制在百億內,也就是一個 Index 預設 5 個 Shards 左右即可,保證查詢的即時響應。

簡單介紹一下 Rollover API 和 Index Template 這兩個東西,如何實現 index 的拆分。

1、Index Template

我們知道 ES 可以為同一目的或同一類索引建立一個 Index Template,之後建立的索引只要符合匹配規則就會套用這個 Template,不必每次指定 Settings 和 Mappings 等屬性。

一個 Index 可以被多個 Template 匹配,那 Settings 和 Mappings 就是多個 Template 合併後的結果。

有衝突通過 Template 的屬性"order" : 0 從低到高覆蓋(這部分據說會在 ES6 中會做調整,更好的解決 Template 匹配衝突問題)。

示例:

        PUT _template/template_1{

        "index_patterns" : ["log-*"],        "order" : 0,        "settings" : {

            "number_of_shards" : 5

        },    "aliases" : {

         "alias1" : {}

        }

    }
複製程式碼

2、Rollover Index

Rollover Index 可以將現有的索引通過一定的規則,如資料量和時間,索引的命名必須是 logs-000001 這種格式,並指定 aliases,示例:

    PUT /logs-000001

    {

         "aliases": {

         "logs_write": {}

        }

    }

    # Add > 1000 documents to logs-000001

     POST /logs_write/_rollover

     {

      "conditions": {

            "max_age": "7d",             "max_docs": 1000

        }

    }
複製程式碼

先建立索引並指定別名 logs_write,插入 1000 條資料,然後請求 _rollover api 並指定拆分規則。

如果索引中的資料大於規則中指定的資料量或者時間過時,新的索引將被建立,索引名稱為 logs-000002,並根據規則套用 Index Template, 同時別名 logs_write 也將被變更到 logs-000002。

注意事項:

索引命名規則必須如同:logs-000001。

索引必須指定 aliases。

Rollover Index API 呼叫時才去檢查索引是否超出指定規則,不會自動觸發,需要手動呼叫,可以通過 Curator 實現自動化。

如果符合條件會建立新的索引,老索引的資料不會發生變化,如果你已經插入 2000 條,拆分後還是 2000 條。

插入資料時一定要用別名,否則你可能一直在往一個索引裡追加資料。

技巧是按日期滾動索引:

PUT /<logs-{now/d}-1>

{

"aliases": {

"logs_write": {}

}

}
複製程式碼

假如生成的索引名為 logs-2017.04.13-1,如果你在當天執行 Rollover 會生成 logs-2017.04.13-000001,次日的話是 logs-2017.04.14-000001。

這樣就會按日期進行切割索引,那如果你想查詢 3 天內的資料可以通過日期規則來匹配索引名,如:

GET /<logs-{now/d}-*>,<logs-{now/d-1d}-*>,<logs-{now/d-2d}-*>/_search
複製程式碼

到此,我們就可以通過 Index Template 和 Rollover API 實現對 Index 的任意拆分,並按照需要進行任意時間段的合併查詢,這樣只要你的時間跨度不是很大,查詢速度一般可以控制在毫秒級,儲存效能也不會遇到瓶頸。

四、Hot-Warm 架構

冷熱架構,為了保證大規模時序索引實時資料分析的時效性,可以根據資源配置不同將 Data Nodes 進行分類形成分層或分組架構。

一部分支援新資料的讀寫,另一部分僅支援歷史資料的儲存,存放一些查詢發生機率較低的資料。

即 Hot-Warm 架構,對 CPU,磁碟、記憶體等硬體資源合理的規劃和利用,達到效能和效率的最大化。

我們可以通過 ES 的 Shard Allocation Filtering 來實現 Hot-Warm 的架構。

實現思路如下:

將 Data Node 根據不同的資源配比打上標籤,如:Host、Warm。

定義 2 個時序索引的 Index Template,包括 Hot Template 和 Warm Template,Hot Template 可以多分配一些 Shard 和擁有更好資源的 Hot Node。

用 Hot Template 建立一個 Active Index 名為 active-logs-1,別名 active-logs,支援索引切割。

插入一定資料後,通過 roller over api 將 active-logs 切割,並將切割前的 Index 移動到 Warm Nodes 上,如 active-logs-1,並阻止寫入。

通過 Shrinking API 收縮索引 active-logs-1 為 inactive-logs-1,原 Shard 為 5,適當收縮到 2 或 3,可以在 Warm Template 中指定,減少檢索的 Shard,使查詢更快。

通過 force-merging api 合併 inactive-logs-1 索引每個 Shard 的 Segment,節省儲存空間。

刪除 active-logs-1。

1、Hot,Warm Nodes

Hot Nodes

擁有最好資源的 Data Nodes,如更高效能的 CPU、SSD 磁碟、記憶體等資源,這些特殊的 Nodes 支援索引所有的讀、寫操作。

如果你計劃以 100 億為單位來切割 Index,那至少需要三個這樣的 Data Nodes,Index 的 Shard 數為 5,每個 Shard 支援 20 億 Documents 資料的儲存。

為這類 Data Nodes 打上標籤,以便我們在 Template 中識別和指定,啟動引數如下:

./bin/elasticsearch -Enode.attr.box_type=hot

或者配置檔案:

node.attr.box_type: hot

Warm Nodes

儲存只讀資料,並且查詢量較少,但用於儲存長多時間歷史資料的 Data Nodes,這類 Nodes 相對 Hot Nodes 較差的硬體配置,根據需求配置稍差的 CPU、機械磁碟和其他硬體資源,至於數量根據需要保留多長時間的資料來配比,同樣需要打上標籤,方法跟 Hot Nodes 一樣,指定為 Warm,box_type: warm。

2、Hot,Warm Template

Hot Template

我們可以通過指定引數"routing.allocation.include.box_type": "hot",讓所有符合命名規則索引的 Shard 都將被分配到 Hot Nodes 上:

PUT _template/active-logs

{

"template": "active-logs-*","settings": {

"number_of_shards": 5,"number_of_replicas": 1,"routing.allocation.include.box_type": "hot","routing.allocation.total_shards_per_node": 2

},"aliases": {

"active-logs": {}

}

}
複製程式碼
Warm Template

同樣符合命名規則索引的 Shard 會被分配到 Warm Nodes 上,我們指定了更少的 Shards 數量和複本數。

注意,這裡的複本數為 0,和 best_compression 級別的壓縮,方便做遷移等操作,以及進行一些資料的壓縮:

PUT _template/inactive-logs

{

"template": "inactive-logs-*","settings": {

"number_of_shards": 1,"number_of_replicas": 0,"routing.allocation.include.box_type": "warm","codec": "best_compression"

}

}
複製程式碼

假如我們已經建立了索引 active-logs-1 ,當然你可以通過 _bulk API 快速寫入測試的資料,然後參考上文中介紹的 Rollover API 進行切割。

3、Shrink Index Rollover API 切割以後,active-logs-1 將變成一個冷索引,我們將它移動到 Warm Nodes 上。

先將索引置為只讀狀態,拒絕任何寫入操作,然後修改 index.routing.allocation.include.box_type 屬性,ES 會自動移動所有 Shards 到目標 Data Nodes 上:

PUT active-logs-1/_settings

{

"index.blocks.write": true,"index.routing.allocation.include.box_type": "warm"

}
複製程式碼

Cluster Health API 可以檢視遷移狀態,完成後進行收縮操作,其實相當於複製出來一個新的索引,舊的索引還存在。

POST active-logs-1/_shrink/inactive-logs-1

我們可以通過 Head 外掛檢視整個叢集索引的變化情況。

Forcemerge

到目前為止我們已經實現了索引的冷熱分離,和索引的收縮,我們知道每個 Shard 下面由多個 Segment 組成,那 inactive-logs-1 的 Shard 數是 1,但 Segment 還是多個。

這類索引不會在接受寫入操作,為了節約空間和改善查詢效能,通過 Forcemerge API 將 Segment 適量合併:

PUT inactive-logs-1/_settings

{ "number_of_replicas": 1 }
複製程式碼

ES 的 Forcemerge 過程是先建立新的 Segment 刪除舊的,所以舊 Segment 的壓縮方式 best_compression 不會在新的 Segment 中生效,新的 Segment 還是預設的壓縮方式。

現在 inactive-logs-1 的複本還是 0,如果有需要的話,可以分配新的複本數:

PUT inactive-logs-1/_settings

{ "number_of_replicas": 1 }
複製程式碼

最後刪除 active-logs-1,因為我們已經為它做了一個查詢複本 inactive-logs-1。

DELETE active-logs-1

走到這裡,我們就已經實現了 Index 的 Hot-Warm 架構,根據業務特點將一段時間的資料放在 Hot Nodes,更多的歷史資料儲存於 Warm Nodes。

五、其他優化方案

這一部分我們會展示更多的優化方案。

1、索引隔離

在多條業務線的索引共用一個 ES 叢集時會發生流量被獨吃獨佔的情況,因為大家都共用相同的叢集資源,流量大的索引會佔用大部分計算資源而流量小的也會被拖慢,得不到即時響應,或者說業務流量大的索引可以按天拆分,幾個流量小的索引可以按周或月拆分。

這種情況下我們可以將規模大的索引和其他相對小規模的索引獨立儲存,分開查詢或合併查詢。

除了 Master Nodes 以外,Data Nodes 和 Coordinating Nodes 都可以獨立使用(其實如果每個索引的量都特別大也應該採用這種架構),還有一個好處是對方的某個 Node 掛掉,自己不受影響。

同樣利用 ES 支援 Shard Allocation Filtering 功能來實現索引的資源獨立分配,先將 Nodes 進行打標籤,劃分割槽域,類似於 Hot-Warm 架構:

node.attr.zone=zone_a、node.attr.zone=zone_b
複製程式碼

或者:

node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b
複製程式碼

等打標籤的方式來區別對應不同的索引,然後在各自的 Index Template 中指定不同的 node.attr.zone 即可。

如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"分配到 zone_hot_b 以外的所有 Data Nodes 上。

更多用法可以參考 Hot-Warm 架構,或 shard-allocation-filtering:

2、跨資料中心

如果你的業務遍佈全國各地,四海八荒,如果你資料要儲存到多個機房,如果你的 Index 有幾萬個甚至更多( Index 特別多,叢集龐大會導致 Cluster State 資訊量特別大,因為 Cluster State 包含了所有 Shard、Index、Node 等所有相關資訊,它儲存在每個 Node 上,這些資料發生變化都會實時同步到所有 Node 上,當這個資料很大的時候會對叢集的效能造成影響)。

這些情況下我們會考慮部署多個 ES Cluster,那我們將如何解決跨叢集查詢的問題呢?

目前 ES 針對跨叢集操作提供了兩種方案 Tribe Node 和 Cross Cluster Search。

Tribe Node

需要一個獨立的 Node 節點,加入其他 ES Cluster,用法有點類似於 Coordinating Only Node。

所不同的是 Tribe 是針對多個 ES 叢集之間的所有節點,Tribe Node 收到請求廣播到相關叢集中所有節點,將結果合併處理後返回。

表面上看起來 Tribe Node 將多個叢集串連成了一個整體,遇到同名 Index 發生衝突,會選擇其中一個 Index,也可以指定:

tribe:

on_conflict: prefer_t1

t1:

cluster.name: us-cluster

discovery.zen.ping.multicast.enabled: false

discovery.zen.ping.unicast.hosts: ['usm1','usm2','usm3']

t2:

cluster.name: eu-cluster

discovery.zen.ping.multicast.enabled: false

discovery.zen.ping.unicast.hosts: ['eum1','eum2','eum3']
複製程式碼
Cross Cluster Search

Cross Cluster Search 可以讓叢集中的任意一個節點聯合查詢其他叢集中的資料, 通過配置 elasticsearch.yml 或者 API 來啟用這個功能,API 示例:

PUT _cluster/settings

{

"persistent": {

"search": {

"remote": {

"cluster_one": {

"seeds": [

"127.0.0.1:9300"

]

...

}

}

}

}

}
複製程式碼

提交以後整個叢集所有節點都會生效,都可以做為代理去做跨叢集聯合查詢,不過我們最好還是通過 Coordinating Only Nodes 去發起請求。

POST /cluster_one:decision,decision/_search

{

"match_all": {}

}
複製程式碼

對叢集 cluster_one 和本叢集中名為 Decision 的索引聯合查詢。

目前這個功能還在測試階段,但未來可能會取代 Tribe Node,之間的最大的差異是 Tribe Node 需要設定獨立的節點,而 Cross Cluster Search 不需要,叢集中的任意一個節點都可以兼任。

比如可以用我們的 Coordinating Only Nodes 做為聯合查詢節點,另一個優點是配置是動態的,不需要重啟節點。

實際上可以理解為是一個 ES 叢集之間特定的動態代理工具,支援所有操作,包括 Index 的建立和修改,並且通過 Namespace 對 Index 進行隔離,也解決了 Tribe Node 之 Index 名稱衝突的問題。

六、總結

我們在文中介紹了幾種方案用來解決時序索引的海量資料儲存和查詢的問題,根據業務特點和使用場景來單獨或組合使用能夠發揮出意想不到的效果。

特別是 Nodes 之間的讀寫分離、索引拆分、Hot-Warm 等方案的組合應用對索引的查詢和儲存效能有顯著的提升。

另外 Routing 在新版本中增加了 routing_partition_size,解決了 Shard 難以均衡的問題。

如果你的索引 Mapping 中沒有 parent-child 關聯關係可以考慮使用,對查詢的效能提升非常有效。