1. 程式人生 > >Elastic-Job與spring整合

Elastic-Job與spring整合

參考官網:https://gitee.com/elasticjob/elastic-job-example

1.引入jar

<dependency>
    <artifactId>elastic-job-lite-core</artifactId>
    <groupId>com.dangdang</groupId>
    <version>${elastic-job.version}</version>
</dependency>

使用Spring配置啟動
<dependency>
    <artifactId>elastic-job-lite-spring</artifactId>
    <groupId>com.dangdang</groupId>
    <version>${elastic-job.version}</version>
</dependency>

2.配置job

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.dangdang.com/schema/ddframe/reg
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                           http://www.dangdang.com/schema/ddframe/job
                           http://www.dangdang.com/schema/ddframe/job/job.xsd
                           ">

    <context:component-scan base-package="com.alen"/>
    <context:property-placeholder location="classpath:conf/*.properties"/>

    <!-- Data source referenced by event-trace-rdb-data-source on the simple job below -->
    <bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="${event.rdb.driver}"/>
        <property name="url" value="${event.rdb.url}"/>
        <property name="username" value="${event.rdb.username}"/>
        <property name="password" value="${event.rdb.password}"/>
    </bean>

    <!-- Job registry center -->
    <reg:zookeeper id="regCenter"
                   server-lists="${serverLists}"
                   namespace="${namespace}"
                   base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}"
                   max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}"
                   max-retries="${maxRetries}"/>

    <!-- Simple job -->
    <!-- event-trace-rdb-data-source: configures a simple job with database-backed job event tracing -->
    <job:simple id="${simple.id}"
                class="${simple.class}"
                registry-center-ref="regCenter"
                sharding-total-count="${simple.shardingTotalCount}"
                cron="${simple.cron}"
                sharding-item-parameters="${simple.shardingItemParameters}"
                monitor-execution="${simple.monitorExecution}"
                monitor-port="${simple.monitorPort}"
                failover="${simple.failover}"
                description="${simple.description}"
                disabled="${simple.disabled}"
                overwrite="${simple.overwrite}"
                job-sharding-strategy-class=""
                event-trace-rdb-data-source="elasticJobLog"
                job-parameter="${simple.job-parameter}"/>

    <bean id="yourRefJobBeanId" class="com.alen.job.simple.MySimpleRefElasticJob"/>

    <!-- Job bound to an existing bean -->
    <!-- job-ref (String, optional): id of the bean the job is bound to; takes precedence over the class attribute -->
    <job:simple id="simpleRefElasticJob"
                job-ref="yourRefJobBeanId"
                registry-center-ref="regCenter"
                cron="${simple.cron}"
                sharding-total-count="${simple.shardingTotalCount}"
                sharding-item-parameters="${simple.shardingItemParameters}"/>

    <!-- Dataflow job -->
    <job:dataflow id="${dataflow.id}"
                  class="${dataflow.class}"
                  registry-center-ref="regCenter"
                  sharding-total-count="${dataflow.shardingTotalCount}"
                  cron="${dataflow.cron}"
                  sharding-item-parameters="${dataflow.shardingItemParameters}"
                  monitor-execution="${dataflow.monitorExecution}"
                  failover="${dataflow.failover}"
                  max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}"
                  streaming-process="${dataflow.streamingProcess}"
                  description="${dataflow.description}"
                  disabled="${dataflow.disabled}"
                  overwrite="${dataflow.overwrite}"
                  job-parameter="${dataflow.job-parameter}"/>

    <!-- Script job -->
    <job:script id="${script.id}"
                registry-center-ref="regCenter"
                script-command-line="${script.scriptCommandLine}"
                sharding-total-count="${script.shardingTotalCount}"
                cron="${script.cron}"
                sharding-item-parameters="${script.shardingItemParameters}"
                description="${script.description}"
                overwrite="${script.overwrite}"/>

    <!-- Simple job with listeners (left disabled) -->
    <!--
    <job:simple id="listenerElasticJob"
                class="xxx.MySimpleListenerElasticJob"
                registry-center-ref="regCenter"
                cron="0/10 * * * * ?"
                sharding-total-count="3"
                sharding-item-parameters="0=A,1=B,2=C">
        <job:listener class="xx.MySimpleJobListener"/>
        <job:distributed-listener class="xx.MyOnceSimpleJobListener"
                                  started-timeout-milliseconds="1000"
                                  completed-timeout-milliseconds="2000"/>
    </job:simple>
    -->

    <import resource="spring-mybatis.xml"/>
