原始碼分析ElasticJob任務錯過機制(misfire)與冪等性
任務在排程執行中,由於某種原因未執行完畢,下一次排程任務觸發後,在同一個Job例項中,會出現兩個執行緒處理同一個分片上的資料,這樣就會造成兩個執行緒可能處理到相同的資料。為了避免同一條資料可能會被多次執行的問題,ElasticJob引入冪等機制,確保同一條資料不會再被多個Job同時處理,也避免同一條資料在同一個Job例項的多個執行緒處理。再重申一次ElastciJob的分散式是資料的分散式,一個任務在多個Job例項上執行,每個Job例項處理該Job的部分資料(資料分片)。
本文重點分析ElasticJob是如何做到如下兩點的。
1)ElasticJob如何確保在同一個Job例項中多個執行緒不會處理相同的資料。
2)ElasticJob如何確保資料不會被多個Job例項處理。
為了解決上述這種情況,ElasticJob引入任務錯過補償執行(misfire)與冪等機制(monitorExecution)
1、ElasticJob如何確保在同一個Job例項中多個執行緒不會處理相同的資料。
場景:例如任務排程週期為每5s執行一次,正常每次排程任務處理需要耗時2s,如果在某一段時間由於資料庫壓力變大,導致原本只需要2s就能處理完成的任務,現在需要16s才能執行,在這個資料處理的過程中,每5s又會觸發一次排程(任務處理),如果不加以控制的話,在同一個例項上根據分片條件去查詢資料庫,查詢到的資料有可能相同(部分相同),這樣同一條任務資料將被多次執行,如果這個任務時處理轉賬業務,如果在業務方法不實現冪等,則會引發非常嚴重的問題,那ElasticJob是否可以避免這個問題呢?
答案是肯定。elasticJob提供了一個配置引數:monitorExecution=true,開啟冪等性。
一個任務觸發後,將執行任務處理邏輯,其入口:AbstractElasticJobExecutor#misfireIfRunning
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { // @1
if (shardingContexts.isAllowSendJobEvent()) { // @2
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed." , jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
程式碼@1:在一個排程任務觸發後如果上一次任務還未執行,則需要設定該分片狀態為mirefire,表示錯失了一次任務執行。
程式碼@2:如果該分片被設定為mirefire並開啟了事件跟蹤,將事件跟蹤儲存在資料庫中。
接下來詳細分析JobFacade.misfireIfRunning的實現邏輯:
/**
* 如果當前分片項仍在執行則設定任務被錯過執行的標記.
*
* @param items 需要設定錯過執行的任務分片項
* @return 是否錯過本次執行
*/
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
if (!hasRunningItems(items)) {
return false;
}
setMisfire(items);
return true;
}
如果存在未完成的分片,則呼叫setMisfire(items)方法,ElasticJob在開啟monitorExecution(true)【冪等機制】機制的情況下,在分片任務開始時會建立${namespace}/jobname/sharding/{item}/running節點,在任務結束後會刪除該目錄,所以在判斷是否有分片正在執行時,只需判斷是否存在上述節點即可。如果存在,呼叫setMisfire方法。
PS:如果ElasticJob為開啟冪等(monitorExecution)的情況下,才會建立\${namespace}/jobname/sharding
/{item}/running,misfire機制才能生效。
ExecutionService#setMisfire
/**
* 設定任務被錯過執行的標記.
*
* @param items 需要設定錯過執行的任務分片項
*/
public void setMisfire(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
}
}
設定misfire的方法為分配給該例項下的所有分片建立持久節點${namespace}/jobname/shading/{item}/misfire節點,注意,只要分配給該例項的任何一分片未執行完畢,則在該例項下的所有分片都增加misfire節點,然後忽略本次任務觸發執行,等待任務結束後再執行。
AbstractElasticJobExecutor#execute
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
在任務執行完成後檢查是否存在${namespace}/jobname/sharding/{item}/misfire節點,如果存在,則首先清除misfie相關的檔案,然後執行任務。
ElasticJob的misfire實現方案總結:
在下一個排程週期到達之後,只要發現這個分片的任何一個分片正在執行,則為該例項分片的所有分片都設定為misfire,等任務執行完畢後,再統一執行下一次任務排程。
2、ElasticJob如何確保資料不會被多個Job例項處理
ElasticJob基於資料分片,不同分片根據分片引數(人為配置),從資料庫中查詢各自資料(任務資料分片),如果當節點宕機,資料會重新分片,如果任務未執行完成,然後執行分片,資料是否會被不同的任務同時處理呢?
答案是不會,因為當節點宕機後,是否需要重新分片事件監聽器會監聽到Job例項代表的節點刪除,設定重新分片,在任務被排程執行具體處理邏輯之前,需要重新分片,重新分片的前提又是要所有的分片的任務全部執行完畢,這也依賴是否開啟冪等控制(monitorExecution),如果開啟,ElasticJob能感知正在執行處理邏輯的分片,重新分片需要等待當前所有任務全部執行完畢後才會觸發,故不會存在不同節點處理相同資料的問題。
問答:
1、如果一個任務JOB的排程頻率為每10s一次,在某個時間,該job執行耗時用了33s(平時只需執行5s),按照正常排程,應該後續會觸發3次排程,那該job後執行完,會連續執行3次排程嗎?
答案:在33s這次任務執行完成後,如果後面的任務執行在10s內執行完畢的話,只會觸發一次,不會補償3次,因為ElasticJob記錄任務錯失執行,只是建立了misfire節點,並不會記錄錯失的此時,因為也沒這個必要。