1. 程式人生 > >Elastic-Job-Lite 源碼分析 —— 作業分片策略

Elastic-Job-Lite 源碼分析 —— 作業分片策略

哈希 AD hash alloc hub strings put iat 總數

摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-sharding-strategy/ 「芋道源碼」歡迎轉載,保留摘要,謝謝!

本文基於 Elastic-Job V2.1.5 版本分享

  • 1. 概述
  • 2. 自帶作業分片策略
    • 2.1 AverageAllocationJobShardingStrategy
    • 2.2 OdevitySortByNameJobShardingStrategy
    • 2.3 RotateServerByNameJobShardingStrategy
  • 3. 自定義作業分片策略
  • 666. 彩蛋

技術分享圖片

??????關註微信公眾號:【芋道源碼】有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源碼分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
  3. 您對於源碼的疑問每條留言都將得到認真回復。甚至不知道如何讀源碼也可以請教噢。
  4. 新的源碼解析文章實時收到通知。每周更新一篇左右。
  5. 認真的源碼交流微信群。

1. 概述

本文主要分享 Elastic-Job-Lite 作業分片策略。

涉及到主要類的類圖如下( 打開大圖 ):

技術分享圖片

你行好事會因為得到贊賞而愉悅
同理,開源項目貢獻者會因為 Star 而更加有動力
為 Elastic-Job 點贊!傳送門

2. 自帶作業分片策略

JobShardingStrategy,作業分片策略接口。分片策略通過實現接口的 #sharding(...) 方法提供作業分片的計算。

public interface JobShardingStrategy { Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount); }

Elastic-Job-Lite 提供三種自帶的作業分片策略:

  • AverageAllocationJobShardingStrategy:基於平均分配算法的分片策略。
  • OdevitySortByNameJobShardingStrategy:根據作業名的哈希值奇偶數決定IP升降序算法的分片策略。
  • RotateServerByNameJobShardingStrategy:根據作業名的哈希值對作業節點列表進行輪轉的分片策略。

2.1 AverageAllocationJobShardingStrategy

AverageAllocationJobShardingStrategy,基於平均分配算法的分片策略。Elastic-Job-Lite 默認的作業分片策略。

如果分片不能整除,則不能整除的多余分片將依次追加到序號小的作業節點。如:
如果有3臺作業節點,分成9片,則每臺作業節點分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3臺作業節點,分成8片,則每臺作業節點分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
如果有3臺作業節點,分成10片,則每臺作業節點分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

