1. 程式人生 > >分散式定時任務之1:elasticjob配置使用

分散式定時任務之1:elasticjob配置使用

最近專案中需要做定時跑批任務,因為是SOA的專案,spring自帶的單節點定時任務已經不能滿足任務,為了快速上線,選擇了當當網開元的elasticjob lite版本。

其實使用很簡單,幾個簡單配置就ok了。

專案中使用的是simpleJob,我就簡單把程式碼貼一下吧。

專案引入elasticjob大致以下幾個步驟:

1.pom檔案引入相關的依賴

2.編寫我們的定時任務

3.編寫配置檔案

4.配置部署job的管理平臺

下面開始貼程式碼:

第一步:pom引入配置檔案

		<!-- 噹噹網分散式定時任務元件 -->
		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-core</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.curator</groupId>
					<artifactId>curator-framework</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.curator</groupId>
					<artifactId>curator-client</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.curator</groupId>
					<artifactId>curator-recipes</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.curator</groupId>
					<artifactId>curator-test</artifactId>
				</exclusion>
			</exclusions>
			<version>2.1.5</version>
		</dependency>
		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>
		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-common-restful</artifactId>
			<version>2.1.5</version>
		</dependency>
		<!-- 噹噹網分散式定時任務元件 -->

因為是dubbo專案,exclusions主要是解決dubbo與elasticJob在連結,操作zookeeper時的jar衝突問題,這個看版本排除即可。

排除方法可以用STS的如下檢視排查:

第二步:編寫我們的定時任務:

@Data
public class SchoolVoteTask implements SimpleJob {

	SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

	@Value("${test.name}")
	private String name;

	@Autowired
	private VoteUserVideoInfoService voteVideoService;

	@Override
	public void execute(ShardingContext shardingContext) {

		// 1.當分片數為1時,在同一個zookepper和jobname情況下,多臺機器部署了Elastic
		// job時,只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行
		// 2.當分片數大於1時,假如有3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。此時每臺伺服器可根據拿到的shardingItem值進行相應的處理,
		// 舉例場景:
		// 假如job處理資料庫中的資料業務,方法為:A伺服器處理資料庫中Id以0,1,2結尾的資料,B處理資料庫中Id以3,4,5結尾的資料,C處理器處理6,7,8,9結尾的資料,合計處理0-9為全部資料
		// 如果伺服器C崩潰,Elastic
		// Job自動進行進行失效轉移,將C伺服器的分片轉移到A和B伺服器上,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9
		// 此時,A伺服器處理資料庫中Id以0,1,2,3,4結尾的資料,B處理資料庫中Id以5,6,7,8,9結尾的資料,合計處理0-9為全部資料.

		int shardingTotalCount = shardingContext.getShardingTotalCount();

		String shardingParameter = shardingContext.getShardingParameter();

		int shardingItem = shardingContext.getShardingItem();
		switch (shardingItem) {
		case 0:
			try {
				expiredVote();
			} catch (ParseException e) {
				e.printStackTrace();
			}
			break;
		case 1:
			System.out.println("OrderTask-1-" + shardingTotalCount + shardingParameter);
			break;
		case 2:
			System.out.println("OrderTask-2-" + shardingTotalCount + shardingParameter);
			break;
		default:
			break;
		}

	}

	private void expiredVote() throws ParseException {
		VoteUserVideoInfoRequestDTO request = new VoteUserVideoInfoRequestDTO();
		request.setAuditState("S");
		request.setFlag("1");
		request.setVoteState("1");
		List<VoteUserVideoInfoResponseDTO> list = voteVideoService.findMutil(request);

		List<Long> idslst = new ArrayList<Long>();
		for (int i = 0; i < list.size(); i++) {
			VoteUserVideoInfoResponseDTO dto = list.get(i);
			Date auditDate = dto.getAuditTime();
			String auditDateStr = sdf.format(auditDate);
			auditDate = sdf.parse(auditDateStr);

			Date currentDate = new Date();
			String currentDateStr = sdf.format(currentDate);
			currentDate = sdf.parse(currentDateStr);
			int a = (int) ((currentDate.getTime() - auditDate.getTime()) / (1000 * 3600 * 24));
			if (a > 7) {
				// System.out.println(dto);
				idslst.add(dto.getId());
			}
		}

		if (idslst.size() != 0) {
			Long[] ids = new Long[idslst.size()];
			for (int i = 0; i < idslst.size(); i++) {
				ids[i] = idslst.get(i);
			}
			int result = voteVideoService.expiredVote(ids);
			System.out.println("result=" + result);
		}

	}

}

因為我們專案只需要簡單的定時任務,只要實現噹噹的SimpleJob介面即可。

