1. 程式人生 > >分散式任務排程平臺XXL-JOB--原始碼解析三:xxl-job-admin排程中心原始碼解析之初始化兩個Thread工作執行緒

分散式任務排程平臺XXL-JOB--原始碼解析三:xxl-job-admin排程中心原始碼解析之初始化兩個Thread工作執行緒

 xxl-job-admin初始化工作

1.1 啟動時, 載入applicationcontext-xxl-job-admin.xml, 初始化spring容器

<!-- 這個排程中心,在啟動的時候,會做很多初始化的工作 ,比如:執行器資訊,註冊機器列表等資訊 -->
	<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
		<!-- (輕易不要變更“排程器名稱”, 任務建立時會繫結該“排程器名稱”) -->
		<property name="scheduler" ref="quartzScheduler"/>
		<property name="accessToken" value="${xxl.job.accessToken}" />
	</bean>

Spring容器在初始化xxlJobDynamicScheduler時, 會呼叫inti-method指定的init()方法.

1.2 xxlJobDynamicScheduler類init()分析

 public void init() throws Exception {
         // admin排程中心專門開啟一個自動註冊執行緒registryThread, 獲取型別為自動註冊的執行器資訊,完成executor執行器的自動註冊與發現
        // 1 用來監控自動註冊上來的機器,達到自動註冊的目的
        // 2 admin排程中心的自動註冊就是, 將executor端每30秒心跳傳送過來的address寫入registry表,
        // 然後就是開啟一個自動registry執行緒, 每隔30s, 將registry表資料寫入到group表.
        // admin registry monitor run
        JobRegistryMonitorHelper.getInstance().start();

        // 啟動失敗日誌監控執行緒
        // 2 監控任務的執行狀態, 如若失敗,則傳送郵件預警
        // admin monitor run
        JobFailMonitorHelper.getInstance().start();

        NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz);
        NetComServerFactory.setAccessToken(accessToken);

        // valid
        Assert.notNull(scheduler, "quartz scheduler is null");
        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

上述程式碼片段中, 主要是開啟了兩個執行緒:

JobRegistryMonitorHelper.getInstance().start(); // registryThread執行緒: 用於發現註冊上來的executor執行器

JobFailMonitorHelper.getInstance().start(); // monitorThread執行緒: 查詢資料庫log表, 根據trigger_code欄位值(200成功, 500失敗), 用於監控executor執行器本次job執行是否成功.

NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz); // 將Spring容器初始化的adminBizImpl物件放入本地快取
NetComServerFactory.setAccessToken(accessToken);// 設定訪問AccessToken

1.3 具體分析admin排程中心是如何實現對executor執行器暴露服務的自動發現 -- registry執行緒

