1. 程式人生 > 程式設計 >揪出XXL-JOB中的細節

揪出XXL-JOB中的細節

前言

國慶節快到了,要開始休假了,筆者還是很開心的,國慶快樂!

廢話少說,直接進入正題。

相信大家對XXL-JOB都很瞭解,故本文對原始碼不進行過多介紹,側重的是看原始碼過程中想到的幾個知識點,不一定都對,請大神們批評指正。

XXL-JOB簡介

  • XXL-JOB是一個輕量級分散式任務排程平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴充套件。現已開放原始碼並接入多家公司線上產品線,開箱即用。
  • XXL-JOB分為排程中心、執行器、資料中心,排程中心負責任務管理及排程、執行器管理、日誌管理等,執行器負責任務執行及執行結果回撥。

任務排程 - “類時間輪”的實現

時間輪

時間輪出自Netty

中的HashedWheelTimer,是一個環形結構,可以用時鐘來類比,鐘面上有很多bucket,每一個bucket上可以存放多個任務,使用一個List儲存該時刻到期的所有任務,同時一個指標隨著時間流逝一格一格轉動,並執行對應bucket上所有到期的任務。任務通過取模決定應該放入哪個bucket。和HashMap的原理類似,newTask對應put,使用List來解決 Hash 衝突。

以上圖為例,假設一個bucket是1秒,則指標轉動一輪表示的時間段為8s,假設當前指標指向 0,此時需要排程一個3s後執行的任務,顯然應該加入到(0+3=3)的方格中,指標再走3s次就可以執行了;如果任務要在10s後執行,應該等指標走完一輪零2格再執行,因此應放入2,同時將round(1)

儲存到任務中。檢查到期任務時只執行round為0的,bucket上其他任務的round減1。

當然,還有優化的“分層時間輪”的實現,請參考https://cnkirito.moe/timer/。

XXL-JOB中的“時間輪”

  • XXL-JOB中的排程方式從Quartz變成了自研排程的方式,很像時間輪,可以理解為有60個bucket且每個bucket為1秒,但是沒有了round的概念。

  • 具體可以看下圖。

  • XXL-JOB中負責任務排程的有兩個執行緒,分別為ringThreadscheduleThread,其作用如下。

1、scheduleThread:對任務資訊進行讀取,預讀未來5s即將觸發的任務,放入時間輪。 2、ringThread:對當前bucket

和前一個bucket中的任務取出並執行。

  • 下面結合原始碼看下,為什麼說是“類時間輪”,關鍵程式碼附上了註解,請大家留意觀看。
// 環狀結構
private volatile static Map<Integer,List<Integer>> ringData = new ConcurrentHashMap<>();

// 任務下次啟動時間(單位為秒) % 60
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

// 任務放進時間輪
private void pushTimeRing(int ringSecond,int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond,ringItemData);
        }
        ringItemData.add(jobId);
    }
複製程式碼
// 同時取兩個時間刻度的任務
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);  
// 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
for (int i = 0; i < 2; i++) {
	List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
	if (tmpData != null) {
		ringItemData.addAll(tmpData);
	}
}
// 執行
for (int jobId: ringItemData) {
	JobTriggerPoolHelper.trigger(jobId,TriggerTypeEnum.CRON,-1,null,null);
}
複製程式碼

一致性Hash路由中的Hash演演算法

  • 大家也知道,XXL-JOB在執行任務時,任務具體在哪個執行器上執行是根據路由策略來決定的,其中有一個策略是一致性Hash策略(原始碼在ExecutorRouteConsistentHash.java),自然而然想到了一致性Hash演演算法
  • 一致性Hash演演算法是為瞭解決分散式系統中負載均衡的問題時候可以使用Hash演演算法讓固定的一部分請求落到同一臺伺服器上,這樣每臺伺服器固定處理一部分請求(並維護這些請求的資訊),起到負載均衡的作用。
  • 普通的餘數hash(hash(比如使用者id)%伺服器機器數)演演算法伸縮性很差,當新增或者下線伺服器機器時候,使用者id與伺服器的對映關係會大量失效。一致性hash則利用hash環對其進行了改進。
  • 一致性Hash演演算法在實踐中,當伺服器節點比較少的時候會出現上節所說的一致性hash傾斜的問題,一個解決方法是多加機器,但是加機器是有成本的,那麼就加虛擬節點
  • 具體原理請參考https://www.jianshu.com/p/e968c081f563。
  • 下圖為帶有虛擬節點的Hash環,其中ip1-1是ip1的虛擬節點,ip2-1是ip2的虛擬節點,ip3-1是ip3的虛擬節點。

可見,一致性Hash演演算法的關鍵在於Hash演演算法,保證虛擬節點Hash結果的均勻性, 而均勻性可以理解為減少Hash衝突,Hash衝突的知識點請參考從HashMap,Redis 字典看【Hash】。。。

  • XXL-JOB中的一致性Hash的Hash函式如下。
// jobId轉換為md5
// 不直接用hashCode() 是因為擴大hash取值範圍,減少衝突
byte[] digest = md5.digest();

// 32位hashCode
long hashCode = ((long) (digest[3] & 0xFF) << 24)
	| ((long) (digest[2] & 0xFF) << 16)
	| ((long) (digest[1] & 0xFF) << 8)
	| (digest[0] & 0xFF);

