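LTS Internals: How JobTracker Receives and Dispatches Jobs (Part 3)
JobTracker acts much like a router: JobClient submits job information to JobTracker, and JobTracker routes it to a TaskTracker for processing.
The previous post, LTS Internals: How JobClient Submits Jobs (Part 2), covered job submission from JobClient. This post looks at what JobTracker does with the job information it receives.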
I. Example
1. JobTracker service registration
(1) ZooKeeper registry address
(2) Externally exposed service port: 30005
(3) Data-related configuration
<bean id="jobTracker" class="com.github.ltsopensource.spring.JobTrackerFactoryBean" init-method="start">
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="listenPort" value="30005"/>
    <property name="configs">
        <props>
            <prop key="job.logger">mysql</prop>
            <prop key="job.queue">mysql</prop>
            <prop key="jdbc.url">jdbc:mysql://127.0.0.1:3306/lts</prop>
            <prop key="jdbc.username">root</prop>
            <prop key="jdbc.password">root</prop>
        </props>
    </property>
</bean>
2. Main function
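import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {
    public static void main(String[] args) {
        // Loading the Spring context runs the jobTracker bean's init-method (start)
        ApplicationContext context = new ClassPathXmlApplicationContext("/lts-jobtracker.xml");
    }
}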
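When JobTrackerFactoryBean initializes, it registers the service with ZooKeeper, exposes service port 30005, and connects to the database.
II. Receiving jobs
JobTracker provides a request dispatcher, RemotingDispatcher. When JobClient submits a job, JobSubmitProcessor handles the request and adds the received message to a message queue.
LTS wraps Netty, Mina, and its own LTS communication framework, but whichever one is in use, message handling ultimately calls RemotingDispatcher's processRequest method. Let's see what it does.
The actual business processing happens in doBiz.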
@Override
public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
    // Handle different kinds of messages differently
    // Heartbeat
    if (request.getCode() == JobProtos.RequestCode.HEART_BEAT.code()) {
        offerHandler(channel, request);
        return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.HEART_BEAT_SUCCESS.code(), "");
    }
    // Request rate limiting
    if (reqLimitEnable) {
        return doBizWithReqLimit(channel, request);
    } else {
        // Handle the actual business request
        return doBiz(channel, request);
    }
}
doBiz picks a RemotingProcessor according to the RequestCode. When JobClient submits a job, the RemotingProcessor implementation that handles the request is JobSubmitProcessor.
private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
    // Other request codes
    RequestCode code = RequestCode.valueOf(request.getCode());
    // Select the processor registered for this request code
    RemotingProcessor processor = processors.get(code);
    if (processor == null) {
        return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
    }
    offerHandler(channel, request);
    return processor.processRequest(channel, request);
}
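The dispatch step in doBiz boils down to a map lookup keyed by request code. The following is a minimal, self-contained sketch of that pattern. The names DispatchSketch, Code, and Processor, as well as the string payloads, are hypothetical stand-ins and are not LTS classes.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not LTS types.
class DispatchSketch {
    enum Code { SUBMIT_JOB, JOB_PULL }

    interface Processor {
        String process(String body);
    }

    // One processor per request code, filled in at startup.
    private final Map<Code, Processor> processors = new EnumMap<Code, Processor>(Code.class);

    void register(Code code, Processor processor) {
        processors.put(code, processor);
    }

    // Look up the processor for the code; answer with an error if none is registered.
    String dispatch(Code code, String body) {
        Processor processor = processors.get(code);
        if (processor == null) {
            return "request code not supported!";
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(Code.SUBMIT_JOB, body -> "received " + body);
        System.out.println(dispatcher.dispatch(Code.SUBMIT_JOB, "job-1"));
        System.out.println(dispatcher.dispatch(Code.JOB_PULL, "tt-1"));
    }
}
```

Keeping the lookup in a map means that supporting a new request type only requires registering another processor; the dispatcher itself does not change.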
JobSubmitProcessor's processRequest adds the job information to the message queue, where it is ultimately persisted to the database, and then builds the response.
@Override
public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
    JobSubmitRequest jobSubmitRequest = request.getBody();
    JobSubmitResponse jobSubmitResponse = appContext.getCommandBodyWrapper().wrapper(new JobSubmitResponse());
    RemotingCommand response;
    try {
        // Add the jobs to the message queue
        appContext.getJobReceiver().receive(jobSubmitRequest);
        // Report that the submission succeeded
        response = RemotingCommand.createResponseCommand(
                JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code(), "job submit success!", jobSubmitResponse);
    } catch (JobReceiveException e) {
        LOGGER.error("Receive job failed , jobs = " + jobSubmitRequest.getJobs(), e);
        jobSubmitResponse.setSuccess(false);
        jobSubmitResponse.setMsg(e.getMessage());
        jobSubmitResponse.setFailedJobs(e.getJobs());
        response = RemotingCommand.createResponseCommand(
                JobProtos.ResponseCode.JOB_RECEIVE_FAILED.code(), e.getMessage(), jobSubmitResponse);
    }
    return response;
}
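JobReceiver's receive method accepts the jobs and ultimately persists them. A failure on one job does not stop the loop: failed jobs are collected into a single JobReceiveException, which is thrown after every job has been tried.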
public void receive(JobSubmitRequest request) throws JobReceiveException {
    List<Job> jobs = request.getJobs();
    if (CollectionUtils.isEmpty(jobs)) {
        return;
    }
    JobReceiveException exception = null;
    for (Job job : jobs) {
        try {
            // Add the job to the queue
            addToQueue(job, request);
        } catch (Exception e) {
            if (exception == null) {
                exception = new JobReceiveException(e);
            }
            exception.addJob(job);
        }
    }
    if (exception != null) {
        throw exception;
    }
}
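The error handling in receive follows a "try every item, then report all failures at once" pattern. Here is a small standalone sketch of that pattern, assuming string items and an in-memory list in place of the real job store. ReceiveSketch, BatchException, and store are hypothetical names, not LTS APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of per-item failure aggregation; not LTS code.
class ReceiveSketch {
    // Exception that remembers which items failed.
    static class BatchException extends Exception {
        private final List<String> failed = new ArrayList<String>();

        BatchException(Throwable cause) {
            super(cause);
        }

        void add(String item) {
            failed.add(item);
        }

        List<String> getFailed() {
            return failed;
        }
    }

    final List<String> stored = new ArrayList<String>();

    // Stand-in for persistence: rejects blank items.
    private void store(String item) {
        if (item.trim().isEmpty()) {
            throw new IllegalArgumentException("blank job");
        }
        stored.add(item);
    }

    // Try every item; throw once at the end if any of them failed.
    void receive(List<String> items) throws BatchException {
        BatchException exception = null;
        for (String item : items) {
            try {
                store(item);
            } catch (RuntimeException e) {
                if (exception == null) {
                    exception = new BatchException(e); // keep the first cause
                }
                exception.add(item);
            }
        }
        if (exception != null) {
            throw exception;
        }
    }
}
```

The caller gets the valid items stored and a single exception listing the rejected ones, which is what lets JobSubmitProcessor return the failed jobs to the client in one response.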
addToQueue adds the job and records a business log entry.
private JobPo addToQueue(Job job, JobSubmitRequest request) {
    JobPo jobPo = null;
    boolean success = false;
    BizLogCode code = null;
    try {
        jobPo = JobDomainConverter.convert(job);
        if (jobPo == null) {
            LOGGER.warn("Job can not be null。{}", job);
            return null;
        }
        if (StringUtils.isEmpty(jobPo.getSubmitNodeGroup())) {
            jobPo.setSubmitNodeGroup(request.getNodeGroup());
        }
        // Assign the jobId
        jobPo.setJobId(JobUtils.generateJobId());
        // Add the job
        addJob(job, jobPo);
        success = true;
        code = BizLogCode.SUCCESS;
    } catch (DupEntryException e) {
        // The job already exists
        if (job.isReplaceOnExist()) {
            Assert.notNull(jobPo);
            success = replaceOnExist(job, jobPo);
            code = success ? BizLogCode.DUP_REPLACE : BizLogCode.DUP_FAILED;
        } else {
            code = BizLogCode.DUP_IGNORE;
            LOGGER.info("Job already exist And ignore. nodeGroup={}, {}", request.getNodeGroup(), job);
        }
    } finally {
        if (success) {
            stat.incReceiveJobNum();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Receive Job success. {}", job);
            }
        }
    }
    // Record the business log
    jobBizLog(jobPo, code);
    return jobPo;
}
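The duplicate handling in addToQueue can be reduced to three outcomes: a fresh insert, a replace when the submitter asked for it, or an ignored duplicate. The sketch below illustrates that decision with an in-memory map standing in for a table with a unique key. DuplicateSketch and its methods are hypothetical; the outcome names are borrowed from the BizLogCode values in the listing above, and the DUP_FAILED case (a replace that itself fails) is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of replace-or-ignore on duplicate; not LTS code.
class DuplicateSketch {
    enum Outcome { SUCCESS, DUP_REPLACE, DUP_IGNORE }

    // Stands in for a job table with a unique key on the task id.
    private final Map<String, String> table = new HashMap<String, String>();

    Outcome add(String taskId, String payload, boolean replaceOnExist) {
        if (!table.containsKey(taskId)) {
            table.put(taskId, payload); // first submission
            return Outcome.SUCCESS;
        }
        if (replaceOnExist) {
            table.put(taskId, payload); // overwrite the existing entry
            return Outcome.DUP_REPLACE;
        }
        return Outcome.DUP_IGNORE;      // keep the existing entry untouched
    }

    String get(String taskId) {
        return table.get(taskId);
    }
}
```

In the real method the duplicate is detected through DupEntryException rather than a lookup, so the check and the insert cannot race against each other; the map lookup here is only a simplification.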
Depending on the job type, the job is written to the database by a different method.
/**
 * Add the job
 */
private void addJob(Job job, JobPo jobPo) throws DupEntryException {
    if (job.isCron()) {
        addCronJob(jobPo);
    } else if (job.isRepeatable()) {
        addRepeatJob(jobPo);
    } else {
        addTriggerTimeJob(jobPo);
    }
}
III. Dispatching jobs
TaskTracker obtains jobs from JobTracker by pulling. Let's look at what JobTracker does when it sends jobs.
In RemotingDispatcher, doBiz again selects a RemotingProcessor based on the request code.
private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
    // Other request codes
    RequestCode code = RequestCode.valueOf(request.getCode());
    // Select the processor by request code
    RemotingProcessor processor = processors.get(code);
    if (processor == null) {
        return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
    }
    offerHandler(channel, request);
    return processor.processRequest(channel, request);
}
JobPullProcessor's processRequest reads the pull request and hands it to JobPusher, which dispatches jobs to the corresponding TaskTracker.
@Override
public RemotingCommand processRequest(final Channel ctx, final RemotingCommand request) throws RemotingCommandException {
    // Get the request body
    JobPullRequest requestBody = request.getBody();
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , availableThreads:{}", requestBody.getNodeGroup(), requestBody.getIdentity(), requestBody.getAvailableThreads());
    }
    // Hand the pull request to the job pusher
    jobPusher.push(requestBody);
    return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_PULL_SUCCESS.code(), "");
}
JobPusher's push method submits the work to a thread pool, so the TaskTracker's pull request is handled on a separate thread.
public void push(final JobPullRequest request) {
    this.executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                // Handle the request on a pool thread
                push0(request);
            } catch (Exception e) {
                LOGGER.error("Job push failed!", e);
            }
        }
    });
}
push0 pushes a number of jobs that matches the TaskTracker's available worker threads, split into batches of at most jobPushBatchSize.
private void push0(final JobPullRequest request) {
    // Get the node group and identity
    String nodeGroup = request.getNodeGroup();
    String identity = request.getIdentity();
    // Update the TaskTracker's available thread count
    appContext.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup,
            identity, request.getAvailableThreads(), request.getTimestamp());
    // Look up the TaskTracker node
    final TaskTrackerNode taskTrackerNode = appContext.getTaskTrackerManager().
            getTaskTrackerNode(nodeGroup, identity);
    if (taskTrackerNode == null) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , didn't have node.", nodeGroup, identity);
        }
        return;
    }
    int availableThread = taskTrackerNode.getAvailableThread().get();
    if (availableThread <= 0) {
        return;
    }
    AtomicBoolean pushingFlag = getPushingFlag(taskTrackerNode);
    if (pushingFlag.compareAndSet(false, true)) {
        try {
            final int batchSize = jobPushBatchSize;
            int it = availableThread % batchSize == 0 ? availableThread / batchSize : availableThread / batchSize + 1;
            final CountDownLatch latch = new CountDownLatch(it);
            for (int i = 1; i <= it; i++) {
                int size = batchSize;
                if (i == it) {
                    size = availableThread - batchSize * (it - 1);
                }
                final int finalSize = size;
                // Push the batches concurrently on the push thread pool
                pushExecutorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Push jobs
                            send(remotingServer, finalSize, taskTrackerNode);
                        } catch (Throwable t) {
                            LOGGER.error("Error on Push Job to {}", taskTrackerNode, t);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            DotLogUtils.dot("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , pushing finished. batchTimes:{}, size:{}", nodeGroup, identity, it, availableThread);
        } finally {
            pushingFlag.compareAndSet(true, false);
        }
    }
}
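The batch arithmetic in push0 is easier to follow with concrete numbers. The sketch below isolates just that calculation: the number of batches is the ceiling of availableThread / batchSize, and the last batch takes whatever remains. BatchSplitSketch and split are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that isolates the batch-size arithmetic used in push0.
class BatchSplitSketch {
    // Split `available` slots into batches of at most `batchSize` (batchSize must be positive).
    static List<Integer> split(int available, int batchSize) {
        List<Integer> sizes = new ArrayList<Integer>();
        if (available <= 0) {
            return sizes; // nothing to push
        }
        // Number of batches: ceiling of available / batchSize
        int it = available % batchSize == 0 ? available / batchSize : available / batchSize + 1;
        for (int i = 1; i <= it; i++) {
            int size = batchSize;
            if (i == it) {
                // The last batch takes the remainder
                size = available - batchSize * (it - 1);
            }
            sizes.add(size);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(split(10, 4)); // prints [4, 4, 2]
        System.out.println(split(8, 4));  // prints [4, 4]
        System.out.println(split(3, 10)); // prints [3]
    }
}
```

With 10 available threads and a batch size of 4, push0 therefore schedules three send calls asking for 4, 4, and 2 jobs, and the CountDownLatch is created with a count of 3.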
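The send method queries for pending jobs and pushes them to the TaskTracker. If the push is not acknowledged, the jobs are resumed so that they can be dispatched again.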
private JobPushResult send(final RemotingServerDelegate remotingServer, int size, final TaskTrackerNode taskTrackerNode) {
    // Node group and identity of the target TaskTracker
    final String nodeGroup = taskTrackerNode.getNodeGroup();
    final String identity = taskTrackerNode.getIdentity();
    // Push jobs
    JobSender.SendResult sendResult = appContext.getJobSender().send(nodeGroup, identity, size, new JobSender.SendInvoker() {
        @Override
        public JobSender.SendResult invoke(final List<JobPo> jobPos) {
            // Send to the TaskTracker for execution
            JobPushRequest body = appContext.getCommandBodyWrapper().wrapper(new JobPushRequest());
            body.setJobMetaList(JobDomainConverter.convert(jobPos));
            RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body);
            // Whether the push was dispatched successfully
            final Holder<Boolean> pushSuccess = new Holder<Boolean>(false);
            final CountDownLatch latch = new CountDownLatch(1);
            try {
                // Send the message over the TaskTracker's channel
                remotingServer.invokeAsync(taskTrackerNode.getChannel().getChannel(), commandRequest, new AsyncCallback() {
                    @Override
                    public void operationComplete(ResponseFuture responseFuture) {
                        try {
                            RemotingCommand responseCommand = responseFuture.getResponseCommand();
                            if (responseCommand == null) {
                                LOGGER.warn("Job push failed! response command is null!");
                                return;
                            }
                            if (responseCommand.getCode() == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
                                if (LOGGER.isDebugEnabled()) {
                                    LOGGER.debug("Job push success! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobList=" + JSON.toJSONString(jobPos));
                                }
                                pushSuccess.set(true);
                                stat.incPushJobNum(jobPos.size());
                            } else if (responseCommand.getCode() == JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code()) {
                                JobPushResponse jobPushResponse = responseCommand.getBody();
                                if (jobPushResponse != null && CollectionUtils.isNotEmpty(jobPushResponse.getFailedJobIds())) {
                                    // Resume the jobs the TaskTracker could not accept
                                    for (String jobId : jobPushResponse.getFailedJobIds()) {
                                        for (JobPo jobPo : jobPos) {
                                            if (jobId.equals(jobPo.getJobId())) {
                                                resumeJob(jobPo);
                                                break;
                                            }
                                        }
                                    }
                                    stat.incPushJobNum(jobPos.size() - jobPushResponse.getFailedJobIds().size());
                                } else {
                                    stat.incPushJobNum(jobPos.size());
                                }
                                pushSuccess.set(true);
                            }
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            } catch (RemotingSendException e) {
                LOGGER.error("Remoting send error, jobPos={}", JSON.toJSONObject(jobPos), e);
                return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
            }
            try {
                latch.await(Constants.LATCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                throw new RequestTimeoutException(e);
            }
            if (!pushSuccess.get()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobs=" + JSON.toJSONObject(jobPos));
                }
                for (JobPo jobPo : jobPos) {
                    resumeJob(jobPo);
                }
                return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
            }
            return new JobSender.SendResult(true, JobPushResult.SUCCESS);
        }
    });
    return (JobPushResult) sendResult.getReturnValue();
}
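The send method turns an asynchronous call into a bounded wait: the callback records the outcome and counts down a latch, while the calling thread waits on that latch with a timeout. A minimal sketch of the same idea follows, assuming a thread pool plus a sleep in place of the network round trip. AsyncWaitSketch and sendAndWait are hypothetical names, not LTS APIs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of waiting for an async reply with a timeout; not LTS code.
class AsyncWaitSketch {
    // Simulates a reply arriving after replyDelayMillis and waits at most timeoutMillis for it.
    static boolean sendAndWait(ExecutorService pool, final long replyDelayMillis, long timeoutMillis) {
        final AtomicBoolean success = new AtomicBoolean(false);
        final CountDownLatch latch = new CountDownLatch(1);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(replyDelayMillis); // simulated network round trip
                    success.set(true);              // the "callback" records the outcome
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();              // always release the waiting thread
                }
            }
        });
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        // False if the reply did not arrive in time; the caller would then roll back.
        return success.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        System.out.println(sendAndWait(pool, 10, 2000)); // reply arrives well within the timeout
        System.out.println(sendAndWait(pool, 2000, 50)); // reply arrives too late
        pool.shutdownNow();
    }
}
```

A timed-out wait and an explicit failure look the same to the caller, which matches the listing above: in both cases pushSuccess is still false, so every job in the batch is resumed.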
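JobSender's send method fetches the jobs, invokes the push callback, and records a log entry for each job that was sent, which completes the push.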
public SendResult send(String taskTrackerNodeGroup, String taskTrackerIdentity, int size, SendInvoker invoker) {
    // Fetch jobs for the TaskTracker's node group
    List<JobPo> jobPos = fetchJob(taskTrackerNodeGroup, taskTrackerIdentity, size);
    if (jobPos.size() == 0) {
        return new SendResult(false, JobPushResult.NO_JOB);
    }
    // Invoke the callback that pushes the jobs
    SendResult sendResult = invoker.invoke(jobPos);
    // Log the jobs that were sent
    if (sendResult.isSuccess()) {
        List<JobLogPo> jobLogPos = new ArrayList<JobLogPo>(jobPos.size());
        for (JobPo jobPo : jobPos) {
            // Build the log entry
            JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
            jobLogPo.setSuccess(true);
            jobLogPo.setLogType(LogType.SENT);
            jobLogPo.setLogTime(SystemClock.now());
            jobLogPo.setLevel(Level.INFO);
            jobLogPos.add(jobLogPo);
        }
        appContext.getJobLogger().log(jobLogPos);
    }
    return sendResult;
}
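Summary:
JobTracker has two main responsibilities:
(1) It receives jobs from JobClient and persists them.
(2) It receives pull requests from TaskTracker and, when jobs submitted by JobClient are pending, pushes them to that TaskTracker.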