
LTS Internals: Job Reception and Dispatch in the JobTracker (Part 3)

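The JobTracker acts much like a router: a JobClient submits job information to the JobTracker, and the JobTracker routes it to a TaskTracker for processing.

The previous post, "LTS Internals: How the JobClient Submits Jobs (Part 2)", covered job submission on the JobClient side. This post looks at what the JobTracker does with the jobs it receives.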

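I. Example

1. JobTracker service registration

The Spring configuration below sets three things:

(1) the ZooKeeper registry address

(2) the externally exposed service port, 30005

(3) the data-store settings (job logger, job queue, and JDBC connection)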

<bean id="jobTracker" class="com.github.ltsopensource.spring.JobTrackerFactoryBean" init-method="start">
        <property name="clusterName" value="test_cluster"/>
        <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
        <property name="listenPort" value="30005"/>
        <property name="configs">
            <props>
                <prop key="job.logger">mysql</prop>
                <prop key="job.queue">mysql</prop>
                <prop key="jdbc.url">jdbc:mysql://127.0.0.1:3306/lts</prop>
                <prop key="jdbc.username">root</prop>
                <prop key="jdbc.password">root</prop>
            </props>
        </property>
    </bean>

2. Main function

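import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

    public static void main(String[] args) {
        // Loading the context creates the jobTracker bean and runs its init-method (start).
        ApplicationContext context = new ClassPathXmlApplicationContext("/lts-jobtracker.xml");
    }

}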

When JobTrackerFactoryBean is initialized, it registers the service in ZooKeeper, opens service port 30005, and connects to the database.

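II. Receiving jobs

The JobTracker exposes a RemotingDispatcher, which receives requests and dispatches them to processors. When a JobClient submits a job, JobSubmitProcessor handles the request and adds the received job to a job queue.

LTS wraps several transports (Netty, Mina, and its own LTS communication framework), but whichever one is used, a request ultimately reaches RemotingDispatcher's processRequest method. Let's look at what it does.

The actual business handling happens in doBiz.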

@Override
    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {
        // Handle each kind of message differently
        // Heartbeat
        if (request.getCode() == JobProtos.RequestCode.HEART_BEAT.code()) {
            offerHandler(channel, request);
            return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.HEART_BEAT_SUCCESS.code(), "");
        }
        // Apply rate limiting if it is enabled
        if (reqLimitEnable) {
            return doBizWithReqLimit(channel, request);
        } else {
            // Otherwise handle the business request directly
            return doBiz(channel, request);
        }
    }

doBiz picks a RemotingProcessor according to the RequestCode. When a JobClient submits a job, the request is handled by JobSubmitProcessor, an implementation of RemotingProcessor.

private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
        // Any other request code
        RequestCode code = RequestCode.valueOf(request.getCode());
        // Pick the processor registered for this request code
        RemotingProcessor processor = processors.get(code);
        if (processor == null) {
            return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
        }
        offerHandler(channel, request);
        return processor.processRequest(channel, request);
    }
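
The dispatch logic above can be reduced to a small, self-contained sketch. It is only an illustration: the `Code` enum, the `Processor` interface, and the string responses are hypothetical stand-ins, not the real LTS types, and rate limiting is left out.

```java
import java.util.EnumMap;
import java.util.Map;

// Simplified model of code-based dispatch; all names here are hypothetical.
final class DispatchSketch {

    enum Code { HEART_BEAT, SUBMIT_JOB, JOB_PULL }

    interface Processor {
        String process(String body);
    }

    private final Map<Code, Processor> processors = new EnumMap<>(Code.class);

    void register(Code code, Processor processor) {
        processors.put(code, processor);
    }

    String dispatch(Code code, String body) {
        // Heartbeats are answered directly and never reach a processor.
        if (code == Code.HEART_BEAT) {
            return "HEART_BEAT_SUCCESS";
        }
        // Every other code is looked up in the processor table.
        Processor processor = processors.get(code);
        if (processor == null) {
            return "REQUEST_CODE_NOT_SUPPORTED";
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(Code.SUBMIT_JOB, body -> "JOB_RECEIVE_SUCCESS:" + body);
        System.out.println(dispatcher.dispatch(Code.HEART_BEAT, ""));
        System.out.println(dispatcher.dispatch(Code.SUBMIT_JOB, "job-1"));
        System.out.println(dispatcher.dispatch(Code.JOB_PULL, ""));
    }
}
```

Supporting a new request type in this model only requires registering one more processor; the dispatch method itself does not change.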

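JobSubmitProcessor's processRequest hands the job to the JobReceiver, which adds it to the job queue and ultimately persists it to the database. It then builds and returns the response.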

@Override
    public RemotingCommand processRequest(Channel channel, RemotingCommand request) throws RemotingCommandException {

        JobSubmitRequest jobSubmitRequest = request.getBody();

        JobSubmitResponse jobSubmitResponse = appContext.getCommandBodyWrapper().wrapper(new JobSubmitResponse());
        RemotingCommand response;
        try {
            // Add the jobs to the job queue
            appContext.getJobReceiver().receive(jobSubmitRequest);
            // Report that the submission succeeded
            response = RemotingCommand.createResponseCommand(
                    JobProtos.ResponseCode.JOB_RECEIVE_SUCCESS.code(), "job submit success!", jobSubmitResponse);

        } catch (JobReceiveException e) {
            LOGGER.error("Receive job failed , jobs = " + jobSubmitRequest.getJobs(), e);
            jobSubmitResponse.setSuccess(false);
            jobSubmitResponse.setMsg(e.getMessage());
            jobSubmitResponse.setFailedJobs(e.getJobs());
            response = RemotingCommand.createResponseCommand(
                    JobProtos.ResponseCode.JOB_RECEIVE_FAILED.code(), e.getMessage(), jobSubmitResponse);
        }

        return response;
    }

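JobReceiver's receive method takes in each job and ultimately writes it to the database. A failure on one job does not stop the loop: the failed jobs are collected and reported in a single JobReceiveException at the end.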

public void receive(JobSubmitRequest request) throws JobReceiveException {

        List<Job> jobs = request.getJobs();
        if (CollectionUtils.isEmpty(jobs)) {
            return;
        }
        JobReceiveException exception = null;
        for (Job job : jobs) {
            try {
                // Add the job to the queue
                addToQueue(job, request);
            } catch (Exception e) {
                if (exception == null) {
                    exception = new JobReceiveException(e);
                }
                exception.addJob(job);
            }
        }

        if (exception != null) {
            throw exception;
        }
    }
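
The error handling in receive follows a "process everything, report failures once" pattern. A minimal sketch of that pattern is shown here. It assumes jobs are plain strings and that a blank name is what makes a job fail; `ReceiveSketch`, `JobBatchException`, and `store` are hypothetical names, not LTS classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of collecting per-job failures into one exception.
final class ReceiveSketch {

    static final class JobBatchException extends Exception {
        private final List<String> failedJobs = new ArrayList<>();

        JobBatchException(Throwable firstCause) {
            super(firstCause);
        }

        void addJob(String job) {
            failedJobs.add(job);
        }

        List<String> getJobs() {
            return failedJobs;
        }
    }

    private final List<String> stored = new ArrayList<>();

    // Stand-in for addToQueue: rejects blank job names.
    private void store(String job) {
        if (job.trim().isEmpty()) {
            throw new IllegalArgumentException("blank job name");
        }
        stored.add(job);
    }

    void receive(List<String> jobs) throws JobBatchException {
        JobBatchException failure = null;
        for (String job : jobs) {
            try {
                store(job);
            } catch (RuntimeException e) {
                // Keep the first cause, but remember every failed job.
                if (failure == null) {
                    failure = new JobBatchException(e);
                }
                failure.addJob(job);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    List<String> stored() {
        return stored;
    }

    public static void main(String[] args) {
        ReceiveSketch receiver = new ReceiveSketch();
        try {
            receiver.receive(Arrays.asList("a", "", "b"));
        } catch (JobBatchException e) {
            System.out.println("failed: " + e.getJobs().size() + ", stored: " + receiver.stored());
        }
    }
}
```

The valid jobs are still stored even though the call ends with an exception, which is why the caller needs the list of failed jobs to know what to resubmit.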

addToQueue stores the job and writes a business log entry:

 private JobPo addToQueue(Job job, JobSubmitRequest request) {

        JobPo jobPo = null;
        boolean success = false;
        BizLogCode code = null;
        try {
            jobPo = JobDomainConverter.convert(job);
            if (jobPo == null) {
                LOGGER.warn("Job can not be null。{}", job);
                return null;
            }
            if (StringUtils.isEmpty(jobPo.getSubmitNodeGroup())) {
                jobPo.setSubmitNodeGroup(request.getNodeGroup());
            }
            // Assign a job id
            jobPo.setJobId(JobUtils.generateJobId());

            // Store the job
            addJob(job, jobPo);

            success = true;
            code = BizLogCode.SUCCESS;

        } catch (DupEntryException e) {
            // The job already exists
            if (job.isReplaceOnExist()) {
                Assert.notNull(jobPo);
                success = replaceOnExist(job, jobPo);
                code = success ? BizLogCode.DUP_REPLACE : BizLogCode.DUP_FAILED;
            } else {
                code = BizLogCode.DUP_IGNORE;
                LOGGER.info("Job already exist And ignore. nodeGroup={}, {}", request.getNodeGroup(), job);
            }
        } finally {
            if (success) {
                stat.incReceiveJobNum();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Receive Job success. {}", job);
                }
            }
        }

        // Write the business log
        jobBizLog(jobPo, code);

        return jobPo;
    }
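
The duplicate handling in addToQueue can be illustrated with an in-memory sketch. This is an assumption-heavy simplification: the listing above only shows that a duplicate surfaces as DupEntryException, so the map, the string key, and the `Outcome` enum below are hypothetical, and the DUP_FAILED case (a replace that itself fails) is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of "insert, or replace/ignore on duplicate".
final class DupHandlingSketch {

    enum Outcome { SUCCESS, DUP_REPLACE, DUP_IGNORE }

    private final Map<String, String> store = new ConcurrentHashMap<>();

    Outcome add(String key, String payload, boolean replaceOnExist) {
        // putIfAbsent returns null only when the key was not present before.
        if (store.putIfAbsent(key, payload) == null) {
            return Outcome.SUCCESS;
        }
        if (replaceOnExist) {
            store.put(key, payload);
            return Outcome.DUP_REPLACE;
        }
        return Outcome.DUP_IGNORE;
    }

    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        DupHandlingSketch queue = new DupHandlingSketch();
        System.out.println(queue.add("task-1", "v1", false));
        System.out.println(queue.add("task-1", "v2", false));
        System.out.println(queue.add("task-1", "v3", true));
    }
}
```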

The job is written to the database according to its type:

/**
     * Adds the job according to its type: cron, repeating, or trigger-time.
     */
    private void addJob(Job job, JobPo jobPo) throws DupEntryException {
        if (job.isCron()) {
            addCronJob(jobPo);
        } else if (job.isRepeatable()) {
            addRepeatJob(jobPo);
        } else {
            addTriggerTimeJob(jobPo);
        }
    }

III. Dispatching jobs

TaskTrackers obtain jobs from the JobTracker by pulling. Let's see what the JobTracker does when it sends jobs out.

As before, doBiz in RemotingDispatcher selects a RemotingProcessor by request code:

private RemotingCommand doBiz(Channel channel, RemotingCommand request) throws RemotingCommandException {
        // Any other request code
        RequestCode code = RequestCode.valueOf(request.getCode());
        // Pick the processor by code
        RemotingProcessor processor = processors.get(code);
        if (processor == null) {
            return RemotingCommand.createResponseCommand(RemotingProtos.ResponseCode.REQUEST_CODE_NOT_SUPPORTED.code(), "request code not supported!");
        }
        offerHandler(channel, request);
        return processor.processRequest(channel, request);
    }

In JobPullProcessor, processRequest uses the request body to dispatch jobs to the TaskTracker that sent the pull request.

 @Override
    public RemotingCommand processRequest(final Channel ctx, final RemotingCommand request) throws RemotingCommandException {
        // Get the request body
        JobPullRequest requestBody = request.getBody();

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , availableThreads:{}", requestBody.getNodeGroup(), requestBody.getIdentity(), requestBody.getAvailableThreads());
        }
        // Hand the pull request to the JobPusher
        jobPusher.push(requestBody);

        return RemotingCommand.createResponseCommand(JobProtos.ResponseCode.JOB_PULL_SUCCESS.code(), "");
    }

JobPusher's push method submits the TaskTracker's pull request to a thread pool, so the request is handled on a separate thread.

public void push(final JobPullRequest request) {

        this.executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    // Handle the request on a pool thread
                    push0(request);
                } catch (Exception e) {
                    LOGGER.error("Job push failed!", e);
                }
            }
        });
    }

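push0 pushes as many jobs as the TaskTracker has available worker threads, split into batches of jobPushBatchSize. A per-node pushing flag, set with compareAndSet, ensures that only one push round runs for a given TaskTracker at a time.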

private void push0(final JobPullRequest request) {

        // Read the node group and identity of the requesting TaskTracker
        String nodeGroup = request.getNodeGroup();
        String identity = request.getIdentity();
        // Update the TaskTracker's available thread count
        appContext.getTaskTrackerManager().updateTaskTrackerAvailableThreads(nodeGroup,
                identity, request.getAvailableThreads(), request.getTimestamp());

        // Look up the TaskTracker node
        final TaskTrackerNode taskTrackerNode = appContext.getTaskTrackerManager().
                getTaskTrackerNode(nodeGroup, identity);

        if (taskTrackerNode == null) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , didn't have node.", nodeGroup, identity);
            }
            return;
        }

        int availableThread = taskTrackerNode.getAvailableThread().get();
        if (availableThread <= 0) {
            return;
        }

        AtomicBoolean pushingFlag = getPushingFlag(taskTrackerNode);
        if (pushingFlag.compareAndSet(false, true)) {
            try {
                final int batchSize = jobPushBatchSize;

                int it = availableThread % batchSize == 0 ? availableThread / batchSize : availableThread / batchSize + 1;

                final CountDownLatch latch = new CountDownLatch(it);

                for (int i = 1; i <= it; i++) {
                    int size = batchSize;
                    if (i == it) {
                        size = availableThread - batchSize * (it - 1);
                    }
                    final int finalSize = size;
                    // Hand each batch to the push thread pool
                    pushExecutorService.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // Push the jobs
                                send(remotingServer, finalSize, taskTrackerNode);
                            } catch (Throwable t) {
                                LOGGER.error("Error on Push Job to {}", taskTrackerNode, t);
                            } finally {
                                latch.countDown();
                            }
                        }
                    });
                }

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                DotLogUtils.dot("taskTrackerNodeGroup:{}, taskTrackerIdentity:{} , pushing finished. batchTimes:{}, size:{}", nodeGroup, identity, it, availableThread);
            } finally {
                pushingFlag.compareAndSet(true, false);
            }
        }
    }
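
The batch arithmetic in push0 is easier to follow with concrete numbers. The sketch below reproduces only that calculation; `BatchSplitSketch` and `split` are hypothetical names, and the thread pool, latch, and pushing flag are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Reproduces the batch-size calculation from push0 in isolation.
final class BatchSplitSketch {

    static List<Integer> split(int availableThreads, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        if (availableThreads <= 0) {
            return sizes;
        }
        // Number of batches: ceiling of availableThreads / batchSize.
        int batches = availableThreads % batchSize == 0
                ? availableThreads / batchSize
                : availableThreads / batchSize + 1;
        for (int i = 1; i <= batches; i++) {
            int size = batchSize;
            if (i == batches) {
                // The last batch takes whatever is left over.
                size = availableThreads - batchSize * (batches - 1);
            }
            sizes.add(size);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(split(7, 3)); // prints [3, 3, 1]
        System.out.println(split(6, 3)); // prints [3, 3]
        System.out.println(split(2, 3)); // prints [2]
    }
}
```

With 7 available threads and a batch size of 3, three push tasks are scheduled, asking for 3, 3, and 1 jobs respectively, so the total never exceeds what the TaskTracker reported it can run.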

The send method looks up jobs that have not yet been processed and pushes them to the TaskTracker.

private JobPushResult send(final RemotingServerDelegate remotingServer, int size, final TaskTrackerNode taskTrackerNode) {

        // Node group and identity of the target TaskTracker
        final String nodeGroup = taskTrackerNode.getNodeGroup();
        final String identity = taskTrackerNode.getIdentity();
        // Fetch and push the jobs
        JobSender.SendResult sendResult = appContext.getJobSender().send(nodeGroup, identity, size, new JobSender.SendInvoker() {
            @Override
            public JobSender.SendResult invoke(final List<JobPo> jobPos) {

                // Send to the TaskTracker for execution
                JobPushRequest body = appContext.getCommandBodyWrapper().wrapper(new JobPushRequest());
                body.setJobMetaList(JobDomainConverter.convert(jobPos));
                RemotingCommand commandRequest = RemotingCommand.createRequestCommand(JobProtos.RequestCode.PUSH_JOB.code(), body);

                // Whether the jobs were pushed successfully
                final Holder<Boolean> pushSuccess = new Holder<Boolean>(false);

                final CountDownLatch latch = new CountDownLatch(1);
                try {
                    // Send the request over the TaskTracker's channel
                    remotingServer.invokeAsync(taskTrackerNode.getChannel().getChannel(), commandRequest, new AsyncCallback() {
                        @Override
                        public void operationComplete(ResponseFuture responseFuture) {
                            try {
                                RemotingCommand responseCommand = responseFuture.getResponseCommand();
                                if (responseCommand == null) {
                                    LOGGER.warn("Job push failed! response command is null!");
                                    return;
                                }
                                if (responseCommand.getCode() == JobProtos.ResponseCode.JOB_PUSH_SUCCESS.code()) {
                                    if (LOGGER.isDebugEnabled()) {
                                        LOGGER.debug("Job push success! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobList=" + JSON.toJSONString(jobPos));
                                    }
                                    pushSuccess.set(true);
                                    stat.incPushJobNum(jobPos.size());
                                } else if (responseCommand.getCode() == JobProtos.ResponseCode.NO_AVAILABLE_JOB_RUNNER.code()) {
                                    JobPushResponse jobPushResponse = responseCommand.getBody();
                                    if (jobPushResponse != null && CollectionUtils.isNotEmpty(jobPushResponse.getFailedJobIds())) {
                                        // Resume the jobs the TaskTracker could not accept
                                        for (String jobId : jobPushResponse.getFailedJobIds()) {
                                            for (JobPo jobPo : jobPos) {
                                                if (jobId.equals(jobPo.getJobId())) {
                                                    resumeJob(jobPo);
                                                    break;
                                                }
                                            }
                                        }
                                        stat.incPushJobNum(jobPos.size() - jobPushResponse.getFailedJobIds().size());
                                    } else {
                                        stat.incPushJobNum(jobPos.size());
                                    }
                                    pushSuccess.set(true);
                                }

                            } finally {
                                latch.countDown();
                            }
                        }
                    });

                } catch (RemotingSendException e) {
                    LOGGER.error("Remoting send error, jobPos={}", JSON.toJSONObject(jobPos), e);
                    return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
                }

                try {
                    latch.await(Constants.LATCH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    throw new RequestTimeoutException(e);
                }

                if (!pushSuccess.get()) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Job push failed! nodeGroup=" + nodeGroup + ", identity=" + identity + ", jobs=" + JSON.toJSONObject(jobPos));
                    }
                    for (JobPo jobPo : jobPos) {
                        resumeJob(jobPo);
                    }
                    return new JobSender.SendResult(false, JobPushResult.SENT_ERROR);
                }

                return new JobSender.SendResult(true, JobPushResult.SUCCESS);
            }
        });

        return (JobPushResult) sendResult.getReturnValue();
    }
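
The core of send is turning an asynchronous callback into a bounded wait: a latch is released in the callback, and the caller waits on it with a timeout. The following sketch isolates that idea under stated assumptions: `Transport` is a hypothetical callback-style interface standing in for the remoting layer, responses are plain strings, and an interrupt is treated as a failed push rather than rethrown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical model of "send asynchronously, wait with a timeout".
final class AsyncPushSketch {

    interface Transport {
        void sendAsync(String payload, Consumer<String> onResponse);
    }

    static boolean pushAndWait(Transport transport, String payload, long timeoutMillis) {
        AtomicBoolean pushSuccess = new AtomicBoolean(false);
        CountDownLatch latch = new CountDownLatch(1);

        transport.sendAsync(payload, response -> {
            try {
                if ("JOB_PUSH_SUCCESS".equals(response)) {
                    pushSuccess.set(true);
                }
            } finally {
                // Always release the waiter, whatever the response was.
                latch.countDown();
            }
        });

        try {
            // On timeout the flag is still false, so the push counts as failed.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return pushSuccess.get();
    }

    public static void main(String[] args) {
        Transport replies = (payload, callback) -> callback.accept("JOB_PUSH_SUCCESS");
        Transport silent = (payload, callback) -> { /* never responds */ };
        System.out.println(pushAndWait(replies, "jobs", 100));
        System.out.println(pushAndWait(silent, "jobs", 100));
    }
}
```

Because a timeout and an explicit failure both leave the flag unset, the caller needs only one recovery path, which in the listing above is resuming every job in the batch.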

JobSender's send method fetches the jobs, invokes the callback that pushes them, and logs the jobs that were sent.

public SendResult send(String taskTrackerNodeGroup, String taskTrackerIdentity, int size, SendInvoker invoker) {

        // Fetch jobs for this TaskTracker node group
        List<JobPo> jobPos = fetchJob(taskTrackerNodeGroup, taskTrackerIdentity, size);
        if (jobPos.size() == 0) {
            return new SendResult(false, JobPushResult.NO_JOB);
        }
        // Invoke the callback that pushes the jobs
        SendResult sendResult = invoker.invoke(jobPos);

        // Log the jobs that were sent successfully
        if (sendResult.isSuccess()) {
            List<JobLogPo> jobLogPos = new ArrayList<JobLogPo>(jobPos.size());
            for (JobPo jobPo : jobPos) {
                // Build one SENT log entry per job
                JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo);
                jobLogPo.setSuccess(true);
                jobLogPo.setLogType(LogType.SENT);
                jobLogPo.setLogTime(SystemClock.now());
                jobLogPo.setLevel(Level.INFO);
                jobLogPos.add(jobLogPo);
            }
            appContext.getJobLogger().log(jobLogPos);
        }
        return sendResult;
    }
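
JobSender.send separates what is fixed (fetch, check for empty, log on success) from what varies (how the jobs are actually pushed), with the varying part passed in as a callback. A rough sketch of that split follows. The `Supplier` and `Predicate` parameters, the `Result` enum, and the string log are hypothetical simplifications of fetchJob, SendInvoker, and the job logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical model of the fetch / invoke / log sequence.
final class SendTemplateSketch {

    enum Result { NO_JOB, SUCCESS, SENT_ERROR }

    private final List<String> sentLog = new ArrayList<>();

    Result send(Supplier<List<String>> fetcher, Predicate<List<String>> invoker) {
        List<String> jobs = fetcher.get();
        if (jobs.isEmpty()) {
            return Result.NO_JOB;
        }
        // The caller decides how the jobs are pushed.
        boolean pushed = invoker.test(jobs);
        if (!pushed) {
            return Result.SENT_ERROR;
        }
        // Only successful pushes are logged, one entry per job.
        for (String job : jobs) {
            sentLog.add("SENT " + job);
        }
        return Result.SUCCESS;
    }

    List<String> sentLog() {
        return sentLog;
    }

    public static void main(String[] args) {
        SendTemplateSketch sender = new SendTemplateSketch();
        System.out.println(sender.send(() -> Arrays.asList("a", "b"), jobs -> true));
        System.out.println(sender.sentLog());
    }
}
```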

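Summary:

The JobTracker has two main responsibilities:

(1) It receives jobs from JobClients and persists them.

(2) It receives pull requests from TaskTrackers and, when jobs submitted by JobClients are waiting, pushes them to the requesting TaskTracker.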