分散式任務排程平臺XXL-JOB--原始碼解析三:xxl-job-admin排程中心原始碼解析之初始化兩個Thread工作執行緒
xxl-job-admin初始化工作
1.1 啟動時, 載入applicationcontext-xxl-job-admin.xml, 初始化spring容器
<!-- 這個排程中心,在啟動的時候,會做很多初始化的工作 ,比如:執行器資訊,註冊機器列表等資訊 --> <bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" > <!-- (輕易不要變更“排程器名稱”, 任務建立時會繫結該“排程器名稱”) --> <property name="scheduler" ref="quartzScheduler"/> <property name="accessToken" value="${xxl.job.accessToken}" /> </bean>
Spring容器在初始化xxlJobDynamicScheduler時, 會呼叫inti-method指定的init()方法.
1.2 xxlJobDynamicScheduler類init()分析
public void init() throws Exception { // admin排程中心專門開啟一個自動註冊執行緒registryThread, 獲取型別為自動註冊的執行器資訊,完成executor執行器的自動註冊與發現 // 1 用來監控自動註冊上來的機器,達到自動註冊的目的 // 2 admin排程中心的自動註冊就是, 將executor端每30秒心跳傳送過來的address寫入registry表, // 然後就是開啟一個自動registry執行緒, 每隔30s, 將registry表資料寫入到group表. // admin registry monitor run JobRegistryMonitorHelper.getInstance().start(); // 啟動失敗日誌監控執行緒 // 2 監控任務的執行狀態, 如若失敗,則傳送郵件預警 // admin monitor run JobFailMonitorHelper.getInstance().start(); NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz); NetComServerFactory.setAccessToken(accessToken); // valid Assert.notNull(scheduler, "quartz scheduler is null"); logger.info(">>>>>>>>> init xxl-job admin success."); }
上述程式碼片段中, 主要是開啟了兩個執行緒:
JobRegistryMonitorHelper.getInstance().start(); // registryThread執行緒: 用於發現註冊上來的executor執行器
JobFailMonitorHelper.getInstance().start(); // monitorThread執行緒: 查詢資料庫log表, 根據trigger_code欄位值(200成功, 500失敗), 用於監控executor執行器本次job執行是否成功.
NetComServerFactory.putService(AdminBiz.class, XxlJobDynamicScheduler.adminBiz); // 將Spring容器初始化的adminBizImpl物件放入本地快取
NetComServerFactory.setAccessToken(accessToken);// 設定訪問AccessToken
1.3 具體分析admin排程中心是如何實現對executor執行器暴露服務的自動發現 -- registry執行緒
public void start() {
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 查詢自動註冊的,address_type欄位型別為0的即為自動註冊的
// auto registry group
List<XxlJobGroup> groupList = XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);
// groupList = [{"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
// 註冊方式:自動"}]>
logger.info("==>groupList : {}", JSON.toJSONString(groupList));
if (CollectionUtils.isNotEmpty(groupList)) {
// 刪除 90秒之內沒有更新資訊的註冊機器, 90秒沒有心跳資訊返回,代表機器已經出現問題,故移除
// remove dead address (admin/executor)
XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
// 正常心跳的executor端
List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
// list =
// [{"id":7,"registryGroup":"EXECUTOR","registryKey":"local-execute","registryValue":"10.0.254.5:9999","updateTime":1539711248000}]>
logger.info("==>fresh online address list:{}", JSON.toJSONString(list));
if (list != null) {
for (XxlJobRegistry item : list) {
// 判斷該機器註冊資訊RegistryGroup ,RegistType 是否是EXECUTOR
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
// appName = local-execute
String appName = item.getRegistryKey();
// 獲取註冊的執行器 KEY (也就是執行器)
List<String> registryList = appAddressMap.get(appName);
// registryList1 = null
logger.info("==>registryList1 :{}", JSON.toJSONString(registryList));
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
// registryList2 = ["10.0.254.5:9999"]
logger.info("==>registryList2 :{}", JSON.toJSONString(registryList));
// local-execute : ["10.0.254.5:9999"]
appAddressMap.put(appName, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group : groupList) {
// xiongxianze-test1
// 從這裡就可以看出來, xxl.job.executor.appname properties檔案中該項配置,
// 必須與admin端管理執行器時AppName一致
List<String> registryList = appAddressMap.get(group.getAppName());
String addressListStr = null;
// 判斷是否已經有了address, 有了的話, 使用"," 拼接addressList
if (CollectionUtils.isNotEmpty(registryList)) {
Collections.sort(registryList);
addressListStr = StringUtils.join(registryList, ",");
}
// 所以addressListStr = null
group.setAddressList(addressListStr);
// group = {"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
// 註冊方式:自動"}>
logger.info("==> group : {}", JSON.toJSONString(group));
XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
}
}
} catch (Exception e) {
logger.error("job registry instance error:{}", e);
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
logger.error("job registry instance error:{}", e);
}
}
}
});
registryThread.setDaemon(true);
registryThread.start();
}
上述程式碼片段主要的流程就是:
①XxlJobDynamicScheduler.xxlJobGroupDao.findByAddressType(0);//從資料庫中查詢address_type=0(即自動註冊)的全部分組資訊
②XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);//刪除資料庫中該xxl_job_qrtz_trigger_registry表中已經過期, 沒有進行心跳註冊的executor執行器
③XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);//查詢全部正常心跳註冊上來的executor執行器
④if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) //判斷是否是executor執行器
appAddressMap.put(appName, registryList);//這裡的appname指的是executor執行器在xxl-job-executor.properties檔案中配置的xxl.job.executor.appname=xxl-job-executor-sample, 而registryList即使每個executor執行器的ip和executor啟動jetty server埠號組成, 埠號由配置檔案中的xxl.job.executor.port=9999決定.
⑤ 接著的這段程式碼就是, 迴圈判斷admin排程中心group分組中是否有與executor執行器appname相同的名稱.
// fresh group address
for (XxlJobGroup group : groupList) {
// xiongxianze-test1
// 從這裡就可以看出來, xxl.job.executor.appname properties檔案中該項配置,
// 必須與admin端管理執行器時AppName一致, 因為這裡是根據從資料庫的group表的appname作為key,
// 去取executor執行器端的放入快取中appname
List<String> registryList = appAddressMap.get(group.getAppName());
String addressListStr = null;
// 判斷是否已經有了address, 有了的話, 使用"," 拼接addressList
if (CollectionUtils.isNotEmpty(registryList)) {
Collections.sort(registryList);
addressListStr = StringUtils.join(registryList, ",");
}
// 所以addressListStr = null
group.setAddressList(addressListStr);
// group = {"addressType":0,"appName":"xiongxianze-test1","id":2,"order":2,"title":"新分組,
// 註冊方式:自動"}>
logger.info("==> group : {}", JSON.toJSONString(group));
XxlJobDynamicScheduler.xxlJobGroupDao.update(group);
}
這裡的關鍵程式碼就是, XxlJobDynamicScheduler.xxlJobGroupDao.update(group); 將executor執行器端註冊上來相同的xxl.job.executor.appname值, 放入了同一個組, 這樣admin排程中心, 就能從這個group組中選擇不同的排程策略排程本次執行的job. 同時也說明了這幾個executor執行器, 對應的是排程同一個job, 這樣admin排程中心就能選擇相應的排程策略排程job, 比如:first第一個executor執行本次job, last最後一個executor執行器執行本次排程, random隨機選擇一個executor執行進行本次排程, 或者是LFU最不經常, LRU最近最久未使用的執行器執行本次排程.
到這裡admin排程中心就實現了executor執行器註冊上來的服務暴露地址的自動發現.
1.4 具體分析admin排程中心是如何實現監控executor執行器執行結果的 -- monitorThread執行緒
@Override
public void run() {
// monitor
while (!toStop) {
try {
// 從快取佇列中獲取資料, jogLogId
Integer jobLogId = JobFailMonitorHelper.instance.queue.take();
if (jobLogId != null && jobLogId > 0) {
// 根據joblogid從xxl_job_qrtz_trgger_log表中獲取job執行的log
XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
if (log != null) {
// log.getHandleCode()預設值為0, 而且返回值200, job正在執行中
if (ReturnT.SUCCESS_CODE == log.getTriggerCode() && log.getHandleCode() == 0) {
// job running, wait + again monitor
TimeUnit.SECONDS.sleep(10);
JobFailMonitorHelper.monitor(jobLogId);
logger.info(">>>>>>>>>>> job monitor, job running, JobLogId:{}", jobLogId);
}
// job成功執行
if (ReturnT.SUCCESS_CODE == log.getTriggerCode()
&& ReturnT.SUCCESS_CODE == log.getHandleCode()) {
// job success, pass
logger.info(">>>>>>>>>>> job monitor, job success, JobLogId:{}", jobLogId);
}
// job執行失敗
if (ReturnT.FAIL_CODE == log.getTriggerCode()
|| ReturnT.FAIL_CODE == log.getHandleCode()) {
// job fail,
// 執行失敗, 傳送告警郵件
sendMonitorEmail(log);
logger.info(">>>>>>>>>>> job monitor, job fail, JobLogId:{}", jobLogId);
}
}
}
} catch (Exception e) {
logger.error("job monitor error:{}", e);
}
}
// monitor all clear
List<Integer> jobLogIdList = new ArrayList<Integer>();
int drainToNum = getInstance().queue.drainTo(jobLogIdList);
if (jobLogIdList != null && jobLogIdList.size() > 0) {
for (Integer jobLogId : jobLogIdList) {
XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(jobLogId);
if (ReturnT.FAIL_CODE == log.getTriggerCode() || ReturnT.FAIL_CODE == log.getHandleCode()) {
// job fail,
sendMonitorEmail(log);
logger.info(">>>>>>>>>>> job monitor last, job fail, JobLogId:{}", jobLogId);
}
}
}
}
①JobFailMonitorHelper.instance.queue.take(); // 首先monitor執行緒會從queue佇列中取出已經logJobId, 而這個jobLogId是什麼時候放入queue佇列中的呢? 是XxlJobTrigger類在執行trigger()進行一次排程時, 寫入的, 如下:
當執行本次排程時, 其實是走了一遍rpc遠端呼叫, 得到執行結果200或者500狀態, 而monitor監控執行緒就是根據該狀態判斷本次排程是否成功了.
②monitor執行緒再根據jobLogId查詢資料庫, 判斷本次排程job執行結果, trigger_code=200成功, 則在控制檯列印本次排程執行成功.
否則就傳送告警郵件.
以上分析的只是admin排程中心初始化時, 所做的一些初始化工作, 一是, 開啟一個自動發現executor執行器服務的執行緒, 並寫入xxl-job-qrtz-trigger-group表, 二是, 再開啟一個monitor監控執行緒, 用於監控executor執行器執行本次job的執行結果.
下一篇將解析admin排程中心具體的排程過程, 排程策略以及rpc通訊的實現.