分散式任務排程平臺 → XXL-JOB 初探
開心一刻
旁邊的女乘客太吵,我實在忍無可忍,便對她說:“你能不能讓我睡會兒?”
她揮手就給了我一個耳光:“你個臭流氓!”
我頓時就清醒了,理論到:“你讓我睡一會怎麼了嗎”
她害羞的低下了頭,說道:“人家不是隨便的人”
我:“我也不是隨便的人,下一站我們下車把話說清楚”
任務排程
相信大家對任務排程都不陌生,說的通熟一點就是定時任務;這個在我們的專案中或多或少都存在,我們可以用 JDK 自帶的(Timer、ScheduledExecutor)來實現,也可以用 Spring 的 Scheduler 來實現,不管用以上哪種方式,我們都是在單機上跑,如果我們以叢集的方式部署,會不會出現什麼問題 ?
叢集中的各個節點都會執行定時排程,會有重複執行的問題,那怎麼辦? 我們可以加配置,只啟動某個節點的定時任務,但是這時候又會出現單點問題
那有沒有什麼辦法,既能避免重複執行,又不會出現單點問題呢? 分散式排程應運而生,常見的分散式任務排程框架有:quartz 、cronsun、Elastic-job、saturn、lts、TBSchedule、xxl-job 等
quartz 我已經簡單講過,有興趣的可以去看看:請點我,你們會發現:樓主壓根就沒講 quartz 的叢集模式。你們發現的很對,我就是沒講,就問你氣不氣 ?
既然你們對 quartz 已經有了一定的瞭解了 ,那麼它的叢集模式交給你們自己了
今天我們就一起來了解下另外一個分散式排程平臺:xxl-job
關於 xxl-job 是什麼、有什麼特性、發展歷程、接入了哪些公司、各個版本的新特性等等問題,我都不會去講,因為官方文件已經說的非常清楚了。xxl-job 是國產的,如果文件還看不懂,那就需要回學校再造了。但是我還是想強調下它的架構圖
通過這個架構圖,我們可以對其有個大致的瞭解;大體上分為排程中心 和 執行器,排程中心通過排程規則(cron表示式)對執行器中的任務進行排程,執行器收到排程後,執行具體的任務(Job)
既然官方文件都說的非常細緻了,那我還能講什麼呢 ? 好像確實麼什麼可以說的了, 那今天就到這吧,大家散會!
大家先別急著走,雖然下面的內容在官方文件中已經存在,但是卻很容易被我們忽略;我會在搭建的過程中來穿插著一些問題,來鞏固我們容易忽略的點
單節點搭建
我們先搭一個簡單的,排程中心 和 執行器 都先搭建成單節點
按照官方的文件來,一步一步很容易搭建成功
原始碼下載
原始碼地址:xxl-job,可以 git clone 也可以 Download ZIP ,不管何種方式,我們拿到了原始碼,匯入到 IDEA,結構如下
初始化 “排程資料庫”
SQL 指令碼在原始碼中已存在,路徑: xxl-job-master\doc\db\tables_xxl_job.sql ,執行此指令碼,建立資料庫和表,如下圖
配置&部署 排程中心
配置檔案: appliction.properties ,內容如下
### web server.port=8080 server.servlet.context-path=/xxl-job-admin ### actuator management.server.servlet.context-path=/actuator management.health.mail.enabled=false ### resources spring.mvc.servlet.load-on-startup=0 spring.mvc.static-path-pattern=/static/** spring.resources.static-locations=classpath:/static/ ### freemarker spring.freemarker.templateLoaderPath=classpath:/templates/ spring.freemarker.suffix=.ftl spring.freemarker.charset=UTF-8 spring.freemarker.request-context-attribute=request spring.freemarker.settings.number_format=0.########## ### mybatis mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=root_pwd spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=10 spring.datasource.hikari.maximum-pool-size=30 spring.datasource.hikari.auto-commit=true spring.datasource.hikari.idle-timeout=30000 spring.datasource.hikari.pool-name=HikariCP spring.datasource.hikari.max-lifetime=900000 spring.datasource.hikari.connection-timeout=10000 spring.datasource.hikari.connection-test-query=SELECT 1 ### xxl-job, email spring.mail.host=smtp.qq.com spring.mail.port=25 [email protected] [email protected] spring.mail.password=xxx spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### xxl-job, access token xxl.job.accessToken= ### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en") xxl.job.i18n=zh_CN ## xxl-job, triggerpool max size xxl.job.triggerpool.fast.max=200 xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30View Code
需要改的地方不多,埠號可能需要根據實際情況進行修改,然後就是資料庫的地址、使用者名稱和密碼需要改成自己的,email伺服器最好配上(告警用的上),排程中心與執行器之間的安全訪問 token( xxl.job.accessToken ) 最好也配置上
出於演示,改下資料庫的配置就好,其他的保持預設;我們啟動排程中心,訪問: http://localhost:8080/xxl-job-admin ,預設登入賬號 “admin/123456”, 登入後執行介面如下圖所示
“排程中心” 已經部署成功
配置&部署 執行器
配置檔案: application.properties ,內容如下
# web port server.port=8081 # no web #spring.main.web-environment=false # log config logging.config=classpath:logback.xml ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### xxl-job, access token xxl.job.accessToken= ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999 ### xxl-job executor log-path xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30View Code
埠號配置一個未被使用的埠,排程中心地址配置成我們之前部署的排程中心的地址即可;至於 xxl.job.accessToken ,和排程中心配置成一樣即可
配置檔案中各個配置的註釋寫的非常清楚,大家根據實際情況進行配置即可
執行器的示例有好幾個,我們啟動 springboot 版本的;啟動不報錯就行了,它會自動註冊到排程中心,如下圖
配置排程規則&任務
排程中心通過排程規則對執行器中的任務進行排程,現在排程中心和執行器都部署好了,就缺排程規則和任務了
任務在示例程式碼中已經存在了, SampleXxlJob.java :
package com.xxl.job.executor.service.jobhandler; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * XxlJob開發示例(Bean模式) * * 開發步驟: * 1、在Spring Bean例項中,開發Job方法,方式格式要求為 "public ReturnT<String> execute(String param)" * 2、為Job方法添加註解 "@XxlJob(value="自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷燬方法")",註解value值對應的是排程中心新建任務的JobHandler屬性的值。 * 3、執行日誌:需要通過 "XxlJobLogger.log" 列印執行日誌; * * @author xuxueli 2019-12-11 21:52:51 */ @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); /** * 1、簡單任務示例(Bean模式) */ @XxlJob("demoJobHandler") public ReturnT<String> demoJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { XxlJobLogger.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } return ReturnT.SUCCESS; } /** * 2、分片廣播任務 */ @XxlJob("shardingJobHandler") public ReturnT<String> shardingJobHandler(String param) throws Exception { // 分片引數 int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex(); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal(); XxlJobLogger.log("分片引數:當前分片序號 = {}, 總分片數 = {}", shardIndex, shardTotal); // 業務邏輯 for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { XxlJobLogger.log("第 {} 片, 命中分片開始處理", i); } else { XxlJobLogger.log("第 {} 片, 忽略", i); } } return ReturnT.SUCCESS; } /** * 3、命令列任務 */ @XxlJob("commandJobHandler") public ReturnT<String> commandJobHandler(String param) throws Exception { String command = param; int exitValue = -1; BufferedReader bufferedReader = null; try { // command process Process process = Runtime.getRuntime().exec(command); BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); // command log String line; while ((line = bufferedReader.readLine()) != null) { XxlJobLogger.log(line); } // command exit process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobLogger.log(e); } finally { if (bufferedReader != null) { bufferedReader.close(); } } if (exitValue == 0) { return IJobHandler.SUCCESS; } else { return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed"); } } /** * 4、跨平臺Http任務 * 引數示例: * "url: http://www.baidu.com\n" + * "method: get\n" + * "data: content\n"; */ @XxlJob("httpJobHandler") public ReturnT<String> httpJobHandler(String param) throws Exception { // param parse if (param==null || param.trim().length()==0) { XxlJobLogger.log("param["+ param +"] invalid."); return ReturnT.FAIL; } String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam: httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } } // param valid if (url==null || url.trim().length()==0) { XxlJobLogger.log("url["+ url +"] invalid."); return ReturnT.FAIL; } if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobLogger.log("method["+ method +"] invalid."); return ReturnT.FAIL; } boolean isPostMethod = method.equals("POST"); // request HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { // connection URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection(); // connection setting connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); // do connection connection.connect(); // data if (isPostMethod && data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); } // valid StatusCode int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); } // result bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString(); XxlJobLogger.log(responseMsg); return ReturnT.SUCCESS; } catch (Exception e) { XxlJobLogger.log(e); return ReturnT.FAIL; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobLogger.log(e2); } } } /** * 5、生命週期任務示例:任務初始化與銷燬時,支援自定義相關邏輯; */ @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public ReturnT<String> demoJobHandler2(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World."); return ReturnT.SUCCESS; } public void init(){ logger.info("init"); } public void destroy(){ logger.info("destory"); } }View Code
是一個任務集,裡面每一個被 @XxlJob 修飾的都是一個任務,我們以名為: demoJobHandler 的任務來做演示
任務已經定好,目前就只差排程規則了,我們去排程中心管理介面進行配置;預設情況下,xxl-job 會幫我們自動配置好一個任務,如下
直接用它是可以的,但是為了清楚怎麼配置,我們重新配置一個
各個配置項的具體含義大家可以去看官方文件,裡面都有詳細的介紹;
簡單點來說上圖的配置,就是每隔 3 秒,排程中心會去排程 示例執行器 的 demoJobHandler 任務
啟動排程
配置和部署都已完成,現在差的就是啟動排程了,我們啟動它
然後我們就可以在排程日誌頁面檢視排程中心的排程日誌了,如下所示
問題
現在不管是排程中心,還是執行器,都是單節點的,都存在單節點問題
那如何解決了,單節點的解決方案往往就是叢集,我們可以將排程中心和執行器都部署成叢集,而 xxl-job 又是支援的,而且叢集部署非常簡單、方便
叢集搭建
叢集架構圖簡單如下
nginx 只是對排程中心的請求(排程中心管理頁面的操作)做負載均衡,它不涉及任務的排程與回撥,這裡就不配置 nginx 了, 我們重點來看下排程中心叢集與執行器叢集的搭建
排程中心叢集
排程中心叢集的搭建非常簡單,只需要注意兩點:DB配置保持一致,叢集機器時鐘保持一致(單機叢集忽視)
出於演示,我們就做單機叢集處理,那麼我們只需要在 IDEA 中再啟動一個排程中心節點就好,埠號配置不一樣就好;排程中心共啟動兩個節點,之前的埠號是8080, 這個我們改成 8088
啟動之後,我們就可以從http://localhost:8080/xxl-job-admin,http://localhost:8088/xxl-job-admin 對排程中心控制檯進行訪問了,具體就不演示了, 大家可以自行去操作
生產環境下,會通過 nginx 對外暴露唯一地址,由 nginx 對這兩個(或者多個)進行負載均衡
執行器叢集
搭建同樣非常簡單,只需要注意兩點
1、執行器回撥地址(xxl.job.admin.addresses)需要保持一致;執行器根據該配置進行執行器自動註冊等操作。
2、同一個執行器叢集內AppName(xxl.job.executor.appname)需要保持一致;排程中心根據該配置動態發現不同叢集的線上執行器列表
由於是單機叢集搭建,埠的唯一性也需要注意
啟動之後,去排程中心修改路由策略為輪訓,再啟動任務排程,然後就可以去檢視排程日誌了
宕機測試
這個就不演示了,大家自行去測試,停掉某個節點,整個排程是否能正常完成
疑問
1、排程中心叢集部署,任務排程的時候,會不會每個節點都發起排程請求,從而產生重複排程的問題
這個問題在官方文件中有說明:基於資料庫的叢集方案,資料庫選用Mysql;叢集分散式併發環境中進行定時任務排程時,會在各個節點會上報任務,存到資料庫中,執行時會從資料庫中取出觸發器來執行,如果觸發器的名稱和執行時間相同,則只有一個節點去執行此任務。
因此對同一個排程,不會產生重複排程問題
2、執行器叢集收到排程請求後,會不會每個節點都去執行任務
這個問題不成立,我們不是配置了路由策略嗎,排程中心會根據路由策略將排程請求傳送給具體的某個執行器了,那何來每個執行器都執行任務呢 ?
如果官方文件看的細的話,我們會發現有如下一段話
不只非同步排程和非同步執行,其實還包括非同步回撥,xxl-job 中用到了大量的佇列、非同步處理
當然還有一些其他的疑問,絕大部分在官方文件都能找到答案,所以需要大家多讀、細讀
總結
1、單機模式,大家瞭解就好,生產環境肯定都是叢集模式的;但 xxl-job 的叢集部署也非常簡單
2、xxl-job 的全非同步化&輕量級設計,可以保證使用有限的執行緒支撐大量的JOB併發執行
3、通篇都是在 xxl-job 的原始碼上進行的,如何將它應用進我們的實戰專案中了 ? 實戰篇,我們下期見
參考
XXL