Elasticsearch 模組 - Shard Allocation 機制
阿新 • • 發佈:2021-03-07
[原文](https://www.cnblogs.com/memento/p/14494010.html)
## 1. 背景
`shard allocation` 意思是`分片分配`, 是一個將分片分配到節點的過程; 可能發生該操作的過程包括:
- 初始恢復(`initial recovery`)
- 副本分配(`replica allocation`)
- 重新平衡(`rebalance`)
- 節點的新增和刪除
[來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/modules-cluster.html)
分片的分配操作, 是由 `master` 角色的節點來決定什麼時候移動分片, 以及移動到哪個節點上, 以達到叢集的均衡;
> **說明**
>
> 本文基於 Elasticsearch 7.4.0 版本
## 2. 機制分析
### 2.1. `Allocation` 觸發條件
- 新增或刪除 `index` 索引
- `node` 節點的新增或刪除
- 執行 `reroute` 命令
- 修改 `replica` 副本數量
- 叢集重啟
具體對應原始碼解釋:[來源](https://upload-images.jianshu.io/upload_images/6563032-f817c410e1793734.png?imageMogr2/auto-orient/strip|imageView2/2/w/1200/format/webp)
| 序號 | 呼叫函式 | 說明 |
| ---- | ---------------------------------------------------- | ------------------------------------------ |
| 1 | AllocationService.applyStartedShards | Shard 啟動狀態修改 |
| 2 | AllocationService.applyFailedShards | Shard 失效狀態修改 |
| 3 | AllocationService.deassociateDeadNodes | Node 節點離開叢集 |
| 4 | AllocationService.reroute(AllocationCommands) | 執行 relocation 命令 |
| 5 | TransportClusterUpdateSettingsAction.masterOperation | 叢集配置修改操作 |
| 6 | MetaDataCreateIndexService.onlyCreateIndex | 建立新索引 index 請求 |
| 7 | MetaDataDeleteIndexService.deleteIndexs | 刪除索引 index 操作 |
| 8 | MetaDataIndexStateService.closeIndex | 關閉 index 操作 |
| 9 | MetaDataIndexStateService.openIndex | 開啟 index操作 |
| 10 | NodeJoinController.JoinTaskExecutor | 通過叢集發現的節點加入叢集 |
| 11 | GatewayService.GatewayRecoveryListener | 通過 GatewayRecovery 恢復的節點加入叢集 |
| 12 | LocalAllocateDangledIndices.submitStateUpdateTask | 恢復磁碟記憶體而在 MateDate 內不存在的 index |
| 13 | RestoreService.restoreSnapshot | 從 snapshot 中恢復的 index |
### 2.2. `Rebalance` 的觸發條件
在 `rebalance` 之前會經過 `2.3.2` 中介紹的**所有策略**裡實現的 `canRebalance` 方法, 全部通過後才會執行下面的 `Rebalance` 過程;
`Rebalance` 過程是通過呼叫 `balanceByWeights()` 方法, 計算 shard 所在的每個 node 的 `weight` 值,
$$
weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode() \\
weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index) \\
weight = theta0 * weightShard + theta1 * weightIndex \\
$$
其中:
- `numAdditionalShards` 一般為 0, 呼叫 `weightShardAdded`, `weightShardRemoved` 方法時分別取值為 `1` 和 `-1`;
- theta0 = `cluster.routing.allocation.balance.shard` 系統動態配置項, 預設值為 `0.45f`;
- theta1 = `cluster.routing.allocation.balance.index` 系統動態配置項, 預設值為 `0.55f`;
![權重計算公式](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113042847-1974828865.png)
原始碼如下:
```java
private static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
}
float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
```
### 2.3. 原始碼分析
分片分配就是把一個分片分配到叢集中某個節點的過程, 其中分配決策包含了兩個方面:
- 哪些分片應該分配到哪些節點上
- 哪個分片作為主分片, 哪個作為副本分片
Elasticsearch 主要通過兩個基礎元件來完成分片分配這個過程的: `allocator` 和 `deciders`;
- `allocator` 尋找最優的節點來分配分片;
- `deciders` 負責判斷並決定是否要進行分配;
1) 新建的索引
`allocator` 負責找出擁有分片數量最少的節點列表, 按**分片數量**遞增排序, 分片數量較少的會被優先選擇; 對於新建索引, `allocator` 的目標是以更為均衡的方式把新索引的分片分配到叢集的節點中;
`deciders` 依次遍歷 `allocator` 給出的節點列表, 判斷是否要把分片分配給該節點, 比如是否滿足分配過濾規則, 分片是否將超出節點磁碟容量閾值等等;
2) 已有的索引
`allocator` 對於主分片, 只允許把主分片指定在已經擁有該分片完整資料的節點上; 對於副本分片, 則是先判斷其他節點上是否已有該分片的資料的拷貝, 如果有這樣的節點, `allocator` 則優先把分片分配到這其中一個節點上;
#### 2.3.1. Allocator
![Allocator](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113043229-107769562.png)
- `PrimaryShardAllocator` 找到擁有某 `Shard` 最新資料(主分片)的節點;
- `ReplicaShardAllocator` 找到磁碟上擁有這個 `Shard` 資料(副本分片)的節點;
- `BalancedShardsAllocator` 找到擁有最少 `Shard` 個數的節點;
```java
public class BalancedShardsAllocator implements ShardsAllocator {
public static final Setting INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope);
private volatile WeightFunction weightFunction;
private volatile float threshold;
}
```
#### 2.3.2. Deciders
`Deciders` 決策期基礎元件的抽象類為 `AllocationDecider`:
```java
public abstract class AllocationDecider {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return Decision.ALWAYS;
}
public Decision canRebalance(RoutingAllocation allocation) {
return Decision.ALWAYS;
}
}
```
ES 7.4.0 中的 `Decider` 決策器包括以下所示, 他們均實現上面的 `AllocationDecider` 抽象類, 並重寫 `canRebalance`, `canAllocate`, `canRemain`, `canForceAllocatePrimary` 等方法;
![AllocationDecider](https://img2020.cnblogs.com/blog/335284/202103/335284-20210307113043467-536697295.png)
決策器比較多, 大致分類如下, 並列舉決策器對應的配置項:
##### 2.3.2.1. 負載均衡類
- `SameShardAllocationDecider`: 避免主副分片分配到同一個節點;
- `AwarenessAllocationDecider`: 感知分配器, 感知伺服器, 機架等, 儘量分散儲存 Shard;
> 對應的配置引數有:
>
> `cluster.routing.allocation.awareness.attributes: rack_id`
>
> `cluster.routing.allocation.awareness.attributes: zone`
- `ShardsLimitAllocationDecider`: 同一個節點上允許存在同一個 `index` 的 `shard` 數目;
> `index.routing.allocation.total_shards_per_node`: 表示該索引每個節點上允許最多的 `shard` 數量; **預設值=-1**, 表示無限制;
>
> `cluster.routing.allocation.total_shards_per_node`: `cluster` 級別, 表示叢集範圍內每個節點上允許最多的 `shard` 數量, **預設值=-1**, 表示無限制;
>
> `index` 級別會覆蓋 `cluster` 級別;
##### 2.3.2.2. 併發控制類
- `ThrottlingAllocationDecider`: `recovery` 階段的限速配置, 避免過多的 `recovering allocation` 導致該節點的負載過高;
> `cluster.routing.allocation.node_initial_primaries_recoveries`: 當前節點在進行主分片恢復時的數量, 預設值=4;
>
> `cluster.routing.allocation.node_concurrent_incoming_recoveries`: 預設值=2, 通常是其他節點上的副本 shard 恢復到該節點上;
>
> `cluster.routing.allocation.node_concurrent_outgoing_recoveries`: 預設值=2, 通常是當前節點上的主分片 shard 恢復副本分片到其他節點上;
>
> `cluster.routing.allocation.node_concurrent_recoveries`: 統一配置上面兩個配置項;
- `ConcurrentRebalanceAllocationDecider`: `rebalace` 併發控制, 表示叢集同時允許進行 `rebalance` 操作的併發數量;
> `cluster.routing.allocation.cluster_concurrent_rebalance`, 預設值=2
>
> 通過檢查 `RoutingNodes` 類中維護的 `reloadingShard` 計數器, 看是否超過配置的併發數;
- `DiskThresholdDecider`: 根據節點的磁碟剩餘量來決定是否分配到該節點上;
> `cluster.routing.allocation.disk.threshold_enabled`, 預設值=true;
>
> `cluster.routing.allocation.disk.watermark.low`: 預設值=85%, 達到這個值後, 新索引的分片不會分配到該節點上;
>
> `cluster.routing.allocation.disk.watermark.high`: 預設值=90%, 達到這個值後, 會觸發已分配到該節點上的 `Shard` 會 `rebalance` 到其他節點上去;
##### 2.3.2.3. 條件限制類
- `RebalanceOnlyWhenActiveAllocationDecider`: 所有 `Shard` 都處於 `active` 狀態下才可以執行 `rebalance` 操作;
- `FilterAllocationDecider`: 通過介面動態設定的過濾器; `cluster` 級別會覆蓋 `index` 級別;
> `index.routing.allocation.require.{attribute}`
> `index.routing.allocation.include.{attribute}`
> `index.routing.allocation.exclude.{attribute}`
> `cluster.routing.allocation.require.{attribute}`
> `cluster.routing.allocation.include.{attribute}`
> `cluster.routing.allocation.exclude.{attribute}`
>
> - require 表示必須滿足, include 表示可以分配到指定節點, exclude 表示不允許分配到指定節點;
> - {attribute} 還有 ES 內建的幾個選擇, _name, _ip, _host;
- `ReplicaAfterPrimaryActiveAllocationDecider`: 保證只在主分片分配完成後(`active` 狀態)才開始分配副本分片;
- `ClusterRebalanceAllocationDecider`: 通過叢集中 `active` 的 `shard` 狀態來決定是否可以執行 `rebalance`;
> `cluster.routing.allocation.allow_rebalance`
>
> `indices_all_active`(預設): 當叢集所有的節點分配完成, 才可以執行 `rebalance` 操作;
> `indices_primaries_active`: 只要所有主分片分配完成, 才可以執行 `rebalance` 操作;
> `always`: 任何情況下都允許 rebalance 操作;
- `MaxRetryAllocationDecider`: 防止 shard 在失敗次數達到上限後繼續分配;
> `index.allocation.max_retries`: 設定分配的最大失敗重試次數, 預設值=5;
##### 2.3.2.4. 其他決策類
- `EnableAllocationDecider`: 設定允許分配的分片型別; `index` 級別配置會覆蓋 `cluster` 級別配置;
> `all`(預設): 允許所有型別的分片;
>
> `primaries`: 僅允許主分片;
>
> `new_primaries`: 僅允許新建索引的主分片;
>
> `none`: 禁止分片分配操作;
- `NodeVersionAllocationDecider`: 檢查分片所在 Node 的版本是否高於目標 Node 的 ES 版本;
- `SnapshotInProgressAllocationDecider`: 決定 `snapshot` 期間是否允許 `allocation`, 因為 `snapshot` 只會發生在主分片上, 所以該配置只會限制主分片的 `allocation`;
> `cluster.routing.allocation.snapshot.relocation_enabled`
接下來介紹一下在 Elasticsearch 中涉及到 `Allocation` 和 `Rebalance` 的相關配置項;
## 3. cluster-level 配置
### 3.1. Shard allocation 配置
控制分片的分配和恢復;
| 配置 | 預設值 | 說明 |
| :----------------------------------------------------------: | :----: | ------------------------------------------------------------ |
| cluster.routing.allocation.enable | all | 啟用或禁用針對特定型別分片的分配;
1. `all`: 允許分配所有型別的分片;
2. `primaries`: 只允許分配主分片(`primary shard`);
3. `new_primaries`: 只允許分配**新索引**的主分片(`primary shard`);
4. `none`: 禁用分片分配;
該設定不會影響重啟節點時本地主分片的恢復; | | cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一個節點允許併發的傳入分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一個節點允許併發的傳出分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_recoveries | | 上面兩者的合併配置 | | cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 單個節點上同時初始化的主分片數量 | | cluster.routing.allocation.same_shard.host | false | 是否執行檢查, 以防止基於`host name`和`host address`, 在單個主機上分配同一分片的多個例項; 該設定僅用於在同一臺計算機上啟動多個節點的情況; | ### 3.2. Shard rebalancing 配置 控制叢集之間的分片平衡; | 配置 | 預設值 | 說明 | | :-----------------------------------------------------: | :----------------: | ------------------------------------------------------------ | | cluster.routing.rebalance.enable | all | 啟用或禁用針對特定型別分片的`rebalancing`;
1. `all`: 允許`rebalancing`所有型別的分片;
2. `primaries`: 只允許`rebalancing`主分片;
3. `replicas`: 只允許`rebalancing`副本分片;
4. `none`: 禁用`rebalancing`; | | cluster.routing.allocation.allow_rebalance | indices_all_active | 指定何時允許執行`rebalancing`;
1. `always`: 總是允許;
2. `indices_primaries_active`: 當叢集中所有主分片已分配時才允許`rebalancing`;
3. `indices_all_active`: 當叢集中所有分片(包括主分片和副本分片)都已分配時才允許`rebalancing`; |
| cluster.routing.allocation.cluster_concurrent_rebalance | 2 | 指定整個叢集中允許同時在節點間移動的分片數量; 該配置僅控制由於叢集不平衡引起的併發分片分配數量, 對分配過濾(`allocation filtering`)或強制感知(`forced awareness`)的分片分配不做限制; |
### 3.3. 分片平衡啟發式
以下配置用於決定每個分片的存放位置; 當`rebalancing`操作不再使任何節點的權重超過`balance.threshold`時, 叢集即達到平衡;
| 配置 | 預設值 | 說明 |
| :------------------------------------------: | :----: | ------------------------------------------------------------ |
| cluster.routing.allocation.balance.shard | 0.45f | 定義節點上分配的分片總數的權重因子; 提升該值會導致叢集中所有節點趨向於分片數量相等; |
| cluster.routing.allocation.balance.index | 0.55f | 定義節點上分配的每個索引的分片數量的權重因子; 提升該值會導致叢集中所有節點上每個索引的分片數量趨向於相等; |
| cluster.routing.allocation.balance.threshold | 1.0f | 定義應當執行操作的最小優化值(非負浮點數); 提升該值會導致叢集在優化分片平衡方面不太積極; |
## 4. Index-level 配置
以下配置控制每個索引中的分片分配;
### 4.1. index-level 分片分配過濾([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/shard-allocation-filtering.html))
配置需要分兩步:
1. 在每個 Elasticsearch 節點的 `elasticsearch.yml` 配置檔案中新增自定義節點屬性, 比如以 `small`, `medium`, `big`區分節點型別, 則配置檔案中可新增:
```yaml
node.attr.size: medium
```
或者在啟動 Elasticsearch 服務時, 在命令列裡新增 `./bin/elasticsearch -Enode.attr.size=medium`;
2. 在新建索引的 `mapping` 時, 新增`index.routing.allocation.include/exclude/require.size: medium` 的過濾配置即可;
```json
PUT /_settings
{
"index.routing.allocation.include.size": "medium"
}
```
可以配置多個自定義節點屬性, 並且必須同時滿足索引裡配置的多個過濾條件;
- **index.routing.allocation.include.{attribute}: {values}**
- **index.routing.allocation.require.{attribute}: {values}**
- **index.routing.allocation.exclude.{attribute}: {values}**
其中 `{attribute}` 可以是上面提到的自定義節點屬性, ES 自己也有一些內建的節點屬性:
| attribute | 說明 |
| ----------- | ----------------------------------------- |
| _name | 通過節點名稱進行匹配 |
| _host_ip | 通過節點 IP 地址進行匹配 |
| _publish_ip | 通過節點的釋出 IP 地址進行匹配 |
| _ip | 通過 `_host_ip` 或 `_publish_ip` 進行匹配 |
| _host | 通過節點的`hostname`進行匹配 |
| _id | 通過節點的 id 進行匹配 |
其中 `{values}` 可以是單個值, 也可以是逗號分隔的多個值, 也可以使用萬用字元 `*` 進行模糊匹配;
### 4.2. 設定延遲分配, 當節點離開時([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/delayed-allocation.html))
當某個節點由於突發原因, 比如網路中斷, 人為操作重啟等, 需要暫時離開叢集時, 叢集會立刻新建副本分片以替換丟失的副本, 然後在剩餘的所有節點之間進行`rebalancing`, 這樣導致在短時間內該突發節點又恢復過來後, 原先的副本就無法再使用, 叢集會將剛才新建的副本分片再拷貝回到該節點上; 這樣就會造成不必要的資源浪費, 以及節點分片`rebalancing`帶來的波動;
可以使用 `index.unassigned.node_left.delayed_timeout` 動態設定來延遲由於節點離開而導致未分配的副本分片的分配問題; 該配置預設值 `1m`;
```
PUT _all/_settings
{
"settings": {
"index.unassigned.node_left.delayed_timeout": "5m"
}
}
```
修改成以上配置後, 如果在 `5m` 內, 該節點可以恢復重新加入叢集, 則叢集會自動恢復該節點的副本分片分配, 恢復速度很快;
> 注意
>
> 1. 此設定不影響將副本分片升級為主分片;
> 2. 此設定不影響之前未分配的副本分片;
> 3. 在整個叢集重新啟動後, 該延遲分配不會生效;
### 4.3. 索引恢復的優先順序([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/recovery-prioritization.html))
索引分片恢復的優先順序按照:
- 可選的 `index.priority` 配置, 值越大優先順序越高;
- `index` 索引的建立日期, 越新的索引優先順序越高;
- `index` 索引的名稱;
### 4.4. 每個節點的分片總數([來源](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/allocation-total-shards.html))
| 配置 | 預設值 | 說明 |
| ------------------------------------------------ | ------------- | ------------------------------------------------------------ |
| index.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(具體某個索引) |
| cluster.routing.allocation.total_shards_per_node | unbounded(-1) | 指定單個節點上最多分配的分片數量, 包括主分片和副本分片;(與索引無關, 全域性設定) |
這些配置是硬性配置, 可能會導致一些分片無法分配, 需要慎重配置;
## 5. 閱讀來源
1. [Shard allocation and cluster-level routing](https://www.elastic.co/guide/en/elasticsearch/reference/7.4/modules-cluster.html)
2. [Elasticsearch底層系列之 Shard Allocation 機制](https://cloud.tencent.com/developer/article/1361266)
3. [ELASTICSEARCH ALLOCATION 分析](https://it.baiked.com/elasticsearch/1777.html)
4. [Elasticsearch Shard Allocation 機制](https://www.jianshu.com/p/ccd177b53834)
5. [Elasticsearch Allocation&Rebalance](https://www.jianshu.com/p/a65c9
2. `primaries`: 只允許分配主分片(`primary shard`);
3. `new_primaries`: 只允許分配**新索引**的主分片(`primary shard`);
4. `none`: 禁用分片分配;
該設定不會影響重啟節點時本地主分片的恢復; | | cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一個節點允許併發的傳入分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一個節點允許併發的傳出分片(`incoming shard`)數量 | | cluster.routing.allocation.node_concurrent_recoveries | | 上面兩者的合併配置 | | cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 單個節點上同時初始化的主分片數量 | | cluster.routing.allocation.same_shard.host | false | 是否執行檢查, 以防止基於`host name`和`host address`, 在單個主機上分配同一分片的多個例項; 該設定僅用於在同一臺計算機上啟動多個節點的情況; | ### 3.2. Shard rebalancing 配置 控制叢集之間的分片平衡; | 配置 | 預設值 | 說明 | | :-----------------------------------------------------: | :----------------: | ------------------------------------------------------------ | | cluster.routing.rebalance.enable | all | 啟用或禁用針對特定型別分片的`rebalancing`;
2. `primaries`: 只允許`rebalancing`主分片;
3. `replicas`: 只允許`rebalancing`副本分片;
4. `none`: 禁用`rebalancing`; | | cluster.routing.allocation.allow_rebalance | indices_all_active | 指定何時允許執行`rebalancing`;
1. `always`: 總是允許;
2. `indices_primaries_active`: 當叢集中所有主分片已分配時才允許`rebalancing`;