代碼實現如下:

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy { @Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { // 不存在 作業運行實例 if (jobInstances.isEmpty()) { return Collections.emptyMap(); } // 分配能被整除的部分 Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount); // 分配不能被整除的部分 addAliquant(jobInstances, shardingTotalCount, result); return result; } }
  • 調用 #shardingAliquot(...) 方法分配能被整除的部分。能整除的咱就不舉例子。如果有 3 臺作業節點,分成 8 片,被整除的部分是前 6 片 [0, 1, 2, 3, 4, 5],調用該方法結果:1=[0,1], 2=[2,3], 3=[4,5]。

    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) { Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1); int itemCountPerSharding = shardingTotalCount / shardingUnits.size(); // 每個作業運行實例分配的平均分片數 int count = 0; for (JobInstance each : shardingUnits) { List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1); // 順序向下分配 for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) { shardingItems.add(i); } result.put(each, shardingItems); count++; } return result; }
  • 調用 #addAliquant(...) 方法分配能不被整除的部分。繼續上面的例子。不能被整除的部分是後 2 片 [6, 7],調用該方法結果:1=[0,1] + [6], 2=[2,3] + [7], 3=[4,5]。

    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) { int aliquant = shardingTotalCount % shardingUnits.size(); // 余數 int count = 0; for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) { if (count < aliquant) { entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count); } count++; } }

如何實現主備

通過作業配置設置總分片數為 1 ( JobCoreConfiguration.shardingTotalCount = 1 ),只有一個作業分片能夠分配到作業分片項,從而達到一主N備。

2.2 OdevitySortByNameJobShardingStrategy

OdevitySortByNameJobShardingStrategy,根據作業名的哈希值奇偶數決定IP升降序算法的分片策略。

作業名的哈希值為奇數則IP 降序.
作業名的哈希值為偶數則IP 升序.
用於不同的作業平均分配負載至不同的作業節點.
如:

  1. 如果有3臺作業節點, 分成2片, 作業名稱的哈希值為奇數, 則每臺作業節點分到的分片是: 1=[ ], 2=[1], 3=[0].
  2. 如果有3臺作業節點, 分成2片, 作業名稱的哈希值為偶數, 則每臺作業節點分到的分片是: 1=[0], 2=[1], 3=[ ].

實現代碼如下:

@Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { long jobNameHash = jobName.hashCode(); if (0 == jobNameHash % 2) { Collections.reverse(jobInstances); } return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount); }
  • 從實現代碼上,仿佛和 IP 升降序沒什麽關系?答案在傳遞進來的參數 jobInstancesjobInstances 已經是按照 IP 進行降序的數組。所以當判斷到作業名的哈希值為偶數時,進行數組反轉( Collections#reverse(...) )實現按照 IP 升序。下面看下為什麽說jobInstances 已經按照 IP 進行降序:

    // ZookeeperRegistryCenter.java @Override public List<String> getChildrenKeys(final String key) { try { List<String> result = client.getChildren().forPath(key); Collections.sort(result, new Comparator<String>() { @Override public int compare(final String o1, final String o2) { return o2.compareTo(o1); } }); return result; } catch (final Exception ex) { RegExceptionHandler.handleException(ex); return Collections.emptyList(); } }
  • 調用 AverageAllocationJobShardingStrategy#sharding(...) 方法完成最終作業分片計算。

2.3 RotateServerByNameJobShardingStrategy

RotateServerByNameJobShardingStrategy,根據作業名的哈希值對作業節點列表進行輪轉的分片策略。這裏的輪轉怎麽定義呢?如果有 3 臺作業節點,順序為 [0, 1, 2],如果作業名的哈希值根據作業分片總數取模為 1, 作業節點順序變為 [1, 2, 0]。

分片的目的,是將作業的負載合理的分配到不同的作業節點上,要避免分片策略總是讓固定的作業節點負載特別大,其它工作節點負載特別小。這個也是為什麽官方對比 RotateServerByNameJobShardingStrategy、AverageAllocationJobShardingStrategy 如下:

AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於作業作業節點數,作業將永遠分配至IP地址靠前的作業節點,導致IP地址靠後的作業節點空閑。如:
OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新分配作業節點負載。
如果有3臺作業節點,分成2片,作業名稱的哈希值為奇數,則每臺作業節點分到的分片是:1=[0], 2=[1], 3=[]
如果有3臺作業節點,分成2片,作業名稱的哈希值為偶數,則每臺作業節點分到的分片是:3=[0], 2=[1], 1=[]

實現代碼如下:

public final class RotateServerByNameJobShardingStrategy implements JobShardingStrategy { private AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy(); @Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount); } private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) { int shardingUnitsSize = shardingUnits.size(); int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize; // 輪轉開始位置 if (0 == offset) { return shardingUnits; } List<JobInstance> result = new ArrayList<>(shardingUnitsSize); for (int i = 0; i < shardingUnitsSize; i++) { int index = (i + offset) % shardingUnitsSize; result.add(shardingUnits.get(index)); } return result; } }
  • 調用 #rotateServerList(...) 實現作業節點數組輪轉。
  • 調用 AverageAllocationJobShardingStrategy#sharding(...) 方法完成最終作業分片計算。

3. 自定義作業分片策略

可能在你的業務場景下,需要實現自定義的作業分片策略。通過定義類實現 JobShardingStrategy 接口即可:

public final class OOXXShardingStrategy implements JobShardingStrategy { @Override public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) { // 實現邏輯 } }

實現後,配置實現類的全路徑到 Lite作業配置( LiteJobConfiguration )的 jobShardingStrategyClass 屬性。

作業進行分片計算時,作業分片策略工廠( JobShardingStrategyFactory ) 會創建作業分片策略實例:

public final class JobShardingStrategyFactory { public static JobShardingStrategy getStrategy(final String jobShardingStrategyClassName) { if (Strings.isNullOrEmpty(jobShardingStrategyClassName)) { return new AverageAllocationJobShardingStrategy(); } try { Class<?> jobShardingStrategyClass = Class.forName(jobShardingStrategyClassName); if (!JobShardingStrategy.class.isAssignableFrom(jobShardingStrategyClass)) { throw new JobConfigurationException("Class ‘%s‘ is not job strategy class", jobShardingStrategyClassName); } return (JobShardingStrategy) jobShardingStrategyClass.newInstance(); } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException ex) { throw new JobConfigurationException("Sharding strategy class ‘%s‘ config error, message details are ‘%s‘", jobShardingStrategyClassName, ex.getMessage()); } } }

Elastic-Job-Lite 源碼分析 —— 作業分片策略