LTS分散式任務排程在專案中的實際應用
阿新 • • 發佈:2019-01-10
公司專案是分散式的,所以定時任務用lts框架,簡單的看看程式碼,分析了一下,找到呼叫流程。
注意:不瞭解lts的先看下這個文件,我也是看完才看懂程式碼
文件地址:https://www.cnblogs.com/dion-90/articles/8674591.html
lts的工作流程
- JobClient 提交一個 任務 給 JobTracker, 這裡我提供了兩種客戶端API, 一種是如果JobTracker 不存在或者提交失敗,直接返回提交失敗。另一種客戶端是重試客戶端, 如果提交失敗,先儲存到本地leveldb(可以使用NFS來達到同個節點組共享leveldb檔案的目的,多執行緒訪問,做了檔案鎖處理),返回給客戶端提交成功的資訊,待JobTracker可用的時候,再將任務提交。
- JobTracker 收到JobClient提交來的任務,先生成一個唯一的JobID。然後將任務儲存在Mongo叢集中。JobTracker 發現有(任務執行的)可用的TaskTracker節點(組) 之後,將優先順序最大,最先提交的任務分發給TaskTracker。這裡JobTracker會優先分配給比較空閒的TaskTracker節點,達到負載均衡。
- TaskTracker 收到JobTracker分發來的任務之後,執行。執行完畢之後,再反饋任務執行結果給JobTracker(成功or 失敗[失敗有失敗錯誤資訊]),如果發現JobTacker不可用,那麼儲存本地leveldb,等待TaskTracker可用的時候再反饋。反饋結果的同時,詢問JobTacker有沒有新的任務要執行。
- JobTacker收到TaskTracker節點的任務結果資訊,生成並插入(mongo)任務執行日誌。根據任務資訊決定要不要反饋給客戶端。不需要反饋的直接刪除, 需要反饋的(同樣JobClient不可用儲存檔案,等待可用重發)。
- JobClient 收到任務執行結果,進行自己想要的邏輯處理。
程式碼分析
以我公司為例, spring cloud裡有兩個服務分別是JobClient
和JobTracker
:
其中task相當於JobClient, job相當於JobTracker
1.task中的程式碼, 專案啟動執行這個方法, 這相當是定製任務的基本資訊:
// task中的程式碼, 專案啟動執行這個方法
/**
* 定時生成 ...資料
*/
public void jobMethod()
{
// 定義個bean, 裡面可以定義屬性
SynMainTainMsgCommand command = new SynMainTainMsgCommand();
// 定義任務物件
Job job = new Job();
// taskId自定義
job.setTaskId("doMainTainProdcut");
// 自定義引數,實際就是HashMap
job.setParam("description", "定時生成 車輛 保養資料");
// 注意這裡傳的commond物件的類全名
job.setParam("command", command.getClass().getName());
job.setTaskTrackerNodeGroup("組名");
job.setNeedFeedback(true);
job.setReplaceOnExist(true); // 當任務佇列中存在這個任務的時候,是否替換更新
job.setCronExpression("cron表示式");
// 提交任務
Response response = jobClient.submitJob(job);
}
2.先把job任務寫了,一會再說怎麼呼叫的
在job服務裡實現相應的任務:
@Component
public class SynMainTainMsgHandler extends BaseCommandHandler<SynMainTainMsgCommand, HttpCommandResultWithData> {
// ...略
public HttpCommandResultWithData handler(SynMainTainMsgCommand command) {
// 具體的任務實現...
}
}
3.關鍵是下一步,job怎麼收到並且執行的
// 1.實現JobRunner的run方法
@JobRunner4TaskTracker
public class JobRunnerImpl implements JobRunner {
@Override
public Result run(JobContext jobContext) throws Throwable {
String successjson;
try {
// 接到引數
String command = jobContext.getJob().getParam("command");
String commandParam = jobContext.getJob().getParam("commandParam");
// 反射找到上面定義的bean
Class clazz = Class.forName(command);
// ...略部分程式碼
Object o = clazz.newInstance();
// 這裡想通過command找handler,並執行
Command.Result handlerresult = dispatch((AbstractCommand)o);
// 這裡只要找到 BaseCommandHandler的handler方法執行就行了,因為上面的SynMainTainMsgHandler繼承了BaseCommandHandler
successjson = JsonUtil.toJson(handlerresult);
return new Result(Action.EXECUTE_SUCCESS, successjson);
}
}
- 這裡只要找到 BaseCommandHandler的handler方法執行就行了,因為上面的SynMainTainMsgHandler繼承了BaseCommandHandler,
BaseCommandHandler繼承CommandHandler。
private <C extends Command<?>, CR extends Command.Result> CR dispatch(C command) {
CommandHandler<C, CR> commandHandler = (CommandHandler)this.handlerByType.get(command.getClass());
return commandHandler.handle(command);
}
省略部分程式碼,實現思路就是通過傳進來的command.className獲取到具體的handler。