其分片功能可以通過ShardingContext獲取相關引數,比如:

int shardingTotalCount = shardingContext.getShardingTotalCount();

String shardingParameter = shardingContext.getShardingParameter();

int shardingItem = shardingContext.getShardingItem();

shardingItem 便是分片的序號,然後可以根據不同的值,做相應的處理即可。

關於分片處理,要知道下面幾點:

1.當分片數為1時,在同一個zookepper和jobname情況下,多臺機器部署了Elasticjob時,只有拿到shardingContext.getShardingItem()為0的機器得以執行,其他的機器不執行


2.當分片數大於1時,假如有3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。此時每臺伺服器可根據拿到的shardingItem值進行相應的處理,舉例場景:
假如job處理資料庫中的資料業務,方法為:A伺服器處理資料庫中Id以0,1,2結尾的資料,B處理資料庫中Id以3,4,5結尾的資料,C處理器處理6,7,8,9結尾的資料,合計處理0-9為全部資料
如果伺服器C崩潰,ElasticJob自動進行進行失效轉移,將C伺服器的分片轉移到A和B伺服器上,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9,此時,A伺服器處理資料庫中Id以0,1,2,3,4結尾的資料,B處理資料庫中Id以5,6,7,8,9結尾的資料,合計處理0-9為全部資料.

第三步:編寫我們的配置檔案:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	   xmlns:p="http://www.springframework.org/schema/p"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
	   xmlns:job="http://www.dangdang.com/schema/ddframe/job"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans  
           				   http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
           				   http://www.dangdang.com/schema/ddframe/reg 
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
                           http://www.dangdang.com/schema/ddframe/job 
                           http://www.dangdang.com/schema/ddframe/job/job.xsd  
           				   http://www.springframework.org/schema/context  
           	               http://www.springframework.org/schema/context/spring-context-4.3.xsd " default-lazy-init="false">

	<!--配置作業註冊中心 --> 
	<!-- 
	serverLists=10.3.142.107:2181,10.3.142.107:2182,106.3.14.107:2183
	namespace=elastic-job-example
	baseSleepTimeMilliseconds=1000
	maxSleepTimeMilliseconds=3000
	maxRetries=3
	 --> 
    <reg:zookeeper id="regCenter" 
    			   server-lists="${elastic.job.zk.address}" 
    			   namespace="${elastic.job.namespace}"  
                   base-sleep-time-milliseconds="${elastic.job.baseSleepTimeMilliseconds}" 
                   max-sleep-time-milliseconds="${elastic.job.maxSleepTimeMilliseconds}" 
                   max-retries="${elastic.job.maxRetries}" 
                   session-timeout-milliseconds="${elastic.job.sessionTimeoutMilliseconds}"
                   connection-timeout-milliseconds="${elastic.job.connectionTimeoutMilliseconds}" />
                   
    <!-- 配置作業-->  
    <job:simple id="schoolVoteTask" 
    			class="com.ddc.mcn.service.task.SchoolVoteTask" 
    			registry-center-ref="regCenter"  
    			cron="0/10 * * * * ?" 
                sharding-total-count="3" 
               	sharding-item-parameters="0=A,1=B,2=C"
               	event-trace-rdb-data-source="dataSource"
                overwrite="true" />
                
     <!--     
     <job:dataflow id="orderDataFlowTask"  
     			   class="com.framework.core.job.OrderDataFlowTask" 
     			   streaming-process="false"
     			   registry-center-ref="regCenter"
     			   cron="0/10 * * * * ?"  
     			   sharding-total-count="3"   
     			   sharding-item-parameters="0=A,1=B,2=C" />           
      -->                                  
</beans>

這裡面簡單的做個說明步驟:

1.引入job的schema,

      xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
      xmlns:job="http://www.dangdang.com/schema/ddframe/job"

      http://www.dangdang.com/schema/ddframe/reg 
      http://www.dangdang.com/schema/ddframe/reg/reg.xsd 
      http://www.dangdang.com/schema/ddframe/job 
      http://www.dangdang.com/schema/ddframe/job/job.xsd 

2.註冊zk配置中心

3.註冊我們的作業任務

第四步:配置job的管理平臺

這個很簡單,只要在官網上下載elastic-job-lite,然後執行bin中的start.bat(window環境)或者start.sh(linux環境)即可。

埠8899,登入賬號,在conf下的檔案中,可以修改等著賬號資訊的

在全部配置中新增zk,連線上以後,在作業操作的作業維度中,就可以看到我們的定時任務了,然後可以在裡面做一些我們需要的修改,都是立即生效的(心跳時間不算哈)

怎麼樣,很簡單吧。