1. 程式人生 > >elastic-job之Simple型別作業實現

elastic-job之Simple型別作業實現

1.什麼是Simple型別作業?

Simple型別作業意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。


2. 建立Simple型別專案,通過API啟動方式呼叫

2.1 首先我們maven建立一個普通的java專案,然後在pom檔案中加入下面這行引入

<dependency>
	<groupId>com.dangdang</groupId>
	<artifactId>elastic-job-lite-core</artifactId>
	<version>2.1.5</version>
</dependency>

2.2 既然是Simple型別,當然要實現SimpleJob介面,那麼我們首先建立一個類ApiMyElasticJobSimple讓他實現SimpleJob介面,然後重寫它的execute方法,其實和Quartz原生介面很相似,這個execute方法裡面就是你要實現的定時任務

package com.lwl.boot.job.simple;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

/**
 * 任務排程
 *
 */
public class ApiMyElasticJobSimple implements SimpleJob {

	@Override
	public void execute(ShardingContext content) {
		
		int key = content.getShardingItem();
		System.out.println();
		System.out.println("----------------------"+key+"-------------------");
		System.out.println();
		
		switch (key) {
		case 0:
			System.out.println("任務排程執行3: "+key);
			break;
		case 1:
			System.out.println("任務排程執行3: "+key);
			break;
		case 2:
			System.out.println("任務排程執行3: "+key);
			break;

		default:
			System.out.println("沒有任務排程執行");
			break;
		}	
	}
}
2.3 既然有了定時任務,剩下的就是如何啟動它了,因為我們使用的是API的方式啟動,所有我們就直接建立一個類ApiJobSimple,通過該類初始化對應的配置,如何main方法啟動。
package com.lwl.boot.job.simple;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;


public class ApiJobSimple {

	
	public static void main(String[] args) {
		new JobScheduler(registryCenter(),configuration()).init();
	}
	
	
	private static CoordinatorRegistryCenter registryCenter() {
		//配置zookeeper
		//註冊中心( CoordinatorRegistryCenter ):用於協調分散式服務
		//連線Zookeeper伺服器的列表. 多個地址用逗號分隔. 如: host1:2181,host2:2181
		String serverLists = "localhost:2181";
		//如果你有多個不同 Elastic-Job叢集 時,使用相同 Zookeeper,可以配置不同的 namespace 進行隔離
		String namespace = "elastic-job-demo";
		CoordinatorRegistryCenter registryCenter = 
					new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverLists, namespace));
		registryCenter.init();
		return registryCenter;
	}

	
	private static LiteJobConfiguration configuration() {
		
		 // 定義作業核心配置
		String jobName = "simpleJob"; //作業名稱
		String cron = "0/15 * * * * ?";	//定時器表示式,用於控制作業觸發時間
		int shardingTotalCount = 3; //作業分片總數,如果一個作業啟動超過作業分片總數的節點,只有 shardingTotalCount 會執行作業.換句話說:當伺服器數量大於分片總數,那麼不是所有伺服器都將會執行,而是根據分片總數來定。
		JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build();
		
		
		// 定義SIMPLE型別配置
		//		意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,
		//		此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。
		//任務執行類名稱:com.lwl.boot.job.simple.ApiMyElasticJobSimple
		String jobClass = ApiMyElasticJobSimple.class.getCanonicalName();
		SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(coreConfiguration, jobClass); 
		
		
		 // 定義Lite作業根配置
		//作業分片策略:系統提供三種策略和自定義可供選擇
		/*
		 * 	1. AverageAllocationJobShardingStrategy:基於平均分配演算法的分片策略
		 *  2. OdevitySortByNameJobShardingStrategy:根據作業名的雜湊值奇偶數決定IP升降序演算法的分片策略
		 *  3. RotateServerByNameJobShardingStrategy:根據作業名的雜湊值對伺服器列表進行輪轉的分片策略
		 * 	4. 實現JobShardingStrategy介面,並且實現sharding的方法,
		 * 		介面方法引數為【作業伺服器IP列表和分片策略選項】,【分片策略選項包括作業名稱】,【分片總數以及分片序列號和個性化引數對照表】,可以根據需求定製化自己的分片策略。
		 * */
		//預設的分片策略
		String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();
		
		
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration).jobShardingStrategyClass(jobShardingStrategyClass).build();
		
		
		return simpleJobRootConfig;
	}
		
}
2.3.1 registryCenter()方法