</beans>
    job.properties
event.rdb.driver=org.h2.Driver
event.rdb.url=jdbc:h2:mem:job_event_storage
event.rdb.username=sa
event.rdb.password=

listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000
#1.cron:cron表示式,用於配置作業觸發時間
#2.shardingTotalCount:作業分片總數
#3.shardingItemParameters:分片序列號和引數用等號分隔,多個鍵值對用逗號分隔。分片序列號從0開始,不可大於或等於作業分片總數。如:0=a,1=b,2=c
#4.maxTimeDiffSeconds:最大允許的本機與註冊中心的時間誤差秒數。如果時間誤差超過配置秒數,則作業啟動時將拋異常。配置為-1表示不校驗時間誤差
#5.failover:是否開啟失效轉移。僅monitorExecution開啟時,失效轉移才有效
#6.processCountIntervalSeconds:統計作業處理資料數量的間隔時間,單位:秒
#7.description:作業描述資訊
#8.disabled:作業是否禁止啟動,可用於部署作業時,先禁用啟動,部署結束後統一啟動
#9.overwrite:本地配置是否可覆蓋註冊中心配置,如果可覆蓋,每次啟動作業都以本地配置為準
simple.id=springSimpleJob
simple.class=com.alen.job.simple.SpringSimpleJob
simple.cron=0/5 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=1001,1=1002,2=1003
simple.monitorExecution=false
simple.failover=true
simple.description=意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888
simple.job-parameter="測試簡單任務"
dataflow.id=springDataflowJob
dataflow.class=com.alen.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=3
dataflow.shardingItemParameters=0=1001,1=1002,2=1003
dataflow.maxTimeDiffSeconds=-1
dataflow.monitorExecution=true
dataflow.failover=true
dataflow.streamingProcess=true
dataflow.description=Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。
dataflow.disabled=false
dataflow.overwrite=true
dataflow.job-parameter="測試流式任務"
script.id=springScriptJob
# need absolute path
script.scriptCommandLine=D:/workFile/test/elastic-job-example-master/elastic-job-example-lite-spring/src/main/resources/script/demo.bat
#  \/elastic-job/elastic-job-example/elastic-job-example-lite-spring/src/main/resources/script/demo.sh
script.cron=0/5 * * * * ?
script.shardingTotalCount=3
script.shardingItemParameters=0=1001,1=1002,2=1003
script.description=Script Job
script.overwrite=true

3.任務類

/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.alen.job.simple;

import com.alen.entity.MessageDO;
import com.alen.entity.fixture.entity.Foo;
import com.alen.entity.fixture.repository.FooRepository;
import com.alen.service.MessageService;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

/**
 * Simple job example.
 *
 * <p>On each execution it prints the sharding context it was invoked with, loads the
 * messages returned by {@code MessageService#getMessage} for the current sharding
 * parameter, and passes every message to {@code MessageService#updateStatus}.
 */
public class SpringSimpleJob implements SimpleJob {

    @Autowired
    private MessageService messageService;

    /**
     * Runs the job once for the shard described by {@code shardingContext}.
     *
     * @param shardingContext sharding information supplied for this execution
     */
    @Override
    public void execute(final ShardingContext shardingContext) {
        System.out.println(describeContext(shardingContext));

        // JobParameter, ShardingParameter and ShardingItem are the values available for mapping
        // shards to data; this example selects its data by the sharding parameter.
        final List<MessageDO> pending = messageService.getMessage(shardingContext.getShardingParameter());
        if (pending == null) {
            return;
        }
        for (final MessageDO message : pending) {
            final String line = "id為:" + message.getId() + "\n"
                    + ",----tShardingItem:" + shardingContext.getShardingItem() + "\n";
            System.out.println(line);
            messageService.updateStatus(message);
        }
    }

