揪出XXL-JOB中的細節
前言
國慶節快到了,要開始休假了,筆者還是很開心的,國慶快樂!
廢話少說,直接進入正題。
相信大家對XXL-JOB
都很瞭解,故本文對原始碼不進行過多介紹,側重的是看原始碼過程中想到的幾個知識點,不一定都對,請大神們批評指正。
XXL-JOB簡介
-
XXL-JOB
是一個輕量級分散式任務排程平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴充套件。現已開放原始碼並接入多家公司線上產品線,開箱即用。 -
XXL-JOB
分為排程中心、執行器、資料中心,排程中心負責任務管理及排程、執行器管理、日誌管理等,執行器負責任務執行及執行結果回撥。
任務排程 - “類時間輪”的實現
時間輪
時間輪出自Netty
HashedWheelTimer
,是一個環形結構,可以用時鐘來類比,鐘面上有很多bucket
,每一個bucket
上可以存放多個任務,使用一個List
儲存該時刻到期的所有任務,同時一個指標隨著時間流逝一格一格轉動,並執行對應bucket
上所有到期的任務。任務通過取模決定應該放入哪個bucket
。和HashMap
的原理類似,newTask
對應put
,使用List
來解決 Hash 衝突。
以上圖為例,假設一個bucket
是1秒,則指標轉動一輪表示的時間段為8s,假設當前指標指向 0,此時需要排程一個3s後執行的任務,顯然應該加入到(0+3=3)的方格中,指標再走3s次就可以執行了;如果任務要在10s後執行,應該等指標走完一輪零2格再執行,因此應放入2,同時將round(1)
round
為0的,bucket
上其他任務的round
減1。
當然,還有優化的“分層時間輪”的實現,請參考https://cnkirito.moe/timer/。
XXL-JOB中的“時間輪”
-
XXL-JOB中的排程方式從
Quartz
變成了自研排程的方式,很像時間輪,可以理解為有60個bucket
且每個bucket
為1秒,但是沒有了round
的概念。 -
具體可以看下圖。
- XXL-JOB中負責任務排程的有兩個執行緒,分別為
ringThread
和scheduleThread
,其作用如下。
1、scheduleThread:對任務資訊進行讀取,預讀未來5s即將觸發的任務,放入時間輪。 2、ringThread:對當前
bucket
和前一個bucket
中的任務取出並執行。
- 下面結合原始碼看下,為什麼說是“類時間輪”,關鍵程式碼附上了註解,請大家留意觀看。
// 環狀結構
private volatile static Map<Integer,List<Integer>> ringData = new ConcurrentHashMap<>();
// 任務下次啟動時間(單位為秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 任務放進時間輪
private void pushTimeRing(int ringSecond,int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond,ringItemData);
}
ringItemData.add(jobId);
}
複製程式碼
// 同時取兩個時間刻度的任務
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 執行
for (int jobId: ringItemData) {
JobTriggerPoolHelper.trigger(jobId,TriggerTypeEnum.CRON,-1,null,null);
}
複製程式碼
一致性Hash路由中的Hash演演算法
- 大家也知道,
XXL-JOB
在執行任務時,任務具體在哪個執行器上執行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(原始碼在ExecutorRouteConsistentHash.java),自然而然想到了一致性Hash演演算法。 - 一致性Hash演演算法是為瞭解決分散式系統中負載均衡的問題時候可以使用Hash演演算法讓固定的一部分請求落到同一臺伺服器上,這樣每臺伺服器固定處理一部分請求(並維護這些請求的資訊),起到負載均衡的作用。
- 普通的餘數hash(hash(比如使用者id)%伺服器機器數)演演算法伸縮性很差,當新增或者下線伺服器機器時候,使用者id與伺服器的對映關係會大量失效。一致性hash則利用hash環對其進行了改進。
- 一致性Hash演演算法在實踐中,當伺服器節點比較少的時候會出現上節所說的一致性hash傾斜的問題,一個解決方法是多加機器,但是加機器是有成本的,那麼就加虛擬節點。
- 具體原理請參考https://www.jianshu.com/p/e968c081f563。
- 下圖為帶有虛擬節點的Hash環,其中ip1-1是ip1的虛擬節點,ip2-1是ip2的虛擬節點,ip3-1是ip3的虛擬節點。
可見,一致性Hash演演算法的關鍵在於Hash演演算法,保證虛擬節點及Hash結果的均勻性, 而均勻性可以理解為減少Hash衝突,Hash衝突的知識點請參考從HashMap,Redis 字典看【Hash】。。。。
- XXL-JOB中的一致性Hash的Hash函式如下。
// jobId轉換為md5
// 不直接用hashCode() 是因為擴大hash取值範圍,減少衝突
byte[] digest = md5.digest();
// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
| ((long) (digest[2] & 0xFF) << 16)
| ((long) (digest[1] & 0xFF) << 8)
| (digest[0] & 0xFF);
long truncateHashCode = hashCode & 0xffffffffL;
複製程式碼
- 看到上圖的Hash函式,讓我想到了
HashMap
的Hash函式
f(key) = hash(key) & (table.length - 1)
// 使用>>> 16的原因,hashCode()的高位和低位都對f(key)有了一定影響力,使得分佈更加均勻,雜湊衝突的機率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
複製程式碼
- 同理,將jobId的md5編碼的高低位都對Hash結果有影響,使得Hash衝突的概率減小。
分片任務的實現 - 維護執行緒上下文
-
XXL-JOB的分片任務實現了任務的分散式執行,其實是筆者調研的重點,日常開發中很多定時任務都是單機執行,對於後續資料量大的任務最好有一個分散式的解決方案。
-
分片任務的路由策略,原始碼作者提出了分片廣播的概念,剛開始還有點摸不清頭腦,看了原始碼逐漸清晰了起來。
-
想必看過原始碼的也遇到過這麼一個小插曲,路由策略咋沒實現?如下圖所示。
public enum ExecutorRouteStrategyEnum {
FIRST(I18nUtil.getString("jobconf_route_first"),new ExecutorRouteFirst()),LAST(I18nUtil.getString("jobconf_route_last"),new ExecutorRouteLast()),ROUND(I18nUtil.getString("jobconf_route_round"),new ExecutorRouteRound()),RANDOM(I18nUtil.getString("jobconf_route_random"),new ExecutorRouteRandom()),CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"),new ExecutorRouteConsistentHash()),LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"),new ExecutorRouteLFU()),LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"),new ExecutorRouteLRU()),FAILOVER(I18nUtil.getString("jobconf_route_failover"),new ExecutorRouteFailover()),BUSYOVER(I18nUtil.getString("jobconf_route_busyover"),new ExecutorRouteBusyover()),// 說好的實現呢???竟然是null
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"),null);
複製程式碼
- 再繼續追查得到了結論,待我慢慢道來,首先分片任務執行引數傳遞的是什麼?看
XxlJobTrigger.trigger
函式中的一段程式碼。
...
// 如果是分片路由,走的是這段邏輯
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),null)
&& group.getRegistryList() != null && !group.getRegistryList().isEmpty()
&& shardingParam == null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
// 最後兩個引數,i是當前機器在執行器叢集當中的index,group.getRegistryList().size()為執行器總數
processTrigger(group,jobInfo,finalFailRetryCount,triggerType,i,group.getRegistryList().size());
}
}
...
複製程式碼
- 引數經過自研RPC傳遞到執行器,在執行器中具體負責任務執行的
JobThread.run
中,看到了如下程式碼。
// 分片廣播的引數比set進了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal()));
...
// 將執行引數傳遞給jobHandler執行
handler.execute(triggerParamTmp.getExecutorParams())
複製程式碼
- 接著看
ShardingUtil
,才發現了其中的奧祕,請看程式碼。
public class ShardingUtil {
// 執行緒上下文
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
// 分片引數物件
public static class ShardingVO {
private int index; // sharding index
private int total; // sharding total
// 次數省略 get/set
}
// 引數物件注入上下文
public static void setShardingVo(ShardingVO shardingVo){
contextHolder.set(shardingVo);
}
// 從上下文中取出引數物件
public static ShardingVO getShardingVo(){
return contextHolder.get();
}
}
複製程式碼
- 顯而易見,在負責分片任務的
ShardingJobHandler
裡取出了執行緒上下文中的分片引數,這裡也給個程式碼把~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片引數
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片引數:當前分片序號 = {},總分片數 = {}",shardingVO.getIndex(),shardingVO.getTotal());
// 業務邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片,命中分片開始處理",i);
} else {
XxlJobLogger.log("第 {} 片,忽略",i);
}
}
return SUCCESS;
}
}
複製程式碼
- 由此得出,分散式實現是根據分片引數
index
及total
來做的,簡單來講,就是給出了當前執行器的標識,根據這個標識將任務的資料或者邏輯進行區分,即可實現分散式執行。 - 題外話:至於為什麼用外部注入分片引數的方式,不直接
execute
傳遞?
1、可能是因為只有分片任務才用到這兩個引數 2、IJobHandler只有String型別引數
看完原始碼後的思考
- 1、經過此次看原始碼,XXL-JOB的設計目標確實符合開發迅速、學習簡單、輕量級、易擴充套件。
- 2、至於自研RPC還沒有具體考量,具體接入應該會考慮公司的RPC框架。
- 3、作者給出的
Quartz
排程的不足,筆者得繼續深入瞭解。 - 4、框架中很多對宕機、故障、超時等異常狀況的相容值得學習。
- 5、Rolling日誌以及日誌系統實現需要繼續瞭解。