registryCenter()實現的是註冊中心,註冊中心是通過zookeeper實現的,其型別為:ZookeeperRegistryCenter而他需要一個zookeeper的配置ZookeeperConfiguration實體類,在建立ZookeeperConfiguration實體類的時候需要注意他的引數

ZookeeperConfiguration:引數說明(紅色為必傳欄位)

serverLists:連線Zookeeper伺服器的列表,包括IP地址和埠號,多個地址用逗號分隔,如: host1:2181,host2:2181

namespace:Zookeeper的名稱空間,/如果你有多個不同 Elastic-Job叢集 時,使用相同 Zookeeper,可以配置不同的 namespace 進行隔離

baseSleepTimeMilliseconds:等待重試的間隔時間的初始值  單位:毫秒
maxSleepTimeMilliseconds:等待重試的間隔時間的最大值   單位:毫秒
maxRetries:最大重試次數
sessionTimeoutMilliseconds:會話超時時間  單位:毫秒
connectionTimeoutMilliseconds:連線超時時間  單位:毫秒
digest:連線Zookeeper的許可權令牌,預設為不需要許可權驗證

2.3.2 configuration()方法:作業配置,這個方法注意做了三件事情,定義作業核心配置,定義作業型別,定義根配置

作業配置分為3級,分別是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration <--- JobTypeConfiguration,JobTypeConfiguration <--- JobCoreConfiguration,層層巢狀

1)定義作業核心配置

作業核心配置:就是起碼要知道我這個定時器叫什麼,什麼時候啟動,有沒有做分片

JobCoreConfiguration:引數說明(紅色為必傳欄位)

jobName:作業名稱

cron:cron表示式,用於控制作業觸發時間

shardingTotalCount:作業分片總數,如果一個作業啟動超過作業分片總數的節點,只有 shardingTotalCount 會執行作業

shardingItemParameters:分片序列號和引數用等號分隔,多個鍵值對用逗號分隔,分片序列號從0開始,不可大於或等於作業分片總數,如:0=a,1=b,2=c

jobParameter:作業自定義引數,作業自定義引數,可通過傳遞該引數為作業排程的業務方法傳參,用於實現帶引數的作業,例:每次獲取的資料量、作業例項從資料庫讀取的主鍵等

failover:是否開啟任務執行失效轉移,開啟表示如果作業在一次任務執行中途宕機,允許將該次未完成的任務在另一作業節點上補償執行

misfire:是否開啟錯過任務重新執行

description:作業描述資訊

jobProperties:配置jobProperties定義的列舉控制Elastic-Job的實現細節,JOB_EXCEPTION_HANDLER用於擴充套件異常處理類,EXECUTOR_SERVICE_HANDLER用於擴充套件作業處理執行緒池類

2)定義作業型別

作業型別主要分為三種:SimpleJob(簡單作業),DataflowJob(資料流作業),ScriptJob(指令碼作業),它們的配置實現類分別對應著:SimpleJobConfiguration,DataflowJobConfiguration,ScriptJobConfiguration

由於使用的是Simple型別,使用就對應SimpleJobConfiguration配置

SimpleJobConfiguration:引數說明(紅色為必傳欄位)

coreConfig:即1)的核心配置類

jobClass:作業實現類,即實現SimpleJob類的全稱,可以通過【類.class.getCanonicalName()】獲取

3) 定義Lite作業根配置

 定義Lite作業根配置主要定義了 對任務排程的監控和使用分片策略
 作業分片策略:系統提供三種策略和自定義可供選擇

1. AverageAllocationJobShardingStrategy:基於平均分配演算法的分片策略

策略說明:

如果分片不能整除,則不能整除的多餘分片將依次追加到序號小的伺服器。如:
如果有3臺伺服器,分成9片,則每臺伺服器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3臺伺服器,分成8片,則每臺伺服器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]

如果有3臺伺服器,分成10片,則每臺伺服器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

2. OdevitySortByNameJobShardingStrategy:根據作業名的雜湊值奇偶數決定IP升降序演算法的分片策略

策略說明:

作業名的雜湊值為奇數則IP升序。
作業名的雜湊值為偶數則IP降序。

用於不同的作業平均分配負載至不同的伺服器。

AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於作業伺服器數,作業將永遠分配至IP地址靠前的伺服器,導致IP地址靠後的伺服器空閒。