public void start() {
        registryThread = new Thread(new Runnable() {

            @Override
            public void run() {
                while (!toStop) {
                    try {

                        // 查詢自動註冊的,address_type欄位型別為0的即為自動註冊的
                        // auto registry group
                        List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
                        // groupList = [{"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
                        // 註冊方式:自動"}]>
                        logger.info("==>groupList : {}", JSON.toJSONString(groupList));
                        if (CollectionUtils.isNotEmpty(groupList)) {

                            // 刪除 90秒之內沒有更新資訊的註冊機器, 90秒沒有心跳資訊返回,代表機器已經出現問題,故移除
                            // remove dead address (admin/executor)
                            XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);

                            // fresh online address (admin/executor)
                            HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                            // 正常心跳的executor端
                            List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
                            // list =
                            // [{"id":7,"registryGroup":"EXECUTOR","registryKey":"local-execute","registryValue":"10.0.254.5:9999","updateTime":1539711248000}]>
                            logger.info("==>fresh online address list:{}", JSON.toJSONString(list));

                            if (list != null) {
                                for (XxlJobRegistry item : list) {
                                    // 判斷該機器註冊資訊RegistryGroup ,RegistType 是否是EXECUTOR 
                                   
                                    if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                        // appName = local-execute
                                        String appName = item.getRegistryKey();
                                        // 獲取註冊的執行器 KEY (也就是執行器)
                                        List<String> registryList = appAddressMap.get(appName);
                                        // registryList1 = null
                                        logger.info("==>registryList1 :{}", JSON.toJSONString(registryList));
                                        if (registryList == null) {
                                            registryList = new ArrayList<String>();
                                        }

                                        if (!registryList.contains(item.getRegistryValue())) {
                                            registryList.add(item.getRegistryValue());
                                        }
                                        // registryList2 = ["10.0.254.5:9999"]
                                        logger.info("==>registryList2 :{}", JSON.toJSONString(registryList));
                                        // local-execute : ["10.0.254.5:9999"]
                                        appAddressMap.put(appName, registryList);
                                    }
                                }
                            }

                            // fresh group address
                            for (XxlJobGroup group : groupList) {
                                // xiongxianze-test1
                                // 從這裡就可以看出來, xxl.job.executor.appname properties檔案中該項配置,
                                // 必須與admin端管理執行器時AppName一致
                                List<String> registryList = appAddressMap.get(group.getAppName());
                                String addressListStr = null;
                                // 判斷是否已經有了address, 有了的話, 使用"," 拼接addressList
                                if (CollectionUtils.isNotEmpty(registryList)) {
                                    Collections.sort(registryList);
                                    addressListStr = StringUtils.join(registryList, ",");
                                }
                                // 所以addressListStr = null
                                group.setAddressList(addressListStr);
                                // group = {"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
                                // 註冊方式:自動"}>
                                logger.info("==> group : {}", JSON.toJSONString(group));
                                XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
                            }
                        }
                    } catch (Exception e) {
                        logger.error("job registry instance error:{}", e);
                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        logger.error("job registry instance error:{}", e);
                    }
                }
            }
        });
        registryThread.setDaemon(true);
        registryThread.start();
    }

上述程式碼片段主要的流程就是:

①XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);//從資料庫中查詢address_type=0(即自動註冊)的全部分組資訊

②XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);//刪除資料庫中該xxl_job_qrtz_trigger_registry表中已經過期, 沒有進行心跳註冊的executor執行器

③XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);//查詢全部正常心跳註冊上來的executor執行器

④if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) //判斷是否是executor執行器

appAddressMap.put(appName, registryList);//這裡的appname指的是executor執行器在xxl-job-executor.properties檔案中配置的xxl.job.executor.appname=xxl-job-executor-sample, 而registryList即使每個executor執行器的ip和executor啟動jetty server埠號組成, 埠號由配置檔案中的xxl.job.executor.port=9999決定.

⑤ 接著的這段程式碼就是, 迴圈判斷admin排程中心group分組中是否有與executor執行器appname相同的名稱.     

// fresh group address
                            for (XxlJobGroup group : groupList) {
                                // xiongxianze-test1
                                // 從這裡就可以看出來, xxl.job.executor.appname properties檔案中該項配置,
                                // 必須與admin端管理執行器時AppName一致, 因為這裡是根據從資料庫的group表的appname作為key,
                                // 去取executor執行器端的放入快取中appname
                                List<String> registryList = appAddressMap.get(group.getAppName());
                                String addressListStr = null;
                                // 判斷是否已經有了address, 有了的話, 使用"," 拼接addressList
                                if (CollectionUtils.isNotEmpty(registryList)) {
                                    Collections.sort(registryList);
                                    addressListStr = StringUtils.join(registryList, ",");
                                }
                                // 所以addressListStr = null
                                group.setAddressList(addressListStr);
                                // group = {"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
                                // 註冊方式:自動"}>
                                logger.info("==> group : {}", JSON.toJSONString(group));
                                XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
                            }

