分散式定時任務之1:elasticjob配置使用
最近專案中需要做定時跑批任務,因為是SOA的專案,spring自帶的單節點定時任務已經不能滿足任務,為了快速上線,選擇了當當網開元的elasticjob lite版本。
其實使用很簡單,幾個簡單配置就ok了。
專案中使用的是simpleJob,我就簡單把程式碼貼一下吧。
專案引入elasticjob大致以下幾個步驟:
1.pom檔案引入相關的依賴
2.編寫我們的定時任務
3.編寫配置檔案
4.配置部署job的管理平臺
下面開始貼程式碼:
第一步:pom引入配置檔案
<!-- 噹噹網分散式定時任務元件 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <exclusions> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> </exclusion> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> </exclusion> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> </exclusion> <exclusion> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> </exclusion> </exclusions> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-common-restful</artifactId> <version>2.1.5</version> </dependency> <!-- 噹噹網分散式定時任務元件 -->
因為是dubbo專案,exclusions主要是解決dubbo與elasticJob在連結,操作zookeeper時的jar衝突問題,這個看版本排除即可。
排除方法可以用STS的如下檢視排查:
第二步:編寫我們的定時任務:
@Data public class SchoolVoteTask implements SimpleJob { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); @Value("${test.name}") private String name; @Autowired private VoteUserVideoInfoService voteVideoService; @Override public void execute(ShardingContext shardingContext) { // 1.當分片數為1時,在同一個zookepper和jobname情況下,多臺機器部署了Elastic // job時,只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行 // 2.當分片數大於1時,假如有3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。此時每臺伺服器可根據拿到的shardingItem值進行相應的處理, // 舉例場景: // 假如job處理資料庫中的資料業務,方法為:A伺服器處理資料庫中Id以0,1,2結尾的資料,B處理資料庫中Id以3,4,5結尾的資料,C處理器處理6,7,8,9結尾的資料,合計處理0-9為全部資料 // 如果伺服器C崩潰,Elastic // Job自動進行進行失效轉移,將C伺服器的分片轉移到A和B伺服器上,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9 // 此時,A伺服器處理資料庫中Id以0,1,2,3,4結尾的資料,B處理資料庫中Id以5,6,7,8,9結尾的資料,合計處理0-9為全部資料. int shardingTotalCount = shardingContext.getShardingTotalCount(); String shardingParameter = shardingContext.getShardingParameter(); int shardingItem = shardingContext.getShardingItem(); switch (shardingItem) { case 0: try { expiredVote(); } catch (ParseException e) { e.printStackTrace(); } break; case 1: System.out.println("OrderTask-1-" + shardingTotalCount + shardingParameter); break; case 2: System.out.println("OrderTask-2-" + shardingTotalCount + shardingParameter); break; default: break; } } private void expiredVote() throws ParseException { VoteUserVideoInfoRequestDTO request = new VoteUserVideoInfoRequestDTO(); request.setAuditState("S"); request.setFlag("1"); request.setVoteState("1"); List<VoteUserVideoInfoResponseDTO> list = voteVideoService.findMutil(request); List<Long> idslst = new ArrayList<Long>(); for (int i = 0; i < list.size(); i++) { VoteUserVideoInfoResponseDTO dto = list.get(i); Date auditDate = dto.getAuditTime(); String auditDateStr = sdf.format(auditDate); auditDate = sdf.parse(auditDateStr); Date currentDate = new Date(); String currentDateStr = sdf.format(currentDate); currentDate = sdf.parse(currentDateStr); int a = (int) ((currentDate.getTime() - auditDate.getTime()) / (1000 * 3600 * 24)); if (a > 7) { // System.out.println(dto); idslst.add(dto.getId()); } } if (idslst.size() != 0) { Long[] ids = new Long[idslst.size()]; for (int i = 0; i < idslst.size(); i++) { ids[i] = idslst.get(i); } int result = voteVideoService.expiredVote(ids); System.out.println("result=" + result); } } }
因為我們專案只需要簡單的定時任務,只要實現噹噹的SimpleJob介面即可。
其分片功能可以通過ShardingContext獲取相關引數,比如:
int shardingTotalCount = shardingContext.getShardingTotalCount();
String shardingParameter = shardingContext.getShardingParameter();
int shardingItem = shardingContext.getShardingItem();
shardingItem 便是分片的序號,然後可以根據不同的值,做相應的處理即可。
關於分片處理,要知道下面幾點:
1.當分片數為1時,在同一個zookepper和jobname情況下,多臺機器部署了Elasticjob時,只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行
2.當分片數大於1時,假如有3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。此時每臺伺服器可根據拿到的shardingItem值進行相應的處理,舉例場景:
假如job處理資料庫中的資料業務,方法為:A伺服器處理資料庫中Id以0,1,2結尾的資料,B處理資料庫中Id以3,4,5結尾的資料,C處理器處理6,7,8,9結尾的資料,合計處理0-9為全部資料
如果伺服器C崩潰,ElasticJob自動進行進行失效轉移,將C伺服器的分片轉移到A和B伺服器上,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9,此時,A伺服器處理資料庫中Id以0,1,2,3,4結尾的資料,B處理資料庫中Id以5,6,7,8,9結尾的資料,合計處理0-9為全部資料.
第三步:編寫我們的配置檔案:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd " default-lazy-init="false">
<!--配置作業註冊中心 -->
<!--
serverLists=10.3.142.107:2181,10.3.142.107:2182,106.3.14.107:2183
namespace=elastic-job-example
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
-->
<reg:zookeeper id="regCenter"
server-lists="${elastic.job.zk.address}"
namespace="${elastic.job.namespace}"
base-sleep-time-milliseconds="${elastic.job.baseSleepTimeMilliseconds}"
max-sleep-time-milliseconds="${elastic.job.maxSleepTimeMilliseconds}"
max-retries="${elastic.job.maxRetries}"
session-timeout-milliseconds="${elastic.job.sessionTimeoutMilliseconds}"
connection-timeout-milliseconds="${elastic.job.connectionTimeoutMilliseconds}" />
<!-- 配置作業-->
<job:simple id="schoolVoteTask"
class="com.ddc.mcn.service.task.SchoolVoteTask"
registry-center-ref="regCenter"
cron="0/10 * * * * ?"
sharding-total-count="3"
sharding-item-parameters="0=A,1=B,2=C"
event-trace-rdb-data-source="dataSource"
overwrite="true" />
<!--
<job:dataflow id="orderDataFlowTask"
class="com.framework.core.job.OrderDataFlowTask"
streaming-process="false"
registry-center-ref="regCenter"
cron="0/10 * * * * ?"
sharding-total-count="3"
sharding-item-parameters="0=A,1=B,2=C" />
-->
</beans>
這裡面簡單的做個說明步驟:
1.引入job的schema,
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
2.註冊zk配置中心
3.註冊我們的作業任務
第四步:配置job的管理平臺
這個很簡單,只要在官網上下載elastic-job-lite,然後執行bin中的start.bat(window環境)或者start.sh(linux環境)即可。
埠8899,登入賬號,在conf下的檔案中,可以修改等著賬號資訊的
在全部配置中新增zk,連線上以後,在作業操作的作業維度中,就可以看到我們的定時任務了,然後可以在裡面做一些我們需要的修改,都是立即生效的(心跳時間不算哈)
怎麼樣,很簡單吧。