elastic-job之Simple型別作業實現
1.什麼是Simple型別作業?
Simple型別作業意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。
2. 建立Simple型別專案,通過API啟動方式呼叫
2.1 首先我們maven建立一個普通的java專案,然後在pom檔案中加入下面這行引入
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency>
2.2 既然是Simple型別,當然要實現SimpleJob介面,那麼我們首先建立一個類ApiMyElasticJobSimple讓他實現SimpleJob介面,然後重寫它的execute方法,其實和Quartz原生介面很相似,這個execute方法裡面就是你要實現的定時任務
2.3 既然有了定時任務,剩下的就是如何啟動它了,因為我們使用的是API的方式啟動,所有我們就直接建立一個類ApiJobSimple,通過該類初始化對應的配置,如何main方法啟動。package com.lwl.boot.job.simple; import com.dangdang.ddframe.job.api.ShardingContext; import com.dangdang.ddframe.job.api.simple.SimpleJob; /** * 任務排程 * */ public class ApiMyElasticJobSimple implements SimpleJob { @Override public void execute(ShardingContext content) { int key = content.getShardingItem(); System.out.println(); System.out.println("----------------------"+key+"-------------------"); System.out.println(); switch (key) { case 0: System.out.println("任務排程執行3: "+key); break; case 1: System.out.println("任務排程執行3: "+key); break; case 2: System.out.println("任務排程執行3: "+key); break; default: System.out.println("沒有任務排程執行"); break; } } }
2.3.1 registryCenter()方法package com.lwl.boot.job.simple; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.api.strategy.impl.AverageAllocationJobShardingStrategy; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; public class ApiJobSimple { public static void main(String[] args) { new JobScheduler(registryCenter(),configuration()).init(); } private static CoordinatorRegistryCenter registryCenter() { //配置zookeeper //註冊中心( CoordinatorRegistryCenter ):用於協調分散式服務 //連線Zookeeper伺服器的列表. 多個地址用逗號分隔. 如: host1:2181,host2:2181 String serverLists = "localhost:2181"; //如果你有多個不同 Elastic-Job叢集 時,使用相同 Zookeeper,可以配置不同的 namespace 進行隔離 String namespace = "elastic-job-demo"; CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverLists, namespace)); registryCenter.init(); return registryCenter; } private static LiteJobConfiguration configuration() { // 定義作業核心配置 String jobName = "simpleJob"; //作業名稱 String cron = "0/15 * * * * ?"; //定時器表示式,用於控制作業觸發時間 int shardingTotalCount = 3; //作業分片總數,如果一個作業啟動超過作業分片總數的節點,只有 shardingTotalCount 會執行作業.換句話說:當伺服器數量大於分片總數,那麼不是所有伺服器都將會執行,而是根據分片總數來定。 JobCoreConfiguration coreConfiguration = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount).build(); // 定義SIMPLE型別配置 // 意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋, // 此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。 //任務執行類名稱:com.lwl.boot.job.simple.ApiMyElasticJobSimple String jobClass = ApiMyElasticJobSimple.class.getCanonicalName(); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(coreConfiguration, jobClass); // 定義Lite作業根配置 //作業分片策略:系統提供三種策略和自定義可供選擇 /* * 1. AverageAllocationJobShardingStrategy:基於平均分配演算法的分片策略 * 2. OdevitySortByNameJobShardingStrategy:根據作業名的雜湊值奇偶數決定IP升降序演算法的分片策略 * 3. RotateServerByNameJobShardingStrategy:根據作業名的雜湊值對伺服器列表進行輪轉的分片策略 * 4. 實現JobShardingStrategy介面,並且實現sharding的方法, * 介面方法引數為【作業伺服器IP列表和分片策略選項】,【分片策略選項包括作業名稱】,【分片總數以及分片序列號和個性化引數對照表】,可以根據需求定製化自己的分片策略。 * */ //預設的分片策略 String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName(); LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration).jobShardingStrategyClass(jobShardingStrategyClass).build(); return simpleJobRootConfig; } }
registryCenter()實現的是註冊中心,註冊中心是通過zookeeper實現的,其型別為:ZookeeperRegistryCenter而他需要一個zookeeper的配置ZookeeperConfiguration實體類,在建立ZookeeperConfiguration實體類的時候需要注意他的引數
ZookeeperConfiguration:引數說明(紅色為必傳欄位)
serverLists:連線Zookeeper伺服器的列表,包括IP地址和埠號,多個地址用逗號分隔,如: host1:2181,host2:2181
namespace:Zookeeper的名稱空間,/如果你有多個不同 Elastic-Job叢集 時,使用相同 Zookeeper,可以配置不同的 namespace 進行隔離
baseSleepTimeMilliseconds:等待重試的間隔時間的初始值 單位:毫秒
maxSleepTimeMilliseconds:等待重試的間隔時間的最大值 單位:毫秒
maxRetries:最大重試次數
sessionTimeoutMilliseconds:會話超時時間 單位:毫秒
connectionTimeoutMilliseconds:連線超時時間 單位:毫秒
digest:連線Zookeeper的許可權令牌,預設為不需要許可權驗證
2.3.2 configuration()方法:作業配置,這個方法注意做了三件事情,定義作業核心配置,定義作業型別,定義根配置
作業配置分為3級,分別是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfiguration。LiteJobConfiguration <--- JobTypeConfiguration,JobTypeConfiguration <--- JobCoreConfiguration,層層巢狀
1)定義作業核心配置
作業核心配置:就是起碼要知道我這個定時器叫什麼,什麼時候啟動,有沒有做分片
JobCoreConfiguration:引數說明(紅色為必傳欄位)
jobName:作業名稱
cron:cron表示式,用於控制作業觸發時間
shardingTotalCount:作業分片總數,如果一個作業啟動超過作業分片總數的節點,只有 shardingTotalCount 會執行作業
shardingItemParameters:分片序列號和引數用等號分隔,多個鍵值對用逗號分隔,分片序列號從0開始,不可大於或等於作業分片總數,如:0=a,1=b,2=c
jobParameter:作業自定義引數,作業自定義引數,可通過傳遞該引數為作業排程的業務方法傳參,用於實現帶引數的作業,例:每次獲取的資料量、作業例項從資料庫讀取的主鍵等
failover:是否開啟任務執行失效轉移,開啟表示如果作業在一次任務執行中途宕機,允許將該次未完成的任務在另一作業節點上補償執行
misfire:是否開啟錯過任務重新執行
description:作業描述資訊
jobProperties:配置jobProperties定義的列舉控制Elastic-Job的實現細節,JOB_EXCEPTION_HANDLER用於擴充套件異常處理類,EXECUTOR_SERVICE_HANDLER用於擴充套件作業處理執行緒池類2)定義作業型別
作業型別主要分為三種:SimpleJob(簡單作業),DataflowJob(資料流作業),ScriptJob(指令碼作業),它們的配置實現類分別對應著:SimpleJobConfiguration,DataflowJobConfiguration,ScriptJobConfiguration
由於使用的是Simple型別,使用就對應SimpleJobConfiguration配置SimpleJobConfiguration:引數說明(紅色為必傳欄位)
coreConfig:即1)的核心配置類
jobClass:作業實現類,即實現SimpleJob類的全稱,可以通過【類.class.getCanonicalName()】獲取
3) 定義Lite作業根配置
定義Lite作業根配置主要定義了 對任務排程的監控和使用分片策略
作業分片策略:系統提供三種策略和自定義可供選擇
1. AverageAllocationJobShardingStrategy:基於平均分配演算法的分片策略
策略說明:
如果分片不能整除,則不能整除的多餘分片將依次追加到序號小的伺服器。如:
如果有3臺伺服器,分成9片,則每臺伺服器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
如果有3臺伺服器,分成8片,則每臺伺服器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]如果有3臺伺服器,分成10片,則每臺伺服器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
2. OdevitySortByNameJobShardingStrategy:根據作業名的雜湊值奇偶數決定IP升降序演算法的分片策略
策略說明:
作業名的雜湊值為奇數則IP升序。
作業名的雜湊值為偶數則IP降序。用於不同的作業平均分配負載至不同的伺服器。
AverageAllocationJobShardingStrategy的缺點是,一旦分片數小於作業伺服器數,作業將永遠分配至IP地址靠前的伺服器,導致IP地址靠後的伺服器空閒。
而OdevitySortByNameJobShardingStrategy則可以根據作業名稱重新分配伺服器負載。如:
如果有3臺伺服器,分成2片,作業名稱的雜湊值為奇數,則每臺伺服器分到的分片是:1=[0], 2=[1], 3=[]如果有3臺伺服器,分成2片,作業名稱的雜湊值為偶數,則每臺伺服器分到的分片是:3=[0], 2=[1], 1=[]
3. RotateServerByNameJobShardingStrategy:根據作業名的雜湊值對伺服器列表進行輪轉的分片策略
4. 實現JobShardingStrategy介面,並且實現sharding的方法,介面方法引數為【作業伺服器IP列表和分片策略選項】,【分片策略選項包括作業名稱】,【分片總數以及分片序列號和個性化引數對照表】,可以根據需求定製化自己的分片策略。
LiteJobConfiguration:引數說明(紅色為必傳欄位)
jobConfig:即 2)中的任務型別配置
jobShardingStrategyClass:作業分片策略實現類全路徑,預設使用平均分配策略
monitorExecution:監控作業執行時狀態
每次作業執行時間和間隔時間均非常短的情況,建議不監控作業執行時狀態以提升效率。因為是瞬時狀態,所以無必要監控。請使用者自行增加資料堆積監控。並且不能保證資料重複選取,應在作業中實現冪等性。
每次作業執行時間和間隔時間均較長的情況,建議監控作業執行時狀態,可保證資料不會重複選取。monitorPort:作業監控埠,建議配置作業監控埠, 方便開發者dump作業資訊。,使用方法: echo “dump” | nc 127.0.0.1 9888
maxTimeDiffSeconds:最大允許的本機與註冊中心的時間誤差秒數,如果時間誤差超過配置秒數則作業啟動時將拋異常,配置為-1表示不校驗時間誤差
reconcileIntervalMinutes:修復作業伺服器不一致狀態服務排程間隔時間,配置為小於1的任意值表示不執行修復 單位:分鐘
eventTraceRdbDataSource:作業事件追蹤的資料來源Bean引用
3. 啟動API,呼叫ApiJobSimple中main方法
當啟動之後,定時器開始正常工作,控制檯打印出的日誌是這樣的:
----------------------0------------------- 任務排程執行3: 0 ----------------------1------------------- 任務排程執行3: 1 ----------------------2------------------- 任務排程執行3: 2
就是當只有一個定時任務在跑的時候,所有任務都會被他執行,但是當你再啟動一個面方法,之前的那個不要關閉,那麼就出現2個輸出控制檯日誌為:
第一臺的控制檯 ----------------------1------------------- 任務排程執行3: 1 第二臺的控制檯 ----------------------0------------------- 任務排程執行3: 0 ----------------------2------------------- 任務排程執行3: 2
任務就被分片到這2臺機器上了,預設採用的是平均分配演算法的分片策略,當然你再啟用一下main方法,你就發現他們各自輸出一條記錄,好了就到這裡了。
最後感謝各位觀看,如需轉載請標明原處,我的程式碼已上傳到github上:https://github.com/1181888200/boot-elastic-job
相關推薦
elastic-job之Simple型別作業實現
1.什麼是Simple型別作業?Simple型別作業意為簡單實現,未經任何封裝的型別。需實現SimpleJob介面。該介面僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生介面相似,但提供了彈性擴縮容和分片等功能。2. 建立Simple型別專案,通過API啟動方式呼
elastic-job之Dataflow型別作業實現
一、前序二、Dataflow是什麼?Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。三、 怎麼開啟?可通過DataflowJobConfiguration
elastic-job之監聽器
每個作業都可以配置一個任務監聽器,確切的說是隻能配置一個本地監聽器和一個分散式監聽器。Elastic-job有三種作業型別,但是它們的通用配置都是一樣的,所以本文在介紹作業的監聽器配置時將僅以簡單作業的配置為例。本地監聽器本地監聽器只在節點執行自己分片的時候排程,每個分片任務
Elastic-Job之簡單Job
簡介 elastic-job是噹噹網開源的基於zookeeper和quartz實現的分散式作業排程框架。github地址是https://github.com/dangdangdotcom/elastic-job,官方網站是http://elasticjob.
Elastic-Job何為分散式作業
原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/distribution/ 何為分散式作業? 分片概念 任務的分散式執行,需要將一個任務拆分為n個獨立的任務項,然後由分散式的伺服器分別執行某一個或
Elastic-Job-Lite 原始碼閱讀 ---- 作業執行
作業執行的核心流程: 因為使用了 Quartz,任務執行是實現了 Quartz 的 Job 介面: public final class LiteJob implements Job { @Setter private ElasticJob elasticJob
Elastic-Job原始碼解析(二)之定時核心實現quartz
Elastic-Job是一個分散式定時任務框架,其內部的定時主要是利用quartz來實現,而Elastic-Job核心是對quartz進行了封裝,並提供了分散式任務的功能。具體是怎麼實現呢? 怎麼實現分散式呢? 主要是通過Zookeeper通訊,獲取任務伺服器ip地址,並通
Elastic Job 入門教程(二)— Spring Boot框架下是實現Elastic Job 指令碼作業(Script Job)
在Elastic Job 入門教程(一)— 與Spring Boot整合這篇文章中,我們簡單介紹了Spring Boot與Elastic Job 的整合,並簡單實現了SimpleJob型別作業。本章,我
elastic-job--作業型別
elastic-job提供了三種類型的作業: Simple型別作業 SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。 Dataflow型別作業 Dataflow型別用於
Elastic-Job-Lite詳解之作業排程
JobScheduler是elastic-job作業排程的關鍵類,也是起始類,在包com.dangdang.ddframe.job.lite.api下。排程任務的執行需要包含兩大步驟:任務的配置和任務的註冊。JobScheduler的建構函式除了任務配置和註冊相關資訊之
Elastic-Job-Lite 源碼分析 —— 作業分片策略
哈希 AD hash alloc hub strings put iat 總數 摘要: 原創出處 http://www.iocoder.cn/Elastic-Job/job-sharding-strategy/ 「芋道源碼」歡迎轉載,保留摘要,謝謝! 本文基於 Elast
Elastic-Job原始碼解析(三)之分片定時任務執行
通過本篇的閱讀你將學會了解Elastic-Job的定時時機,及如何通過分片方式做一個分散式的定時任務框架。瞭解常用的三種分片策略,及如何自定義分散式分片策略 目錄 Elastic-Job如何通過SpringJobScheduler啟動定時 Ela
Elastic-Job原始碼解析(一)之與Spring完美整合
看過小編寫SpringFramework原始碼解析的同學應該對Spring支援自定義標籤還有點印象吧,沒有的話我們回顧下,然後看看Elastic-Job是如何巧妙的利用自定義標籤生成Job任務的吧。請注意這裡用了一個巧妙關鍵字。我們看它如何巧妙的吧。 Spring自定義
詳解當當網的分散式作業框架elastic-job
轉自:http://www.infoq.com/cn/articles/dangdang-distributed-work-framework-elastic-job 作業的必要性以及存在的問題 為什麼需要作業? 作業即定時任務。一般來說,系統可使用訊息傳遞代替部分使用作業的場
Elastic-job實戰(分散式作業排程框架)
就拿一個場景來說吧,如果我們的專案是部署到多臺機器上,那麼某一時刻,我們的定時任務肯定每臺機器上都會執行一遍,那這肯定不是我們想要的結果,我們只希望有一臺機器能執行。 一.前言 Elastic job是噹噹網架構師基於Zookepper、Quartz開發並開源的
springboot2.0+Elastic Job動態job配置實現微服務呼叫
springboot2.0+Elastic Job實現微服務呼叫 本猿第一次寫部落格,寫的不好勿噴 一、前提 本文只介紹如何把elastic-job和springcloud微服務整合,不對其他知識做介紹,需先掌握以下知識 1.會使用springboot 2.會使用springcl
Elastic Job 定時任務實現
官方文件:http://dangdangdotcom.github.io/elastic-job/elastic-job-lite/00-overview/intro/ 該說的文件上都說了;在過程中遇到一些錯誤記下了 環境:zookeeper版本 zookeeper-3.
Elastic Job入門示例-實現DataflowJob介面
1.接上篇內容:https://blog.csdn.net/seanme/article/details/802564602.本次介紹流式處理任務型別 * 流式任務型別:業務實現兩個介面:抓取(fetchData)和處理(processData)資料 * a.流式處
Elastic-Job是否支援動態新增做作業
原文地址:http://dangdangdotcom.github.io/elastic-job/post/faq/ 6. 是否支援動態新增作業? 回答: 動態新增作業這個概念每個人理解不盡相同。 elast
Elastic-Job作業執行狀態監聽
原文地址:http://dangdangdotcom.github.io/elastic-job/post/1.x/execution_monitor/ 作業執行狀態監控 通過監聽elastic-job的zookeeper註冊中心的幾個關鍵節點即可完成作業執行狀態監控