XXL-JOB原理--任務執行(五)
一、任務排程中心傳送任務執行請求
任務傳送執行的操作有兩種:
(1)根據配置的cron表示式週期性執行相關任務
(2)在任務排程中心主動執行任務
在註冊quartz定時任務時已經註冊執行類為RemoteHttpJobBean,所以週期性執行定時任務會呼叫RemoteHttpJobBean的executeInternal方法,在executeInternal中會呼叫JobTriggerPoolHelper.trigger(jobId),通過任務排程中心主動執行任務時也是會呼叫JobTriggerPoolHelper.trigger(jobId)方法,所以接下來我們要看的是JobTriggerPoolHelper.trigger(jobId)中做的邏輯處理就好。
public class RemoteHttpJobBean extends QuartzJobBean { private static Logger logger = LoggerFactory.getLogger(RemoteHttpJobBean.class); @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { // load jobId JobKey jobKey = context.getTrigger().getJobKey(); Integer jobId = Integer.valueOf(jobKey.getName()); // trigger //XxlJobTrigger.trigger(jobId); JobTriggerPoolHelper.trigger(jobId); } }
JobTriggerPoolHelper.trigger所做的操作是將任務提交給一個執行緒池(任務排程中心預設開啟50個執行緒),線上程池中呼叫XxlJobTrigger.trigger。
public void addTrigger(final int jobId){ triggerPool.execute(new Runnable() { @Override public void run() { XxlJobTrigger.trigger(jobId); } }); }
在XxlJobTrigger.trigger中會根據jobId獲取任務的基本配置資訊(阻塞策略、路由策略、失敗重試測試、分組伺服器列表等等),然後根據路由策略選擇是廣播還是單播等,接下來就是組裝訊息體呼叫runExecutor方法傳送http請求到任務執行器。
public static void trigger(int jobId) {
//獲取任務資訊
// load data
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId); // job info
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
//根據任務的分組資訊找到分組,分組中存在伺服器的IP和埠地址等
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); // group info
//阻塞策略
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
//失敗策略
ExecutorFailStrategyEnum failStrategy = ExecutorFailStrategyEnum.match(jobInfo.getExecutorFailStrategy(), ExecutorFailStrategyEnum.NULL); // fail strategy
//執行路由測試
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
//伺服器地址
ArrayList<String> addressList = (ArrayList<String>) group.getRegistryList();
//廣播模式
// broadcast
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum && CollectionUtils.isNotEmpty(addressList)) {
//依次呼叫所有的伺服器
for (int i = 0; i < addressList.size(); i++) {
String address = addressList.get(i);
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
//jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()).append("("+i+"/"+addressList.size()+")"); // update01
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailStrategy")).append(":").append(failStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
// 3、trigger-valid
if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append(I18nUtil.getString("jobconf_trigger_address_empty"));
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(i);
triggerParam.setBroadcastTotal(addressList.size()); // update02
// 4.2、trigger-run (route run / trigger remote executor)
//遠端呼叫服務介面,執行任務
triggerResult = runExecutor(triggerParam, address); // update03
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 4.3、trigger (fail retry)
if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_TRIGGER_RETRY) {
triggerResult = runExecutor(triggerParam, address); // update04
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_fail_trigger_retry") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor trigger
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
} else {
//單播模式
// 1、save log-id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、prepare trigger-info
//jobLog.setExecutorAddress(executorAddress);
jobLog.setGlueType(jobInfo.getGlueType());
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
ReturnT<String> triggerResult = new ReturnT<String>(null);
StringBuffer triggerMsgSb = new StringBuffer();
triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
.append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailStrategy")).append(":").append(failStrategy.getTitle());
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
// 3、trigger-valid
if (triggerResult.getCode()==ReturnT.SUCCESS_CODE && CollectionUtils.isEmpty(addressList)) {
triggerResult.setCode(ReturnT.FAIL_CODE);
triggerMsgSb.append("<br>----------------------<br>").append(I18nUtil.getString("jobconf_trigger_address_empty"));
}
if (triggerResult.getCode() == ReturnT.SUCCESS_CODE) {
// 4.1、trigger-param
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(0);
triggerParam.setBroadcastTotal(1);
// 4.2、trigger-run (route run / trigger remote executor)
//路由後遠端呼叫服務介面,執行任務
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
// 4.3、trigger (fail retry)
if (triggerResult.getCode()!=ReturnT.SUCCESS_CODE && failStrategy == ExecutorFailStrategyEnum.FAIL_TRIGGER_RETRY) {
triggerResult = executorRouteStrategyEnum.getRouter().routeRun(triggerParam, addressList);
triggerMsgSb.append("<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_fail_trigger_retry") +"<<<<<<<<<<< </span><br>").append(triggerResult.getMsg());
}
}
// 5、save trigger-info
jobLog.setExecutorAddress(triggerResult.getContent());
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(triggerMsgSb.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// 6、monitor trigger
JobFailMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
}
runExecutor方法中根據address伺服器地址,XxlJobDynamicScheduler.getExecutorBiz中會獲取代理類最終呼叫JettyClient的send方法。
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
//獲取代理物件
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
//最終呼叫執行
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("<br>address:").append(address);
runResultSB.append("<br>code:").append(runResult.getCode());
runResultSB.append("<br>msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
runResult.setContent(address);
return runResult;
}
在XxlJobDynamicScheduler的getExecutorBiz中會通過NetComClientProxy生成代理物件,在執行時會呼叫其方法。
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
getObject中生成代理物件,執行會執行JettyClient的send方法。
@Override
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// filter method like "Object.toString()"
if (Object.class.getName().equals(method.getDeclaringClass().getName())) {
logger.error(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", method.getDeclaringClass().getName(), method.getName());
throw new RuntimeException("xxl-rpc proxy class-method not support");
}
// request
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
//發起http呼叫,執行任務
// send
RpcResponse response = client.send(request);
// valid response
if (response == null) {
throw new Exception("Network request fail, response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
任務排程中心向任務執行器傳送的任務請求資料如下。
二、任務執行器接收任務執行
在任務執行器會根據內建的jetty提供web服務,提供請求處理器JettyServerHandler接收處理任務排程中心傳送過來的任務
public class JettyServerHandler extends AbstractHandler {
private static Logger logger = LoggerFactory.getLogger(JettyServerHandler.class);
//接收請求,處理任務
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
// invoke
//呼叫任務執行
RpcResponse rpcResponse = doInvoke(request);
// serialize response
byte[] responseBytes = HessianSerializer.serialize(rpcResponse);
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
baseRequest.setHandled(true);
OutputStream out = response.getOutputStream();
out.write(responseBytes);
out.flush();
}
private RpcResponse doInvoke(HttpServletRequest request) {
try {
// deserialize request
byte[] requestBytes = HttpClientUtil.readBytes(request);
if (requestBytes == null || requestBytes.length==0) {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("RpcRequest byte[] is null");
return rpcResponse;
}
//反序列化資料
RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);
// invoke
//通過反射呼叫任務
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
return rpcResponse;
} catch (Exception e) {
logger.error(e.getMessage(), e);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError("Server-error:" + e.getMessage());
return rpcResponse;
}
}
}
在invokeService根據傳送過來的類名com.xxl.job.core.biz.ExecutorBiz和方法run,通過反射機制呼叫
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
if (serviceBean==null) {
serviceBean = serviceMap.get(request.getClassName());
}
if (serviceBean == null) {
// TODO
}
RpcResponse response = new RpcResponse();
if (System.currentTimeMillis() - request.getCreateMillisTime() > 180000) {
response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The timestamp difference between admin and executor exceeds the limit."));
return response;
}
if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(request.getAccessToken())) {
response.setResult(new ReturnT<String>(ReturnT.FAIL_CODE, "The access token[" + request.getAccessToken() + "] is wrong."));
return response;
}
try {
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
我們看看com.xxl.job.core.biz.ExecutorBiz的run方法中做了什麼處理操作。
在ExecutorBiz中根據傳送過來的訊息,根據demoJobHandler找到介面的實現類,接下來就可以新起執行緒去執行實現類DemoJobHandler了。
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
//建立執行執行緒
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
//如果存在則直接使用老的執行緒
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
//根據類名找到任務執行類
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// replace thread (new or exists invalid)
//起執行緒執行任務
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
總結:任務執行器提供web服務,任務排程中心根據任務分組及分組伺服器傳送http請求,任務執行器收到請求,根據請求中的資料呼叫對應的任務。