ElasticJob -- 分散式作業排程
Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供最輕量級的分散式任務的協調服務,外部依賴僅Zookeeper。
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。
專案開源地址:https://github.com/dangdangdotcom/elastic-job
場景分析:
任務的分散式執行,需要將一個任務拆分為多個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。
場景1:有一個遍歷資料庫某張表的作業,現有2臺伺服器。為了快速的執行作業,那麼每臺伺服器應執行作業的50%。 為滿足此需求,可將作業分成2片,每臺伺服器執行1片。作業遍歷資料的邏輯應為:伺服器A遍歷ID以奇數結尾的資料;伺服器B遍歷ID以偶數結尾的資料。 如果分成10片,則作業遍歷資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID以0-4結尾的資料;伺服器B遍歷ID以5-9結尾的資料。
場景2:餘額寶裡的昨日收益,系統需要job在每天某個時間點開始,給所有餘額寶使用者計算收益。如果使用者數量不多,我們可以輕易使用quartz來完成,我們讓計息job在某個時間點開始執行,迴圈遍歷所有使用者計算利息,這沒問題。可是,如果使用者體量特別大,我們可能會面臨著在第二天之前處理不完這麼多使用者。另外,我們部署job的時候也得注意,我們可能會把job直接放在我們的webapp裡,webapp通常是多節點部署的,這樣,我們的job也就是多節點,多個job同時執行,很容易造成重複執行,比如使用者重複計息,為了避免這種情況,我們可能會對job的執行加鎖,保證始終只有一個節點能執行,或者乾脆讓job從webapp裡剝離出來,獨自部署一個節點。
elastic-job就可以幫助我們解決上面的問題,elastic底層的任務排程還是使用的quartz,通過zookeeper來動態給job節點分片
整體架構圖
Elastic-Job-Lite
Elastic-Job-Cloud
作業啟動流程
-
彈性分散式實現
-
第一臺伺服器上線觸發主伺服器選舉。主伺服器一旦下線,則重新觸發選舉,選舉過程中阻塞,只有主伺服器選舉完成,才會執行其他任務。
-
某作業伺服器上線時會自動將伺服器資訊註冊到註冊中心,下線時會自動更新伺服器狀態。
-
主節點選舉,伺服器上下線,分片總數變更均更新重新分片標記。
-
定時任務觸發時,如需重新分片,則通過主伺服器分片,分片過程中阻塞,分片結束後才可執行任務。如分片過程中主伺服器下線,則先選舉主伺服器,再分片。
-
通過4可知,為了維持作業執行時的穩定性,執行過程中只會標記分片狀態,不會重新分片。分片僅可能發生在下次任務觸發前。
-
每次分片都會按伺服器IP排序,保證分片結果不會產生較大波動。
-
實現失效轉移功能,在某臺伺服器執行完畢後主動抓取未分配的分片,並且在某臺伺服器下線後主動尋找可用的伺服器執行任務。
-
作業執行流程
應用:
1. 引入框架的jar包
<!-- 引入elastic-job-lite核心模組 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.5</version>
</dependency>
<!-- 使用springframework自定義名稱空間時引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.5</version>
</dependency>
2. 構建job
public class MyTask implements SimpleJob{
public void execute(ShardingContext context) {
System.out.println("定時任務測試");
}
}
3. spring 配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!-- 配置註冊中心 ,任務的資訊都會在zk中儲存 -->
<reg:zookeeper id="regCenter" server-lists="127.0.0.1:2181" namespace="test-job"
base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置簡單作業 -->
<job:simple id="myTask"
class="com.xxx.MyTask"
registry-center-ref="regCenter" cron="0 10 * * * ?"
sharding-total-count="1" overwrite="true"><!-- 分片為1,即不需要分片;支援覆蓋,即會用本次的配置覆蓋快取在zk中的配置 -->
<job:event-log /><!-- job執行日誌記錄到log -->
<job:event-rdb driver="${ds1.jdbc.driver_class_name}" <!-- job執行日誌記錄到DB, 詳細參考:http://dangdangdotcom.github.io/elastic-job/post/user_guide/common/event_trace/-->
url="${ds1.jdbc.url}" username="${ds1.jdbc.username}" password="${ds1.jdbc.password}"
log-level="INFO" />
</job:simple>
</beans>
分片:
public interface SimpleJob extends ElasticJob {
/**
* 執行作業.
*
* @param shardingContext 分片上下文
*/
void execute(ShardingContext shardingContext);
}
注意這裡面有一個shardingContext引數,看下原始碼:
/**
* 分片上下文.
*
* @author zhangliang
*/
@Getter
@ToString
public final class ShardingContext {
/**
* 作業名稱.
*/
private final String jobName;
/**
* 作業任務ID.
*/
private final String taskId;
/**
* 分片總數.
*/
private final int shardingTotalCount;
/**
* 作業自定義引數.
* 可以配置多個相同的作業, 但是用不同的引數作為不同的排程例項.
*/
private final String jobParameter;
/**
* 分配於本作業例項的分片項.
*/
private final int shardingItem;
/**
* 分配於本作業例項的分片引數.
*/
private final String shardingParameter;
public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
jobName = shardingContexts.getJobName();
taskId = shardingContexts.getTaskId();
shardingTotalCount = shardingContexts.getShardingTotalCount();
jobParameter = shardingContexts.getJobParameter();
this.shardingItem = shardingItem;
shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
}
}
這裡面有2個很重要的屬性:shardingTotalCount 分片總數(比如:2)、shardingItem 當前分片索引(比如:1),前面提到的效能擴容,就可以根據2個引數進行簡單的處理,假設在電商系統中,每天晚上有個定時任務,要統計每家店的銷量。商家id一般在表設計上是一個自增數字,如果總共2個分片(注:通常也就是部署2個節點),可以把 id為奇數的放到分片0,id為偶數的放到分片1,這樣2個機器各跑一半,相對只有1臺機器而言,就快多了。
虛擬碼如下:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
int shardIndx = shardingContext.getShardingItem();
if (shardIndx == 0) {
//處理id為奇數的商家
} else {
//處理id為偶數的商家
}
}
}
這個還可以進一步簡化,如果使用mysql查詢商家列表,mysql中有一個mod函式,直接可以對商家id進行取模運算
select * from shop where mod(shop_id,2)=0
如果把上面的2、0換成引數,mybatis中類似這樣:
select * from shop where mod(shop_id,#{shardTotal})=#{shardIndex}
作業型別:
elastic-job提供了三種類型的作業:Simple型別作業、Dataflow型別作業、Script型別作業。這裡主要講解前兩者。Script型別作業意為指令碼型別作業,支援shell,python,perl等所有型別指令碼,使用不多,可以參見github文件。
SimpleJob需要實現SimpleJob介面,意為簡單實現,未經過任何封裝,與quartz原生介面相似,比如示例程式碼中所使用的job。
Dataflow型別用於處理資料流,需實現DataflowJob介面。該介面提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)資料。
可通過DataflowJobConfiguration配置是否流式處理。
流式處理資料只有fetchData方法的返回值為null或集合長度為空時,作業才停止抓取,否則作業將一直執行下去; 非流式處理資料則只會在每次作業執行過程中執行一次fetchData方法和processData方法,隨即完成本次作業。
實際開發中,Dataflow型別的job還是很有好用的。
public class MyDataFlowJob implements DataflowJob<User> {
/*
status
0:待處理
1:已處理
*/
@Override
public List<User> fetchData(ShardingContext shardingContext) {
List<User> users = null;
/**
* users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
*/
return users;
}
@Override
public void processData(ShardingContext shardingContext, List<User> data) {
for (User user: data) {
user.setStatus(1);
/**
* update user
*/
}
}
}
<job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />
控制檯:
elastic-job還提供了一個不錯的UI控制檯,專案原始碼git clone到本地,mvn install就能得到一個elastic-job-lite-console-${version}.tar.gz的包,解壓,然後執行裡面的bin/start.sh 就能跑起來,介面類似如下:
-
作業詳細資訊頁
通過這個控制檯,可以動態調整每個定時任務的觸發時間(即:cornExpress)。詳情可參考官網文件-運維平臺部分。
Refrence:
https://www.cnblogs.com/yjmyzz/p/elastic-job-tutorial.html
https://github.com/elasticjob/elastic-job-lite
https://www.cnblogs.com/wyb628/p/7682580.html