1. 程式人生 > >Elasticsearch 模組 - Shard Allocation 機制

Elasticsearch 模組 - Shard Allocation 機制

[原文](https://www.cnblogs.com/memento/p/14494010.html) ## 1. 背景 `shard allocation` 意思是`分片分配`, 是一個將分片分配到節點的過程; 可能發生該操作的過程包括: - 初始恢復(`initial recovery`) - 副本分配(`replica allocation`) - 重新平衡(`rebalance`) - 節點的新增和刪除 [來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/modules-cluster.html) 分片的分配操作, 是由 `master` 角色的節點來決定什麼時候移動分片, 以及移動到哪個節點上, 以達到叢集的均衡; > **說明** > > 本文基於 Elasticsearch 7.4.0 版本 ## 2. 機制分析 ### 2.1. `Allocation` 觸發條件 - 新增或刪除 `index` 索引 - `node` 節點的新增或刪除 - 執行 `reroute` 命令 - 修改 `replica` 副本數量 - 叢集重啟 具體對應原始碼解釋:[來源](https://upload-images.jianshu.io/upload_images/6563032-f817c410e1793734.png?imageMogr2/auto-orient/strip|imageView2/2/w/1200/format/webp) | 序號 | 呼叫函式 | 說明 | | ---- | ---------------------------------------------------- | ------------------------------------------ | | 1 | AllocationService.applyStartedShards | Shard 啟動狀態修改 | | 2 | AllocationService.applyFailedShards | Shard 失效狀態修改 | | 3 | AllocationService.deassociateDeadNodes | Node 節點離開叢集 | | 4 | AllocationService.reroute(AllocationCommands) | 執行 relocation 命令 | | 5 | TransportClusterUpdateSettingsAction.masterOperation | 叢集配置修改操作 | | 6 | MetaDataCreateIndexService.onlyCreateIndex | 建立新索引 index 請求 | | 7 | MetaDataDeleteIndexService.deleteIndexs | 刪除索引 index 操作 | | 8 | MetaDataIndexStateService.closeIndex | 關閉 index 操作 | | 9 | MetaDataIndexStateService.openIndex | 開啟 index操作 | | 10 | NodeJoinController.JoinTaskExecutor | 通過叢集發現的節點加入叢集 | | 11 | GatewayService.GatewayRecoveryListener | 通過 GatewayRecovery 恢復的節點加入叢集 | | 12 | LocalAllocateDangledIndices.submitStateUpdateTask | 恢復磁碟記憶體而在 MateDate 內不存在的 index | | 13 | RestoreService.restoreSnapshot | 從 snapshot 中恢復的 index | ### 2.2. `Rebalance` 的觸發條件 在 `rebalance` 之前會經過 `2.3.2` 中介紹的**所有策略**裡實現的 `canRebalance` 方法, 全部通過後才會執行下面的 `Rebalance` 過程; `Rebalance` 過程是通過呼叫 `balanceByWeights()` 方法, 計算 shard 所在的每個 node 的 `weight` 值, $$ weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode() \\ weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index) \\ weight = theta0 * weightShard + theta1 * weightIndex \\ $$ 其中: - `numAdditionalShards` 一般為 0, 呼叫 `weightShardAdded`, `weightShardRemoved` 方法時分別取值為 `1` 和 `-1`; - theta0 = `cluster.routing.allocation.balance.shard` 系統動態配置項, 預設值為 `0.45f`; - theta1 = `cluster.routing.allocation.balance.index` 系統動態配置項, 預設值為 `0.55f`; ![權重計算公式](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113042847-1974828865.png) 原始碼如下: ```java private static class WeightFunction { private final float indexBalance; private final float shardBalance; private final float theta0; private final float theta1; WeightFunction(float indexBalance, float shardBalance) { float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } theta0 = shardBalance / sum; theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; } float weight(Balancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } } ``` ### 2.3. 原始碼分析 分片分配就是把一個分片分配到叢集中某個節點的過程, 其中分配決策包含了兩個方面: - 哪些分片應該分配到哪些節點上 - 哪個分片作為主分片, 哪個作為副本分片 Elasticsearch 主要通過兩個基礎元件來完成分片分配這個過程的: `allocator` 和 `deciders`; - `allocator` 尋找最優的節點來分配分片; - `deciders` 負責判斷並決定是否要進行分配; 1) 新建的索引 ​ `allocator` 負責找出擁有分片數量最少的節點列表, 按**分片數量**遞增排序, 分片數量較少的會被優先選擇; 對於新建索引, `allocator` 的目標是以更為均衡的方式把新索引的分片分配到叢集的節點中; ​ `deciders` 依次遍歷 `allocator` 給出的節點列表, 判斷是否要把分片分配給該節點, 比如是否滿足分配過濾規則, 分片是否將超出節點磁碟容量閾值等等; 2) 已有的索引 ​ `allocator` 對於主分片, 只允許把主分片指定在已經擁有該分片完整資料的節點上; 對於副本分片, 則是先判斷其他節點上是否已有該分片的資料的拷貝, 如果有這樣的節點, `allocator` 則優先把分片分配到這其中一個節點上; #### 2.3.1. Allocator ![Allocator](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113043229-107769562.png) - `PrimaryShardAllocator` 找到擁有某 `Shard` 最新資料(主分片)的節點; - `ReplicaShardAllocator` 找到磁碟上擁有這個 `Shard` 資料(副本分片)的節點; - `BalancedShardsAllocator` 找到擁有最少 `Shard` 個數的節點; ```java public class BalancedShardsAllocator implements ShardsAllocator { public static final Setting INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope); public static final Setting SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope); public static final Setting THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope); private volatile WeightFunction weightFunction; private volatile float threshold; } ``` #### 2.3.2. Deciders `Deciders` 決策期基礎元件的抽象類為 `AllocationDecider`: ```java public abstract class AllocationDecider { public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canRebalance(RoutingAllocation allocation) { return Decision.ALWAYS; } } ``` ES 7.4.0 中的 `Decider` 決策器包括以下所示, 他們均實現上面的 `AllocationDecider` 抽象類, 並重寫 `canRebalance`, `canAllocate`, `canRemain`, `canForceAllocatePrimary` 等方法; ![AllocationDecider](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113043467-536697295.png) 決策器比較多, 大致分類如下, 並列舉決策器對應的配置項: ##### 2.3.2.1. 負載均衡類 - `SameShardAllocationDecider`: 避免主副分片分配到同一個節點; - `AwarenessAllocationDecider`: 感知分配器, 感知伺服器, 機架等, 儘量分散儲存 Shard; > 對應的配置引數有: > > `cluster.routing.allocation.awareness.attributes: rack_id` > > `cluster.routing.allocation.awareness.attributes: zone` - `ShardsLimitAllocationDecider`: 同一個節點上允許存在同一個 `index` 的 `shard` 數目; > `index.routing.allocation.total_shards_per_node`: 表示該索引每個節點上允許最多的 `shard` 數量; **預設值=-1**, 表示無限制; > > `cluster.routing.allocation.total_shards_per_node`: `cluster` 級別, 表示叢集範圍內每個節點上允許最多的 `shard` 數量, **預設值=-1**, 表示無限制; > > `index` 級別會覆蓋 `cluster` 級別; ##### 2.3.2.2. 併發控制類 - `ThrottlingAllocationDecider`: `recovery` 階段的限速配置, 避免過多的 `recovering allocation` 導致該節點的負載過高; > `cluster.routing.allocation.node_initial_primaries_recoveries`: 當前節點在進行主分片恢復時的數量, 預設值=4; > > `cluster.routing.allocation.node_concurrent_incoming_recoveries`: 預設值=2, 通常是其他節點上的副本 shard 恢復到該節點上; > > `cluster.routing.allocation.node_concurrent_outgoing_recoveries`: 預設值=2, 通常是當前節點上的主分片 shard 恢復副本分片到其他節點上; > > `cluster.routing.allocation.node_concurrent_recoveries`: 統一配置上面兩個配置項; - `ConcurrentRebalanceAllocationDecider`: `rebalace` 併發控制, 表示叢集同時允許進行 `rebalance` 操作的併發數量; > `cluster.routing.allocation.cluster_concurrent_rebalance`, 預設值=2 > > 通過檢查 `RoutingNodes` 類中維護的 `reloadingShard` 計數器, 看是否超過配置的併發數; - `DiskThresholdDecider`: 根據節點的磁碟剩餘量來決定是否分配到該節點上; > `cluster.routing.allocation.disk.threshold_enabled`, 預設值=true; > > `cluster.routing.allocation.disk.watermark.low`: 預設值=85%, 達到這個值後, 新索引的分片不會分配到該節點上; > > `cluster.routing.allocation.disk.watermark.high`: 預設值=90%, 達到這個值後, 會觸發已分配到該節點上的 `Shard` 會 `rebalance` 到其他節點上去; ##### 2.3.2.3. 條件限制類 - `RebalanceOnlyWhenActiveAllocationDecider`: 所有 `Shard` 都處於 `active` 狀態下才可以執行 `rebalance` 操作; - `FilterAllocationDecider`: 通過介面動態設定的過濾器; `cluster` 級別會覆蓋 `index` 級別; > `index.routing.allocation.require.{attribute}` > `index.routing.allocation.include.{attribute}` > `index.routing.allocation.exclude.{attribute}` > `cluster.routing.allocation.require.{attribute}` > `cluster.routing.allocation.include.{attribute}` > `cluster.routing.allocation.exclude.{attribute}` > > - require 表示必須滿足, include 表示可以分配到指定節點, exclude 表示不允許分配到指定節點; > - {attribute} 還有 ES 內建的幾個選擇, _name, _ip, _host; - `ReplicaAfterPrimaryActiveAllocationDecider`: 保證只在主分片分配完成後(`active` 狀態)才開始分配副本分片; - `ClusterRebalanceAllocationDecider`: 通過叢集中 `active` 的 `shard` 狀態來決定是否可以執行 `rebalance`; > `cluster.routing.allocation.allow_rebalance` > > `indices_all_active`(預設): 當叢集所有的節點分配完成, 才可以執行 `rebalance` 操作; > `indices_primaries_active`: 只要所有主分片分配完成, 才可以執行 `rebalance` 操作; > `always`: 任何情況下都允許 rebalance 操作; - `MaxRetryAllocationDecider`: 防止 shard 在失敗次數達到上限後繼續分配; > `index.allocation.max_retries`: 設定分配的最大失敗重試次數, 預設值=5; ##### 2.3.2.4. 其他決策類 - `EnableAllocationDecider`: 設定允許分配的分片型別; `index` 級別配置會覆蓋 `cluster` 級別配置; > `all`(預設): 允許所有型別的分片; > > `primaries`: 僅允許主分片; > > `new_primaries`: 僅允許新建索引的主分片; > > `none`: 禁止分片分配操作; - `NodeVersionAllocationDecider`: 檢查分片所在 Node 的版本是否高於目標 Node 的 ES 版本; - `SnapshotInProgressAllocationDecider`: 決定 `snapshot` 期間是否允許 `allocation`, 因為 `snapshot` 只會發生在主分片上, 所以該配置只會限制主分片的 `allocation`; > `cluster.routing.allocation.snapshot.relocation_enabled` 接下來介紹一下在 Elasticsearch 中涉及到 `Allocation` 和 `Rebalance` 的相關配置項; ## 3. cluster-level 配置 ### 3.1. Shard allocation 配置 控制分片的分配和恢復; | 配置 | 預設值 | 說明 | | :----------------------------------------------------------: | :----: | ------------------------------------------------------------ | | cluster.routing.allocation.enable | all | 啟用或禁用針對特定型別分片的分配;
1. `all`: 允許分配所有型別的分片;
2. `primaries`: 只允許分配主分片(`primary shard`);
3. `new_primaries`: 只允許分配**新索引**的主分片(`primary shard`);
4. `none`: 禁用分片分配;
該設定不會影響重啟節點時本地主分片的恢復; | | cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一個節點允許併發的傳入分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一個節點允許併發的傳出分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_recoveries | | 上面兩者的合併配置 | | cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 單個節點上同時初始化的主分片數量 | | cluster.routing.allocation.same_shard.host | false | 是否執行檢查, 以防止基於`host name`和`host address`, 在單個主機上分配同一分片的多個例項; 該設定僅用於在同一臺計算機上啟動多個節點的情況; | ### 3.2. Shard rebalancing 配置 控制叢集之間的分片平衡; | 配置 | 預設值 | 說明 | | :-----------------------------------------------------: | :----------------: | ------------------------------------------------------------ | | cluster.routing.rebalance.enable | all | 啟用或禁用針對特定型別分片的`rebalancing`;
1. `all`: 允許`rebalancing`所有型別的分片;
2. `primaries`: 只允許`rebalancing`主分片;
3. `replicas`: 只允許`rebalancing`副本分片;
4. `none`: 禁用`rebalancing`; | | cluster.routing.allocation.allow_rebalance | indices_all_active | 指定何時允許執行`rebalancing`;
1. `always`: 總是允許;
2. `indices_primaries_active`: 當叢集中所有主分片已分配時才允許`rebalancing`;
3. `indices_all_active`: 當叢集中所有分片(包括主分片和副本分片)都已分配時才允許`rebalancing`; | | cluster.routing.allocation.cluster_concurrent_rebalance | 2 | 指定整個叢集中允許同時在節點間移動的分片數量; 該配置僅控制由於叢集不平衡引起的併發分片分配數量, 對分配過濾(`allocation filtering`)或強制感知(`forced awareness`)的分片分配不做限制; | ### 3.3. 分片平衡啟發式 以下配置用於決定每個分片的存放位置; 當`rebalancing`操作不再使任何節點的權重超過`balance.threshold`時, 叢集即達到平衡; | 配置 | 預設值 | 說明 | | :------------------------------------------: | :----: | ------------------------------------------------------------ | | cluster.routing.allocation.balance.shard | 0.45f | 定義節點上分配的分片總數的權重因子; 提升該值會導致叢集中所有節點趨向於分片數量相等; | | cluster.routing.allocation.balance.index | 0.55f | 定義節點上分配的每個索引的分片數量的權重因子; 提升該值會導致叢集中所有節點上每個索引的分片數量趨向於相等; | | cluster.routing.allocation.balance.threshold | 1.0f | 定義應當執行操作的最小優化值(非負浮點數); 提升該值會導致叢集在優化分片平衡方面不太積極; | ## 4. Index-level 配置 以下配置控制每個索引中的分片分配; ### 4.1. index-level 分片分配過濾([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/shard-allocation-filtering.html)) 配置需要分兩步: 1. 在每個 Elasticsearch 節點的 `elasticsearch.yml` 配置檔案中新增自定義節點屬性, 比如以 `small`, `medium`, `big`區分節點型別, 則配置檔案中可新增: ```yaml node.attr.size: medium ``` 或者在啟動 Elasticsearch 服務時, 在命令列裡新增 `./bin/elasticsearch -Enode.attr.size=medium`; 2. 在新建索引的 `mapping` 時, 新增`index.routing.allocation.include/exclude/require.size: medium` 的過濾配置即可; ```json PUT /_settings { "index.routing.allocation.include.size": "medium" } ``` 可以配置多個自定義節點屬性, 並且必須同時滿足索引裡配置的多個過濾條件; - **index.routing.allocation.include.{attribute}: {values}** - **index.routing.allocation.require.{attribute}: {values}** - **index.routing.allocation.exclude.{attribute}: {values}** 其中 `{attribute}` 可以是上面提到的自定義節點屬性, ES 自己也有一些內建的節點屬性: | attribute | 說明 | | ----------- | ----------------------------------------- | | _name | 通過節點名稱進行匹配 | | _host_ip | 通過節點 IP 地址進行匹配 | | _publish_ip | 通過節點的釋出 IP 地址進行匹配 | | _ip | 通過 `_host_ip` 或 `_publish_ip` 進行匹配 | | _host | 通過節點的`hostname`進行匹配 | | _id | 通過節點的 id 進行匹配 | 其中 `{values}` 可以是單個值, 也可以是逗號分隔的多個值, 也可以使用萬用字元 `*` 進行模糊匹配; ### 4.2. 設定延遲分配, 當節點離開時([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/delayed-allocation.html)) 當某個節點由於突發原因, 比如網路中斷, 人為操作重啟等, 需要暫時離開叢集時, 叢集會立刻新建副本分片以替換丟失的副本, 然後在剩餘的所有節點之間進行`rebalancing`, 這樣導致在短時間內該突發節點又恢復過來後, 原先的副本就無法再使用, 叢集會將剛才新建的副本分片再拷貝回到該節點上; 這樣就會造成不必要的資源浪費, 以及節點分片`rebalancing`帶來的波動; 可以使用 `index.unassigned.node_left.delayed_timeout` 動態設定來延遲由於節點離開而導致未分配的副本分片的分配問題; 該配置預設值 `1m`; ``` PUT _all/_settings { "settings": { "index.unassigned.node_left.delayed_timeout": "5m" } } ``` 修改成以上配置後, 如果在 `5m` 內, 該節點可以恢復重新加入叢集, 則叢集會自動恢復該節點的副本分片分配, 恢復速度很快; > 注意 > > 1. 此設定不影響將副本分片升級為主分片; > 2. 此設定不影響之前未分配的副本分片; > 3. 在整個叢集重新啟動後, 該延遲分配不會生效; ### 4.3. 索引恢復的優先順序([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/recovery-prioritization.html)) 索引分片恢復的優先順序按照: - 可選的 `index.priority` 配置, 值越大優先順序越高; - `index` 索引的建立日期, 越新的索引優先順序越高; - `index` 索引的名稱; ### 4.4. 每個節點的分片總數([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/allocation-total-shards.html)) | 配置 | 預設值 | 說明 | | ------------------------------------------------ | ------------- | ------------------------------------------------------------ | | index.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(具體某個索引) | | cluster.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(與索引無關, 全域性設定) | 這些配置是硬性配置, 可能會導致一些分片無法分配, 需要慎重配置; ## 5. 閱讀來源 1. [Shard allocation and cluster-level routing](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/modules-cluster.html) 2. [Elasticsearch底層系列之 Shard Allocation 機制](https://cloud.tencent.com/developer/article/1361266) 3. [ELASTICSEARCH ALLOCATION 分析](https://it.baiked.com/elasticsearch/1777.html) 4. [Elasticsearch Shard Allocation 機制](https://www.jianshu.com/p/ccd177b53834) 5. [Elasticsearch Allocation&Rebalance](https://www.jianshu.com/p/a65c9