long truncateHashCode = hashCode & 0xffffffffL;
複製程式碼
  • 看到上圖的Hash函式,讓我想到了HashMap的Hash函式
f(key) = hash(key) & (table.length - 1) 
// 使用>>> 16的原因,hashCode()的高位和低位都對f(key)有了一定影響力,使得分佈更加均勻,雜湊衝突的機率就小了。
hash(key) = (h = key.hashCode()) ^ (h >>> 16)
複製程式碼
  • 同理,將jobId的md5編碼的高低位都對Hash結果有影響,使得Hash衝突的概率減小。

分片任務的實現 - 維護執行緒上下文

  • XXL-JOB的分片任務實現了任務的分散式執行,其實是筆者調研的重點,日常開發中很多定時任務都是單機執行,對於後續資料量大的任務最好有一個分散式的解決方案。

  • 分片任務的路由策略,原始碼作者提出了分片廣播的概念,剛開始還有點摸不清頭腦,看了原始碼逐漸清晰了起來。

  • 想必看過原始碼的也遇到過這麼一個小插曲,路由策略咋沒實現?如下圖所示。

public enum ExecutorRouteStrategyEnum {

    FIRST(I18nUtil.getString("jobconf_route_first"),new ExecutorRouteFirst()),LAST(I18nUtil.getString("jobconf_route_last"),new ExecutorRouteLast()),ROUND(I18nUtil.getString("jobconf_route_round"),new ExecutorRouteRound()),RANDOM(I18nUtil.getString("jobconf_route_random"),new ExecutorRouteRandom()),CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"),new ExecutorRouteConsistentHash()),LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"),new ExecutorRouteLFU()),LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"),new ExecutorRouteLRU()),FAILOVER(I18nUtil.getString("jobconf_route_failover"),new ExecutorRouteFailover()),BUSYOVER(I18nUtil.getString("jobconf_route_busyover"),new ExecutorRouteBusyover()),// 說好的實現呢???竟然是null
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"),null);
複製程式碼
  • 再繼續追查得到了結論,待我慢慢道來,首先分片任務執行引數傳遞的是什麼?看XxlJobTrigger.trigger函式中的一段程式碼。
...
// 如果是分片路由,走的是這段邏輯
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(),null)
                && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
                && shardingParam == null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {
	            // 最後兩個引數,i是當前機器在執行器叢集當中的index,group.getRegistryList().size()為執行器總數
                processTrigger(group,jobInfo,finalFailRetryCount,triggerType,i,group.getRegistryList().size());
            }
        } 
...
複製程式碼
  • 引數經過自研RPC傳遞到執行器,在執行器中具體負責任務執行的JobThread.run中,看到了如下程式碼。
// 分片廣播的引數比set進了ShardingUtil
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal()));
...
// 將執行引數傳遞給jobHandler執行
handler.execute(triggerParamTmp.getExecutorParams())
複製程式碼
  • 接著看ShardingUtil,才發現了其中的奧祕,請看程式碼。
public class ShardingUtil {
	// 執行緒上下文
    private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<ShardingVO>();
	// 分片引數物件
    public static class ShardingVO {

        private int index;  // sharding index
        private int total;  // sharding total
		// 次數省略 get/set
    }
	// 引數物件注入上下文
    public static void setShardingVo(ShardingVO shardingVo){
        contextHolder.set(shardingVo);
    }
	// 從上下文中取出引數物件
    public static ShardingVO getShardingVo(){
        return contextHolder.get();
    }

}
複製程式碼
  • 顯而易見,在負責分片任務的ShardingJobHandler裡取出了執行緒上下文中的分片引數,這裡也給個程式碼把~
@JobHandler(value="shardingJobHandler")
@Service
public class ShardingJobHandler extends IJobHandler {

	@Override
	public ReturnT<String> execute(String param) throws Exception {

		// 分片引數
		ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
		XxlJobLogger.log("分片引數:當前分片序號 = {},總分片數 = {}",shardingVO.getIndex(),shardingVO.getTotal());

		// 業務邏輯
		for (int i = 0; i < shardingVO.getTotal(); i++) {
			if (i == shardingVO.getIndex()) {
				XxlJobLogger.log("第 {} 片,命中分片開始處理",i);
			} else {
				XxlJobLogger.log("第 {} 片,忽略",i);
			}
		}

		return SUCCESS;
	}

}
複製程式碼
  • 由此得出,分散式實現是根據分片引數indextotal來做的,簡單來講,就是給出了當前執行器的標識,根據這個標識將任務的資料或者邏輯進行區分,即可實現分散式執行。
  • 題外話:至於為什麼用外部注入分片引數的方式,不直接execute傳遞?

1、可能是因為只有分片任務才用到這兩個引數 2、IJobHandler只有String型別引數

看完原始碼後的思考

  • 1、經過此次看原始碼,XXL-JOB的設計目標確實符合開發迅速、學習簡單、輕量級、易擴充套件
  • 2、至於自研RPC還沒有具體考量,具體接入應該會考慮公司的RPC框架。
  • 3、作者給出的Quartz排程的不足,筆者得繼續深入瞭解。
  • 4、框架中很多對宕機、故障、超時等異常狀況的相容值得學習。
  • 5、Rolling日誌以及日誌系統實現需要繼續瞭解。

參考文獻