    /** Builds the multi-line banner that lists the fields of the given sharding context. */
    private static String describeContext(final ShardingContext ctx) {
        String text = "SpringSimpleJob 簡單任務-------------任務名:" + ctx.getJobName() + "\n";
        text += ",---ShardingParameter:" + ctx.getShardingParameter() + "\n";
        text += ",----TaskId:" + ctx.getTaskId() + "\n";
        text += ",----JobParameter:" + ctx.getJobParameter() + "\n";
        text += ",----tShardingItem:" + ctx.getShardingItem() + "\n";
        text += ",----ShardingTotalCount:" + ctx.getShardingTotalCount() + "\n";
        return text;
    }
}

/*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package com.alen.job.dataflow;

import com.alen.entity.MessageDO;
import com.alen.entity.fixture.entity.Foo;
import com.alen.entity.fixture.repository.FooRepository;
import com.alen.service.MessageService;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * Dataflow job example operating on {@code MessageDO} items.
 *
 * <p>With streaming processing, the job stops fetching only when {@code fetchData} returns
 * {@code null} or an empty collection; otherwise it keeps running. Without streaming,
 * {@code fetchData} and {@code processData} are each run once per job execution, after which
 * that execution completes. When streaming is used, {@code processData} should update the
 * status of the data it handles so that {@code fetchData} does not pick the same data up
 * again and keep the job running forever. Streaming processing follows the TbSchedule design
 * and is suited to continuous data processing.
 */
public class SpringDataflowJob implements DataflowJob<MessageDO> {

    @Autowired
    private MessageService messageService;

    /**
     * Prints the sharding context and fetches the messages for the current sharding parameter.
     *
     * @param shardingContext sharding information supplied for this execution
     * @return whatever {@code MessageService#getMessage} returns for the sharding parameter
     */
    @Override
    public List<MessageDO> fetchData(final ShardingContext shardingContext) {
        System.out.println(describeContext(shardingContext));
        return messageService.getMessage(shardingContext.getShardingParameter());
    }

    /**
     * Prints each fetched message and passes it to {@code MessageService#updateStatus}.
     *
     * @param shardingContext sharding information supplied for this execution
     * @param data            messages to process; a {@code null} list is ignored
     */
    @Override
    public void processData(final ShardingContext shardingContext, final List<MessageDO> data) {
        if (data == null) {
            return;
        }
        for (final MessageDO message : data) {
            final String line = "id為:" + message.getId() + "\n"
                    + ",----tShardingItem:" + shardingContext.getShardingItem() + "\n";
            System.out.println(line);
            messageService.updateStatus(message);
        }
    }

    /** Builds the multi-line banner that lists the fields of the given sharding context. */
    private static String describeContext(final ShardingContext ctx) {
        String text = "SpringDataflowJob Dataflow型別作業-------------任務名:" + ctx.getJobName() + "\n";
        text += ",---ShardingParameter:" + ctx.getShardingParameter() + "\n";
        text += ",----TaskId:" + ctx.getTaskId() + "\n";
        text += ",----JobParameter:" + ctx.getJobParameter() + "\n";
        text += ",----tShardingItem:" + ctx.getShardingItem() + "\n";
        text += ",----ShardingTotalCount:" + ctx.getShardingTotalCount() + "\n";
        return text;
    }
}

啟動類
/**
 * Launcher for the example: starts an embedded ZooKeeper server, then loads the Spring
 * application context from the classpath.
 */
public final class SpringMain {

    private static final int EMBED_ZOOKEEPER_PORT = 5181;

    private static final String APPLICATION_CONTEXT_LOCATION = "classpath:META-INF/applicationContext.xml";

    // CHECKSTYLE:OFF
    public static void main(final String[] args) {
    // CHECKSTYLE:ON
        startEmbeddedZookeeper();
        loadApplicationContext();
    }

    /**
     * Starts the embedded ZooKeeper server. It exists only so the Elastic-Job example can run
     * without a separately started ZooKeeper; replace it with a ZooKeeper available in the
     * local environment if necessary.
     */
    private static void startEmbeddedZookeeper() {
        EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
    }

    /** Creates the Spring context from {@link #APPLICATION_CONTEXT_LOCATION}. */
    private static void loadApplicationContext() {
        new ClassPathXmlApplicationContext(APPLICATION_CONTEXT_LOCATION);
    }
}