這裡的關鍵程式碼就是, XxlJobDynamicScheduler.xxlJobGroupDao.update(group); 將executor執行器端註冊上來相同的xxl.job.executor.appname值, 放入了同一個組, 這樣admin排程中心, 就能從這個group組中選擇不同的排程策略排程本次執行的job. 同時也說明了這幾個executor執行器, 對應的是排程同一個job, 這樣admin排程中心就能選擇相應的排程策略排程job, 比如:first第一個executor執行本次job, last最後一個executor執行器執行本次排程, random隨機選擇一個executor執行進行本次排程, 或者是LFU最不經常, LRU最近最久未使用的執行器執行本次排程.

到這裡admin排程中心就實現了executor執行器註冊上來的服務暴露地址的自動發現.

1.4 具體分析admin排程中心是如何實現監控executor執行器執行結果的 -- monitorThread執行緒

@Override
            public void run() {

                // monitor
                while (!toStop) {
                    try {
                        // 從快取佇列中獲取資料, jogLogId
                        Integer jobLogId = JobFailMonitorHelper.instance.queue.take();
                        if (jobLogId != null && jobLogId > 0) {
                            // 根據joblogid從xxl_job_qrtz_trgger_log表中獲取job執行的log
                            XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                            if (log != null) {

                                // log.getHandleCode()預設值為0, 而且返回值200, job正在執行中
                                if (ReturnT.SUCCESS_CODE == log.getTriggerCode() && log.getHandleCode() == 0) {
                                    // job running, wait + again monitor
                                    TimeUnit.SECONDS.sleep(10);

                                    JobFailMonitorHelper.monitor(jobLogId);
                                    logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
                                }

                                // job成功執行
                                if (ReturnT.SUCCESS_CODE == log.getTriggerCode()
                                    && ReturnT.SUCCESS_CODE == log.getHandleCode()) {
                                    // job success, pass
                                    logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
                                }

                                // job執行失敗
                                if (ReturnT.FAIL_CODE == log.getTriggerCode()
                                    || ReturnT.FAIL_CODE == log.getHandleCode()) {
                                    // job fail,
                                    // 執行失敗, 傳送告警郵件
                                    sendMonitorEmail(log);
                                    logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
                                }
                            }
                        }
                    } catch (Exception e) {
                        logger.error("job monitor error:{}", e);
                    }
                }

                // monitor all clear
                List<Integer> jobLogIdList = new ArrayList<Integer>();
                int drainToNum = getInstance().queue.drainTo(jobLogIdList);
                if (jobLogIdList != null && jobLogIdList.size() > 0) {
                    for (Integer jobLogId : jobLogIdList) {
                        XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
                        if (ReturnT.FAIL_CODE == log.getTriggerCode() || ReturnT.FAIL_CODE == log.getHandleCode()) {
                            // job fail,
                            sendMonitorEmail(log);
                            logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
                        }
                    }
                }

            }

①JobFailMonitorHelper.instance.queue.take(); // 首先monitor執行緒會從queue佇列中取出已經logJobId, 而這個jobLogId是什麼時候放入queue佇列中的呢? 是XxlJobTrigger類在執行trigger()進行一次排程時, 寫入的, 如下:

當執行本次排程時, 其實是走了一遍rpc遠端呼叫, 得到執行結果200或者500狀態, 而monitor監控執行緒就是根據該狀態判斷本次排程是否成功了.

②monitor執行緒再根據jobLogId查詢資料庫, 判斷本次排程job執行結果, trigger_code=200成功, 則在控制檯列印本次排程執行成功.

否則就傳送告警郵件.

以上分析的只是admin排程中心初始化時, 所做的一些初始化工作, 一是, 開啟一個自動發現executor執行器服務的執行緒, 並寫入xxl-job-qrtz-trigger-group表, 二是, 再開啟一個monitor監控執行緒, 用於監控executor執行器執行本次job的執行結果.

下一篇將解析admin排程中心具體的排程過程, 排程策略以及rpc通訊的實現.