而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新分配伺服器負載。如:

如果有3臺伺服器,分成2片,作業名稱的雜湊值為奇數,則每臺伺服器分到的分片是:1=[0], 2=[1], 3=[]

如果有3臺伺服器,分成2片,作業名稱的雜湊值為偶數,則每臺伺服器分到的分片是:3=[0], 2=[1], 1=[]

3. RotateServerByNameJobShardingStrategy:根據作業名的雜湊值對伺服器列表進行輪轉的分片策略

4. 實現JobShardingStrategy介面,並且實現sharding的方法,介面方法引數為【作業伺服器IP列表和分片策略選項】,【分片策略選項包括作業名稱】,【分片總數以及分片序列號和個性化引數對照表】,可以根據需求定製化自己的分片策略。

LiteJobConfiguration:引數說明(紅色為必傳欄位)

jobConfig:即 2)中的任務型別配置

jobShardingStrategyClass:作業分片策略實現類全路徑,預設使用平均分配策略

monitorExecution:監控作業執行時狀態
每次作業執行時間和間隔時間均非常短的情況,建議不監控作業執行時狀態以提升效率。因為是瞬時狀態,所以無必要監控。請使用者自行增加資料堆積監控。並且不能保證資料重複選取,應在作業中實現冪等性。
每次作業執行時間和間隔時間均較長的情況,建議監控作業執行時狀態,可保證資料不會重複選取。

monitorPort:作業監控埠,建議配置作業監控埠, 方便開發者dump作業資訊。,使用方法: echo “dump” | nc 127.0.0.1 9888

maxTimeDiffSeconds:最大允許的本機與註冊中心的時間誤差秒數,如果時間誤差超過配置秒數則作業啟動時將拋異常,配置為-1表示不校驗時間誤差

reconcileIntervalMinutes:修復作業伺服器不一致狀態服務排程間隔時間,配置為小於1的任意值表示不執行修復    單位:分鐘

eventTraceRdbDataSource:作業事件追蹤的資料來源Bean引用

3. 啟動API,呼叫ApiJobSimple中main方法

當啟動之後,定時器開始正常工作,控制檯打印出的日誌是這樣的:

	----------------------0-------------------

	任務排程執行3: 0

	----------------------1-------------------

	任務排程執行3: 1

	----------------------2-------------------

	任務排程執行3: 2		
	

就是當只有一個定時任務在跑的時候,所有任務都會被他執行,但是當你再啟動一個面方法,之前的那個不要關閉,那麼就出現2個輸出控制檯日誌為:

第一臺的控制檯
	----------------------1-------------------

	任務排程執行3: 1
	
	第二臺的控制檯
	----------------------0-------------------

	任務排程執行3: 0
	----------------------2-------------------

	任務排程執行3: 2	

任務就被分片到這2臺機器上了,預設採用的是平均分配演算法的分片策略,當然你再啟用一下main方法,你就發現他們各自輸出一條記錄,好了就到這裡了。

最後感謝各位觀看,如需轉載請標明原處,我的程式碼已上傳到github上:https://github.com/1181888200/boot-elastic-job

相關推薦

elastic-jobSimple型別作業實現

1.什麼是Simple型別作業?Simple型別作業意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。2. 建立Simple型別專案,通過API啟動方式呼

elastic-jobDataflow型別作業實現

一、前序二、Dataflow是什麼?Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。三、 怎麼開啟?可通過DataflowJobConfiguration

elastic-job監聽器

每個作業都可以配置一個任務監聽器,確切的說是隻能配置一個本地監聽器和一個分散式監聽器。Elastic-job有三種作業型別,但是它們的通用配置都是一樣的,所以本文在介紹作業的監聽器配置時將僅以簡單作業的配置為例。本地監聽器本地監聽器只在節點執行自己分片的時候排程,每個分片任務

Elastic-Job簡單Job

簡介 elastic-job是噹噹網開源的基於zookeeper和quartz實現的分散式作業排程框架。github地址是https://github.com/dangdangdotcom/elastic-job,官方網站是http://elasticjob.

Elastic-Job何為分散式作業

原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/distribution/ 何為分散式作業? 分片概念 任務的分散式執行,需要將一個任務拆分為n個獨立的任務項,然後由分散式的伺服器分別執行某一個或

