Elastic-Job開發指南
開發指南
程式碼開發
作業型別
目前提供3種作業型別,分別是Simple,DataFlow和Script。
DataFlow型別用於處理資料流,它又提供2種作業型別,分別是ThroughputDataFlow和SequenceDataFlow。需要繼承相應的抽象類。
Script型別用於處理指令碼,可直接使用,無需編碼。
方法引數shardingContext包含作業配置,分片和執行時資訊。可通過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數,執行在本作業伺服器的分片序列號集合等。
Simple型別作業
Simple型別作業意為簡單實現,未經任何封裝的型別。需要繼承AbstractSimpleElasticJob,該類只提供了一個方法用於覆蓋,此方法將被定時執行。用於執行普通的定時任務,與Quartz原生介面相似,只是增加了彈性擴縮容和分片等功能。
public class JobMain { public static void main(final String[] args) { long startTimeoutMills = 5000L; long completeTimeoutMills = 10000L; new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init(); } }
ThroughputDataFlow型別作業
ThroughputDataFlow型別作業意為高吞吐的資料流作業。需要繼承AbstractInpidualThroughputDataFlowElasticJob並可以指定返回值泛型,該類提供3個方法可覆蓋,分別用於抓取資料,處理資料和指定是否流式處理資料。可以獲取資料處理成功失敗次數等輔助監控資訊。如果流式處理資料,fetchData方法的返回值只有為null或長度為空時,作業才會停止執行,否則作業會一直執行下去;非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,即完成本次作業。流式資料處理參照TbSchedule設計,適用於不間歇的資料處理。
作業執行時會將fetchData的資料傳遞給processData處理,其中processData得到的資料是通過多執行緒(執行緒池大小可配)拆分的。如果採用流式作業處理方式,建議processData處理資料後更新其狀態,避免fetchData再次抓取到,從而使得作業永遠不會停止。processData的返回值用於表示資料是否處理成功,丟擲異常或者返回false將會在統計資訊中歸入失敗次數,返回true則歸入成功次數。
public class MyElasticJob extends AbstractInpidualThroughputDataFlowElasticJob { @Override public List fetchData(JobExecutionMultipleShardingContext context) { Map<integer, string=""> offset = context.getOffsets(); List result = // get data from database by sharding items and by offset return result; } @Override public boolean processData(JobExecutionMultipleShardingContext context, Foo data) { // process data // ... // store offset for (int each : context.getShardingItems()) { updateOffset(each, "your offset, maybe id"); } return true; } }
SequenceDataFlow型別作業
SequenceDataFlow型別作業和ThroughputDataFlow作業型別極為相似,所不同的是ThroughputDataFlow作業型別可以將獲取到的資料多執行緒處理,但不會保證多執行緒處理資料的順序。如:從2個分片共獲取到100條資料,第1個分片40條,第2個分片60條,配置為兩個執行緒處理,則第1個執行緒處理前50條資料,第2個執行緒處理後50條資料,無視分片項;SequenceDataFlow型別作業則根據當前伺服器所分配的分片項數量進行多執行緒處理,每個分片項使用同一執行緒處理,防止了同一分片的資料被多執行緒處理,從而導致的順序問題。如:從2個分片共獲取到100條資料,第1個分片40條,第2個分片60條,則系統自動分配兩個執行緒處理,第1個執行緒處理第1個分片的40條資料,第2個執行緒處理第2個分片的60條資料。由於ThroughputDataFlow作業可以使用多於分片項的任意執行緒數處理,所以效能調優的可能會優於SequenceDataFlow作業。
public class MyElasticJob extends AbstractInpidualSequenceDataFlowElasticJob { @Override public List fetchData(JobExecutionSingleShardingContext context) { int offset = context.getOffset(); List result = // get data from database by sharding items and by offset return result; } @Override public boolean processData(JobExecutionSingleShardingContext context, Foo data) { // process data // ... // store offset updateOffset(context.getShardingItem(), "your offset, maybe id"); return true; } }
Script型別作業
Script型別作業意為指令碼型別作業,支援shell,Python,perl等所有型別指令碼。只需通過控制檯/程式碼配置scriptCommandLine即可。執行指令碼路徑可以包含引數,最後一個引數為作業執行時資訊.
#!/bin/bash
echo sharding execution context is $*
作業執行時輸出
sharding execution context is {"shardingItems":[0,1,2,3,4,5,6,7,8,9],"shardingItemParameters":{},"offsets":{},"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","monitorExecution":true,"fetchDataCount":1}
批量處理
為了提高資料處理效率,資料流型別作業提供了批量處理資料的功能。之前逐條處理資料的兩個抽象類分別是AbstractInpidualThroughputDataFlowElasticJob和AbstractInpidualSequenceDataFlowElasticJob,批量處理則使用另外兩個介面AbstractBatchThroughputDataFlowElasticJob和AbstractBatchSequenceDataFlowElasticJob。不同之處在於processData方法的返回值從boolean型別變為int型別,用於表示一批資料處理的成功數量,第二個入參則轉變為List資料集合。
異常處理
elastic-job在最上層介面提供了handleJobExecutionException方法,使用作業時可以覆蓋此方法,並使用quartz提供的JobExecutionException控制異常後作業的宣告週期。預設實現是直接將異常丟擲。示例:
任務監聽配置
可以通過配置多個任務監聽器,在任務執行前和執行後執行監聽的方法。監聽器分為每臺作業節點均執行和分散式場景中僅單一節點執行兩種。
每臺作業節點均執行的監聽
若作業處理作業伺服器的檔案,處理完成後刪除檔案,可考慮使用每個節點均執行清理任務。此型別任務實現簡單,且無需考慮全域性分散式任務是否完成,請儘量使用此型別監聽器。
步驟:
定義監聽器
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; import com.dangdang.ddframe.job.api.listener.ElasticJobListener; public class MyElasticJobListener implements ElasticJobListener { @Override public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { // do something ... } @Override public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) { // do something ... } }
將監聽器作為引數傳入JobScheduler
public class JobMain { public static void main(final String[] args) { new JobScheduler(regCenter, jobConfig, new MyElasticJobListener()).init(); } }
分散式場景中僅單一節點執行的監聽
若作業處理資料庫資料,處理完成後只需一個節點完成資料清理任務即可。此型別任務處理複雜,需同步分散式環境下作業的狀態同步,提供了超時設定來避免作業不同步導致的死鎖,請謹慎使用。
步驟:
定義監聽器
import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext; import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener; public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener { public TestDistributeOnceElasticJobListener(final long startTimeoutMills, final long completeTimeoutMills) { super(startTimeoutMills, completeTimeoutMills); } @Override public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) { // do something ... } @Override public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) { // do something ... } }
將監聽器作為引數傳入JobScheduler
public class JobMain { public static void main(final String[] args) { long startTimeoutMills = 5000L; long completeTimeoutMills = 10000L; new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init(); } }
作業配置
與spring容器配合使用作業,可以將作業Bean配置為Spring Bean,可在作業中通過依賴注入使用Spring容器管理的資料來源等物件。可用placeholder佔位符從屬性檔案中取值。
Spring名稱空間配置
job:simple名稱空間屬性詳細說明
屬性名 | 型別 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
id | String | 是 | 作業名稱 | |
class | String | 否 | 作業實現類,需實現ElasticJob介面,指令碼型作業不需要配置 | |
registry-center-ref | String | 是 | 註冊中心Bean的引用,需引用reg:zookeeper的宣告 | |
cron | String | 是 | cron表示式,用於配置作業觸發時間 | |
sharding-total-count | int | 是 | 作業分片總數 | |
sharding-item-parameters | String | 否 | 分片序列號和引數用等號分隔,多個鍵值對用逗號分隔 分片序列號從0開始,不可大於或等於作業分片總數 如: 0=a,1=b,2=c |
|
job-parameter | String | 否 | 作業自定義引數 可以配置多個相同的作業,但是用不同的引數作為不同的排程例項 |
|
monitor-execution | boolean | 否 | true | 監控作業執行時狀態 每次作業執行時間和間隔時間均非常短的情況,建議不監控作業執行時狀態以提升效率。因為是瞬時狀態,所以無必要監控。請使用者自行增加資料堆積監控。並且不能保證資料重複選取,應在作業中實現冪等性。 每次作業執行時間和間隔時間均較長的情況,建議監控作業執行時狀態,可保證資料不會重複選取。 |
monitor-port | int | 否 | -1 | 作業監控埠 建議配置作業監控埠, 方便開發者dump作業資訊。 使用方法: echo “dump” | nc 127.0.0.1 9888 |
max-time-diff-seconds | int | 否 | -1 | 最大允許的本機與註冊中心的時間誤差秒數 如果時間誤差超過配置秒數則作業啟動時將拋異常 配置為-1表示不校驗時間誤差 |
failover | boolean | 否 | false | 是否開啟失效轉移 僅monitorExecution開啟,失效轉移才有效 |
misfire | boolean | 否 | true | 是否開啟錯過任務重新執行 |
job-sharding-strategy-class | String | 否 | true | 作業分片策略實現類全路徑 預設使用平均分配策略 詳情參見:作業分片策略 |
description | String | 否 | 作業描述資訊 | |
disabled | boolean | 否 | false | 作業是否禁止啟動 可用於部署作業時,先禁止啟動,部署結束後統一啟動 |
overwrite | boolean | 否 | false | 本地配置是否可覆蓋註冊中心配置 如果可覆蓋,每次啟動作業都以本地配置為準 |
job:dataflow名稱空間屬性詳細說明
job:dataflow名稱空間擁有job:simple名稱空間的全部屬性,以下僅列出特有屬性
屬性名 | 型別 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
process-count-interval-seconds | int | 否 | 300 | 統計作業處理資料數量的間隔時間 單位:秒 |
concurrent-data-process-thread-count | int | 否 | CPU核數*2 | 同時處理資料的併發執行緒數 不能小於1 僅ThroughputDataFlow作業有效 |
fetch-data-count | int | 否 | 1 | 每次抓取的資料量 |
streaming-process | boolean | 否 | false | 是否流式處理資料 如果流式處理資料, 則fetchData不返回空結果將持續執行作業 如果非流式處理資料, 則處理資料完成後作業結束 |
job:script名稱空間屬性詳細說明,基本屬性參照job:simple名稱空間屬性詳細說明
job:script名稱空間擁有job:simple名稱空間的全部屬性,以下僅列出特有屬性
屬性名 | 型別 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
script-command-line | String | 否 | 指令碼型作業執行命令列 |
job:listener名稱空間屬性詳細說明
job:listener必須配置為job:bean的子元素
屬性名 | 型別 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
class | String | 是 | 前置後置任務監聽實現類,需實現ElasticJobListener介面 | |
started-timeout-milliseconds | long | 否 | Long.MAX_VALUE | AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行前的執行方法的超時時間 單位:毫秒 |
completed-timeout-milliseconds | long | 否 | Long.MAX_VALUE | AbstractDistributeOnceElasticJobListener型監聽器,最後一個作業執行後的執行方法的超時時間 單位:毫秒 |
reg:bean名稱空間屬性詳細說明
屬性名 | 型別 | 是否必填 | 預設值 | 描述 |
---|---|---|---|---|
id | String | 是 | 註冊中心在Spring容器中的主鍵 | |
server-lists | String | 是 | 連線Zookeeper伺服器的列表 包括IP地址和埠號 多個地址用逗號分隔 如: host1:2181,host2:2181 |
|
namespace | String | 是 | Zookeeper的名稱空間 | |
base-sleep-time-milliseconds | int | 否 | 1000 | 等待重試的間隔時間的初始值 單位:毫秒 |
max-sleep-time-milliseconds | int | 否 | 3000 | 等待重試的間隔時間的最大值 單位:毫秒 |
max-retries | int | 否 | 3 | 最大重試次數 |
session-timeout-milliseconds | int | 否 | 60000 | 會話超時時間 單位:毫秒 |
connection-timeout-milliseconds | int | 否 | 15000 | 連線超時時間 單位:毫秒 |
digest | String | 否 | 無驗證 | 連線Zookeeper的許可權令牌 預設為不需要許可權驗證 |
不使用Spring配置
如果不使用Spring框架,可以用如下方式啟動作業。
import com.dangdang.ddframe.job.api.config.JobConfiguration; import com.dangdang.ddframe.job.api.JobScheduler; import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter; import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter; import com.dangdang.example.elasticjob.core.job.SimpleJobDemo; import com.dangdang.example.elasticjob.core.job.ThroughputDataFlowJobDemo; import com.dangdang.example.elasticjob.core.job.SequenceDataFlowJobDemo; import com.dangdang.ddframe.job.plugin.job.type.integrated.ScriptElasticJob; public class JobDemo { // 定義Zookeeper註冊中心配置物件 private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job-example", 1000, 3000, 3); // 定義Zookeeper註冊中心 private CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); // 定義簡單作業配置物件 private final SimpleJobConfiguration simpleJobConfig = JobConfigurationFactory.createSimpleJobConfigurationBuilder("simpleElasticDemoJob", SimpleJobDemo.class, 10, "0/30 * * * * ?").build(); // 定義高吞吐流式處理的資料流作業配置物件 private final DataFlowJobConfiguration throughputJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("throughputDataFlowElasticDemoJob", ThroughputDataFlowJobDemo.class, 10, "0/5 * * * * ?").streamingProcess(true).build(); // 定義順序的資料流作業配置物件 private final DataFlowJobConfiguration sequenceJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("sequenceDataFlowElasticDemoJob", SequenceDataFlowJobDemo.class, 10, "0/5 * * * * ?").build(); // 定義指令碼作業配置物件 private final ScriptJobConfiguration scriptJobConfig = JobConfigurationFactory.createScriptJobConfigurationBuilder("scriptElasticDemoJob", 10, "0/5 * * * * ?", "test.sh").build(); public static void main(final String[] args) { new JobDemo().init(); } private void init() { // 連線註冊中心 regCenter.init(); // 啟動簡單作業 new JobScheduler(regCenter, simpleJobConfig).init(); // 啟動高吞吐流式處理的資料流作業 new JobScheduler(regCenter, throughputJobConfig).init(); // 啟動順序的資料流作業 new JobScheduler(regCenter, sequenceJobConfig).init(); // 啟動指令碼作業 new JobScheduler(regCenter, scriptJobConfig).init(); } }
轉載:https://www.2cto.com/kf/201611/566016.html