分布式任務elastic-job
# elastic-job簡介
目前Elastic job的最新版本已經由原來的elastic-job-core分離除了兩個項目,分別為Elastic-Job-Lite和Elastic-Job-Cloud。Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成,Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。 Elastic-Job-Cloud使用Mesos + Docker(TBD)的解決方案,額外提供資源治理、應用分發以及進程隔離等服務,Elastic-Job-Lite和Elastic-Job-Cloud提供同一套API開發作業,開發者僅需一次開發,即可根據需要以Lite或Cloud的方式部署 .[轉自官網:https://github.com/dangdangdotcom/elastic-job/blob/master/README_cn.md]
一般的技術quartz、spring task、java.util.Timer,這幾種如果在單一機器上跑其實問題不大,但是如果一旦應用於集群環境做分布式部署,就會帶來一個致命的問題,那就是重復執行,當然解決方案有,但是必須依賴數據庫,將任務執行狀態持久化下來。所以當當就把quartz和zookeeper結合起來達到分布式調度,並且添加其他功能,形成了elastic-job。
elastic-job主要的設計理念是無中心化的分布式定時調度框架,思路來源於Quartz的基於數據庫的高可用方案。但數據庫沒有分布式協調功能,所以在高可用方案的基礎上增加了彈性擴容和數據分片的思路,以便於更大限度的利用分布式服務器的資源。
# 功能
1. 主要功能
a) 分布式:重寫Quartz基於數據庫的分布式功能,改用Zookeeper實現註冊中心。
b) 並行調度:采用任務分片方式實現。將一個任務拆分為n個獨立的任務項,由分布式的服務器並行執行各自分配到的分片項。
c) 彈性擴容縮容:將任務拆分為n個任務項後,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,elastic-job將在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。
d) 集中管理:采用基於Zookeeper的註冊中心,集中管理和協調分布式作業的狀態,分配和監聽。外部系統可直接根據Zookeeper的數據管理和監控elastic-job。
e) 定制化流程型任務:作業可分為簡單和數據流處理兩種模式,數據流又分為高吞吐處理模式和順序性處理模式,其中高吞吐處理模式可以開啟足夠多的線程快速的處理數據,而順序性處理模式將每個分片項分配到一個獨立線程,用於保證同一分片的順序性,這點類似於kafka的分區順序性。
2. 其他功能
a) 失效轉移:彈性擴容縮容在下次作業運行前重分片,但本次作業執行的過程中,下線的服務器所分配的作業將不會重新被分配。失效轉移功能可以在本次作業運行中用空閑服務器抓取孤兒作業分片執行。同樣失效轉移功能也會犧牲部分性能。
b) Spring命名空間支持:elastic-job可以不依賴於spring直接運行,但是也提供了自定義的命名空間方便與spring集成。
c) 運維平臺:提供web控制臺用於管理作業。
下載源碼
https://github.com/elasticjob
#spring boot quick start
Add maven dependency
<!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core --> <!-- import elastic-job lite core --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.6</version> </dependency> <!-- import other module if need --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.6</version> </dependency>
RegCenter configuration
@Configuration public class RegistryCenterConfig { @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter() { String serverList = ConfigOne.getProperty(ConfigConstants.JOB_REG_ZK); String namespace = ConfigOne.getProperty(ConfigConstants.JOB_REG_NS); return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace)); } }
Job configuration
@Configuration public class SimpleJobConfig { //private String defaultCron = "0/5 * * * * ?"; //默認每天0點10分開始統計 @Value("${job.default.cron}") private String defaultCron = "0 10 0 * * ?"; @Value("${job.default.shardingTotalCount}") private int defaultShardTotal = 1; @Value("${job.default.shardingItemParameters}") private String defaultShardPrams = ""; @Resource private ZookeeperRegistryCenter regCenter; /*@Resource private JobEventConfiguration jobEventConfiguration;*/ @Resource private StatsDeviceJob statsDeviceJob; @Resource private StatsUserJob statsUserJob; @Resource private StatsDeviceFaultJob statsDeviceFaultJob; @Resource private StatsDeviceAlarmJob statsDeviceAlarmJob; @Resource private StatsRuleJob statsRuleJob; @Resource private StatsYumairJob statsYumairJob; //@Bean(initMethod = "init") @PostConstruct public void init() { //statsDeviceJob new SpringJobScheduler(statsDeviceJob, regCenter, getLiteJobConfiguration(statsDeviceJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsUserJob new SpringJobScheduler(statsUserJob, regCenter, getLiteJobConfiguration(statsUserJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsDeviceFaultJob new SpringJobScheduler(statsDeviceFaultJob, regCenter, getLiteJobConfiguration(statsDeviceFaultJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsDeviceAlarmJob new SpringJobScheduler(statsDeviceAlarmJob, regCenter, getLiteJobConfiguration(statsDeviceAlarmJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsRuleJob new SpringJobScheduler(statsRuleJob, regCenter, getLiteJobConfiguration(statsRuleJob.getClass(), defaultCron, defaultShardTotal, defaultShardPrams)).init(); //statsYumairJob 每小時5分運行一次 new SpringJobScheduler(statsYumairJob, regCenter, getLiteJobConfiguration(statsYumairJob.getClass(), "0 5 * * * ?", defaultShardTotal, defaultShardPrams)).init(); } private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int defaultShardTotal, final String defaultShardPrams) { return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder( jobClass.getName(), cron, defaultShardTotal).shardingItemParameters(defaultShardPrams).build(), jobClass.getCanonicalName())).overwrite(true).build(); } }
Job development
@Service public class StatsDeviceJob implements SimpleJob { private static Logger logger = LoggerFactory.getLogger(StatsDeviceJob.class); @Autowired private DeviceStatsFeignClient deviceStatsFeignClient; @Autowired private DeviceDataFeignClient deviceDataFeignClient; @Autowired private IDataStatsFacade dataStatsFacade; /** * 1.當分片數為1時,在同一個zookepper和jobname情況下,多臺機器部署了Elastic job時, * 只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行 * 2.當分片數大於1時,假如有3臺服務器,分成10片,則分片項分配結果為服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。 * 此時每臺服務器可根據拿到的shardingItem值進行相應的處理 * 目前job分片數全部置為1,即不使用分片 * @param shardingContext */ @Override public void execute(ShardingContext shardingContext) { logger.info(String.format("ShardingItem: %s | Thread: %s | %s", shardingContext.getShardingItem(), Thread.currentThread().getId(), "SIMPLE")); deviceStatsJob(); } public void deviceStatsJob() { //TODO } }
# 運維平臺和RESTFul API部署(可選)
1. 下載或者克隆elastic-job源碼
地址:https://github.com/dangdangdotcom/elastic-job
2. maven編譯安裝
進入到elastic-job目錄,按住Shift+鼠標右鍵,選擇“在此處打開命令窗口(W)”,執行如下命令:
```
mvn clean install -Dmaven.test.skip=true
```
```
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] elastic-job ....................................... SUCCESS [23.570s]
[INFO] elastic-job-common ................................ SUCCESS [0.053s]
[INFO] elastic-job-common-core ........................... SUCCESS [27.108s]
[INFO] elastic-job-common-restful ........................ SUCCESS [29.844s]
[INFO] elastic-job-lite .................................. SUCCESS [0.078s]
[INFO] elastic-job-lite-core ............................. SUCCESS [7.249s]
[INFO] elastic-job-lite-lifecycle ........................ SUCCESS [3.766s]
[INFO] elastic-job-lite-spring ........................... SUCCESS [1:08:42.613s
]
[INFO] elastic-job-lite-console .......................... SUCCESS [2:31.964s]
[INFO] elastic-job-cloud ................................. SUCCESS [0.031s]
[INFO] elastic-job-cloud-executor ........................ SUCCESS [3.728s]
[INFO] elastic-job-cloud-scheduler ....................... SUCCESS [33.803s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:13:24.136s
[INFO] Finished at: Sat Dec 02 18:33:33 CST 2017
[INFO] Final Memory: 55M/272M
[INFO] ------------------------------------------------------------------------
```
3. 解壓上一步打好的包
路徑:elastic-job\elastic-job-lite\elastic-job-lite-console\target\elastic-job-lite-console-2.1.6.tar.gz
elastic-job-lite-console-2.1.6\bin目錄下是啟動腳本
windows環境用:start.bat
linux環境用:start.sh
elastic-job-lite-console-2.1.6\conf目錄下是配置文件auth.properties,配置的用戶名和密碼
4. 解壓縮elastic-job-lite-console-${version}.tar.gz並執行bin\start.sh。
5. 打開瀏覽器訪問http://localhost:8899/即可訪問控制臺。8899為默認端口號,可通過啟動腳本輸入-p自定義端口號。
6. 訪問RESTFul API方法同控制臺。
分布式任務elastic-job