【elastic-job】elastic-job部署以及簡單例子
阿新 • • 發佈:2019-01-06
一、elastic-job是什麼
elastic-job是噹噹開發的基於qutarz以及zookeeper封裝的作業排程工具,主要有兩個大框架,一個是elastic-job lite另外一個是elastic-job cloud,其中qutarz是一個開源的作業排程工具,zookeeper是分散式排程工具,這兩者結合搭建了elastic-job-lite,這是一個無中心節點的排程,而elastic-job-cloud是一個有中心節點的分散式排程開源工具,只需要設定好機器以及分片,就可以自動的排程到對應的機器上執行,與lite的不同時cloud採用了mesos來進行分散式資源管理,簡單的來說兩者的不同是:同一個作業在兩臺機器上跑,lite需要手動在兩臺機器上跑,但是cloud只需要上傳作業包,就可以自動的在兩臺機器上跑,因為lite不支援作業的排程,為無中心的。二、環境的搭建
(1)jdk的安裝
(2)zookeeper的安裝
(3)maven的安裝
官網maven要求3.0.4以及以上,具體的安裝過程與jdk類似,請自行百度三、elastic-job-lite的優勢及特點
(1)簡單的概念及適用場景
1. 分片概念
任務的分散式執行,需要將一個任務拆分為n個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。 例如:有一個遍歷資料庫某張表的作業,現有2臺伺服器。為了快速的執行作業,那麼每臺伺服器應執行作業的50%。 為滿足此需求,可將作業分成2片,每臺伺服器執行1片。作業遍歷資料的邏輯應為:伺服器A遍歷ID以奇數結尾的資料;伺服器B遍歷ID以偶數結尾的資料。 如果分成10片,則作業遍歷資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID以0-4結尾的資料;伺服器B遍歷ID以5-9結尾的資料。2. 分片項與業務處理解耦
3. 個性化引數的適用場景
個性化引數即shardingItemParameter,可以和分片項匹配對應關係,用於將分片項的數字轉換為更加可讀的業務程式碼。 例如:按照地區水平拆分資料庫,資料庫A是北京的資料;資料庫B是上海的資料;資料庫C是廣州的資料。 如果僅按照分片項配置,開發者需要了解0表示北京;1表示上海;2表示廣州。 合理使用個性化引數可以讓程式碼更可讀,如果配置為0=北京,1=上海,2=廣州,那麼程式碼中直接使用北京,上海,廣州的列舉值即可完成分片項和業務邏輯的對應關係。(2)elastic-job-lite優勢及特點
1. 分散式排程
Elastic-Job-Lite並無作業排程中心節點,而是基於部署作業框架的程式在到達相應時間點時各自觸發排程。 註冊中心僅用於作業註冊和監控資訊儲存。而主作業節點僅用於處理分片和清理等功能。2. 作業高可用
Elastic-Job-Lite提供最安全的方式執行作業。將分片總數設定為1,並使用多於1臺的伺服器執行作業,作業將會以1主n從的方式執行。 一旦執行作業的伺服器崩潰,等待執行的伺服器將會在下次作業啟動時替補執行。開啟失效轉移功能效果更好,可以保證在本次作業執行時崩潰,備機立即啟動替補執行。3. 最大限度利用資源
Elastic-Job-Lite也提供最靈活的方式,最大限度的提高執行作業的吞吐量。將分片項設定為大於伺服器的數量,最好是大於伺服器倍數的數量,作業將會合理的利用分散式資源,動態的分配分片項。 例如:3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。 如果伺服器C崩潰,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9。在不丟失分片項的情況下,最大限度的利用現有資源提高吞吐量。三、簡單的例子
elastic-job的作業型別分為三種,一種是簡單的simple的形式,一種是基於流式資料的處理,一種是基於指令碼的排程,因為本人所使用的情況是基於流式的處理,那麼就簡單搭了一個基於流式的demo,其他型別的類似 流式作業的方式適合於不間斷的資料處理的型別,例如需要拉取訂單資料,因為訂單是連續不間斷的,因此需要一直拉取。 按照elastic-job官網上介紹,搭建一個基於dataflow(流式處理)的demo,這個demo的功能就是,從一個數據中心裡面取資料,按照資料中心的資料id%分片個數==分片引數進行拉取資料,拉取完成後將對應的資料id置為完成的狀態,具體程式碼如下所示:(1)入口函式main函式以及作業的配置
package ElasticJobExample.ElasticJobExample;
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
/**
* Hello world!
*
*/
public class App
{
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("ip:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
// 建立作業配置
JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("myDataFlowTest", "0/10 * * * * ?", 3).shardingItemParameters("0=0,1=1,2=2").build();
DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, JavaDataflowJob.class.getCanonicalName(), true);
LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
return result;
}
}
(2)作業的邏輯處理部分
package ElasticJobExample.ElasticJobExample;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import dataflowjob.entity.Foo;
import dataflowjob.process.DataProcess;
import dataflowjob.process.DataProcessFactory;
public class JavaDataflowJob implements DataflowJob<Foo> {
private DataProcess dataProcess = DataProcessFactory.getDataProcess();
@Override
public List<Foo> fetchData(ShardingContext context) {
List<Foo> result = new ArrayList<Foo>();
result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());
System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s", Thread.currentThread().getId(), new Date(), context, "fetch data",result));
return result;
}
@Override
public void processData(ShardingContext shardingContext, List<Foo> data) {
System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s", Thread.currentThread().getId(), new Date(), shardingContext, "finish data",data));
for(Foo foo:data){
dataProcess.setData(foo.getId());
}
}
}
(3)具體的處理類
package dataflowjob.process;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import dataflowjob.entity.Foo;
public class DataProcess {
private Map<Integer, Foo> data = new ConcurrentHashMap<>(30, 1);
public DataProcess()
{
for(int i=0;i<30;i++){
data.put(i, new Foo(i,Foo.Status.TODO));
}
}
public List<Foo> getData(String tailId,int shardNum)
{
int intId = Integer.parseInt(tailId);
List<Foo> result = new ArrayList<Foo>();
for (Map.Entry<Integer, Foo> each : data.entrySet()) {
Foo foo = each.getValue();
int key = each.getKey();
if (key % shardNum == intId && foo.getStatus() == Foo.Status.TODO) {
result.add(foo);
}
}
return result;
}
public void setData(int i){
data.get(i).setStatus(Foo.Status.DONE);
}
}
(4)entity類Foo
package dataflowjob.entity;
public class Foo {
private int id;
private Status status;
public Foo(final int id,final Status status) {
this.id = id;
this.status = status;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public Status getStatus() {
return status;
}
public void setStatus(Status status) {
this.status = status;
}
public enum Status{
TODO,
DONE
}
}
(5)具體處理工廠類
package dataflowjob.process;
public class DataProcessFactory {
private static DataProcess dataProcess = new DataProcess();
public static DataProcess getDataProcess() {
return dataProcess;
}
}