Elastic-Job-Lite 原始碼閱讀 ---- 作業執行

作業執行的核心流程:  因為使用了 Quartz,任務執行是實現了 Quartz 的 Job 介面:  public final class LiteJob implements Job { @Setter private ElasticJob elasticJob

Elastic-Job原始碼解析(二)定時核心實現quartz

Elastic-Job是一個分散式定時任務框架,其內部的定時主要是利用quartz來實現,而Elastic-Job核心是對quartz進行了封裝,並提供了分散式任務的功能。具體是怎麼實現呢? 怎麼實現分散式呢? 主要是通過Zookeeper通訊,獲取任務伺服器ip地址,並通

Elastic Job 入門教程(二)— Spring Boot框架下是實現Elastic Job 指令碼作業(Script Job

在Elastic Job 入門教程(一)— 與Spring Boot整合這篇文章中,我們簡單介紹了Spring Boot與Elastic Job 的整合,並簡單實現了SimpleJob型別作業。本章,我

elastic-job--作業型別

elastic-job提供了三種類型的作業: Simple型別作業 SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。 Dataflow型別作業 Dataflow型別用於

Elastic-Job-Lite詳解作業排程

JobScheduler是elastic-job作業排程的關鍵類,也是起始類,在包com.dangdang.ddframe.job.lite.api下。排程任務的執行需要包含兩大步驟:任務的配置和任務的註冊。JobScheduler的建構函式除了任務配置和註冊相關資訊之

Elastic-Job-Lite 源碼分析 —— 作業分片策略

哈希 AD hash alloc hub strings put iat 總數 摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-sharding-strategy/ 「芋道源碼」歡迎轉載,保留摘要,謝謝! 本文基於 Elast

Elastic-Job原始碼解析(三)分片定時任務執行

通過本篇的閱讀你將學會了解Elastic-Job的定時時機,及如何通過分片方式做一個分散式的定時任務框架。瞭解常用的三種分片策略,及如何自定義分散式分片策略 目錄 Elastic-Job如何通過SpringJobScheduler啟動定時 Ela

Elastic-Job原始碼解析(一)與Spring完美整合

看過小編寫SpringFramework原始碼解析的同學應該對Spring支援自定義標籤還有點印象吧,沒有的話我們回顧下,然後看看Elastic-Job是如何巧妙的利用自定義標籤生成Job任務的吧。請注意這裡用了一個巧妙關鍵字。我們看它如何巧妙的吧。 Spring自定義

詳解當當網的分散式作業框架elastic-job

轉自:http://www.infoq.com/cn/articles/dangdang-distributed-work-framework-elastic-job 作業的必要性以及存在的問題 為什麼需要作業? 作業即定時任務。一般來說,系統可使用訊息傳遞代替部分使用作業的場

Elastic-job實戰(分散式作業排程框架)

就拿一個場景來說吧,如果我們的專案是部署到多臺機器上,那麼某一時刻,我們的定時任務肯定每臺機器上都會執行一遍,那這肯定不是我們想要的結果,我們只希望有一臺機器能執行。 一.前言 Elastic job是噹噹網架構師基於Zookepper、Quartz開發並開源的

springboot2.0+Elastic Job動態job配置實現微服務呼叫

springboot2.0+Elastic Job實現微服務呼叫 本猿第一次寫部落格,寫的不好勿噴 一、前提 本文只介紹如何把elastic-job和springcloud微服務整合,不對其他知識做介紹,需先掌握以下知識 1.會使用springboot 2.會使用springcl

Elastic Job 定時任務實現

官方文件:http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/00-overview/intro/ 該說的文件上都說了;在過程中遇到一些錯誤記下了 環境:zookeeper版本 zookeeper-3.

Elastic Job入門示例-實現DataflowJob介面

1.接上篇內容:https://blog.csdn.net/seanme/article/details/802564602.本次介紹流式處理任務型別 * 流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料 * a.流式處

Elastic-Job是否支援動態新增做作業

原文地址:http://dangdangdotcom.github.io/elastic-job/post/faq/ 6. 是否支援動態新增作業? 回答: 動態新增作業這個概念每個人理解不盡相同。 elast

Elastic-Job作業執行狀態監聽

原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/execution_monitor/ 作業執行狀態監控 通過監聽elastic-job的zookeeper註冊中心的幾個關鍵節點即可完成作業執行狀態監控