1. 程式人生 > >【elastic-job】elastic-job部署以及簡單例子

【elastic-job】elastic-job部署以及簡單例子

一、elastic-job是什麼

elastic-job是噹噹開發的基於qutarz以及zookeeper封裝的作業排程工具,主要有兩個大框架,一個是elastic-job lite另外一個是elastic-job cloud,其中qutarz是一個開源的作業排程工具,zookeeper是分散式排程工具,這兩者結合搭建了elastic-job-lite,這是一個無中心節點的排程,而elastic-job-cloud是一個有中心節點的分散式排程開源工具,只需要設定好機器以及分片,就可以自動的排程到對應的機器上執行,與lite的不同時cloud採用了mesos來進行分散式資源管理,簡單的來說兩者的不同是:同一個作業在兩臺機器上跑,lite需要手動在兩臺機器上跑,但是cloud只需要上傳作業包,就可以自動的在兩臺機器上跑,因為lite不支援作業的排程,為無中心的。

二、環境的搭建

由於elastic-job-cloud的環境暫時未搭建出來,因此在此簡單介紹lite的搭建

(1)jdk的安裝

(2)zookeeper的安裝

(3)maven的安裝

官網maven要求3.0.4以及以上,具體的安裝過程與jdk類似,請自行百度

三、elastic-job-lite的優勢及特點

(1)簡單的概念及適用場景

1. 分片概念

任務的分散式執行,需要將一個任務拆分為n個獨立的任務項,然後由分散式的伺服器分別執行某一個或幾個分片項。 例如:有一個遍歷資料庫某張表的作業,現有2臺伺服器。為了快速的執行作業,那麼每臺伺服器應執行作業的50%。 為滿足此需求,可將作業分成2片,每臺伺服器執行1片。作業遍歷資料的邏輯應為:伺服器A遍歷ID以奇數結尾的資料;伺服器B遍歷ID以偶數結尾的資料。 如果分成10片,則作業遍歷資料的邏輯應為:每片分到的分片項應為ID%10,而伺服器A被分配到分片項0,1,2,3,4;伺服器B被分配到分片項5,6,7,8,9,直接的結果就是伺服器A遍歷ID以0-4結尾的資料;伺服器B遍歷ID以5-9結尾的資料。

2. 分片項與業務處理解耦

Elastic-Job並不直接提供資料處理的功能,框架只會將分片項分配至各個執行中的作業伺服器,開發者需要自行處理分片項與真實資料的對應關係。

3. 個性化引數的適用場景

個性化引數即shardingItemParameter,可以和分片項匹配對應關係,用於將分片項的數字轉換為更加可讀的業務程式碼。 例如:按照地區水平拆分資料庫,資料庫A是北京的資料;資料庫B是上海的資料;資料庫C是廣州的資料。 如果僅按照分片項配置,開發者需要了解0表示北京;1表示上海;2表示廣州。 合理使用個性化引數可以讓程式碼更可讀,如果配置為0=北京,1=上海,2=廣州,那麼程式碼中直接使用北京,上海,廣州的列舉值即可完成分片項和業務邏輯的對應關係。

(2)elastic-job-lite優勢及特點

1. 分散式排程

Elastic-Job-Lite並無作業排程中心節點,而是基於部署作業框架的程式在到達相應時間點時各自觸發排程。 註冊中心僅用於作業註冊和監控資訊儲存。而主作業節點僅用於處理分片和清理等功能。

2. 作業高可用

Elastic-Job-Lite提供最安全的方式執行作業。將分片總數設定為1,並使用多於1臺的伺服器執行作業,作業將會以1主n從的方式執行。 一旦執行作業的伺服器崩潰,等待執行的伺服器將會在下次作業啟動時替補執行。開啟失效轉移功能效果更好,可以保證在本次作業執行時崩潰,備機立即啟動替補執行。

3. 最大限度利用資源

Elastic-Job-Lite也提供最靈活的方式,最大限度的提高執行作業的吞吐量。將分片項設定為大於伺服器的數量,最好是大於伺服器倍數的數量,作業將會合理的利用分散式資源,動態的分配分片項。 例如:3臺伺服器,分成10片,則分片項分配結果為伺服器A=0,1,2;伺服器B=3,4,5;伺服器C=6,7,8,9。 如果伺服器C崩潰,則分片項分配結果為伺服器A=0,1,2,3,4;伺服器B=5,6,7,8,9。在不丟失分片項的情況下,最大限度的利用現有資源提高吞吐量。

三、簡單的例子

elastic-job的作業型別分為三種,一種是簡單的simple的形式,一種是基於流式資料的處理,一種是基於指令碼的排程,因為本人所使用的情況是基於流式的處理,那麼就簡單搭了一個基於流式的demo,其他型別的類似 流式作業的方式適合於不間斷的資料處理的型別,例如需要拉取訂單資料,因為訂單是連續不間斷的,因此需要一直拉取。 按照elastic-job官網上介紹,搭建一個基於dataflow(流式處理)的demo,這個demo的功能就是,從一個數據中心裡面取資料,按照資料中心的資料id%分片個數==分片引數進行拉取資料,拉取完成後將對應的資料id置為完成的狀態,具體程式碼如下所示:

(1)入口函式main函式以及作業的配置

package ElasticJobExample.ElasticJobExample;

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * Hello world!
 *
 */
public class App 
{
	public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
    
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("ip:2181", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }
    
    private static LiteJobConfiguration createJobConfiguration() {
        // 建立作業配置
    	
    	JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("myDataFlowTest", "0/10 * * * * ?", 3).shardingItemParameters("0=0,1=1,2=2").build();
        DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(coreConfig, JavaDataflowJob.class.getCanonicalName(), true);
        LiteJobConfiguration result = LiteJobConfiguration.newBuilder(dataflowJobConfig).build();
        return result;
    }
}


(2)作業的邏輯處理部分

package ElasticJobExample.ElasticJobExample;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;

import dataflowjob.entity.Foo;
import dataflowjob.process.DataProcess;
import dataflowjob.process.DataProcessFactory;

public class JavaDataflowJob implements DataflowJob<Foo> {
	private DataProcess dataProcess = DataProcessFactory.getDataProcess();
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        List<Foo> result = new ArrayList<Foo>();
        result = dataProcess.getData(context.getShardingParameter(), context.getShardingTotalCount());
        System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s", Thread.currentThread().getId(), new Date(), context, "fetch data",result));
		return result;
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
    	System.out.println(String.format("------Thread ID: %s, Date: %s, Sharding Context: %s, Action: %s, Data: %s", Thread.currentThread().getId(), new Date(), shardingContext, "finish data",data));
    	for(Foo foo:data){
    		dataProcess.setData(foo.getId());
    	}
    }

}


(3)具體的處理類

package dataflowjob.process;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import dataflowjob.entity.Foo;

public class DataProcess {

	private Map<Integer, Foo> data = new ConcurrentHashMap<>(30, 1);
	public DataProcess()
	{
		for(int i=0;i<30;i++){
			data.put(i, new Foo(i,Foo.Status.TODO));
		}
	}
	public List<Foo> getData(String tailId,int shardNum)
	{
		int intId  = Integer.parseInt(tailId);
		List<Foo> result = new ArrayList<Foo>();
		for (Map.Entry<Integer, Foo> each : data.entrySet()) {
            Foo foo = each.getValue();
            int key = each.getKey();
            if (key % shardNum == intId && foo.getStatus() == Foo.Status.TODO) {
                result.add(foo);
            }
        }
		return result;
	}
	public void setData(int i){
		data.get(i).setStatus(Foo.Status.DONE);
	}

}


(4)entity類Foo

package dataflowjob.entity;

public class Foo {
	private int id;
	private Status status;
	public Foo(final int id,final Status status) {
        this.id = id;
        this.status = status;
    }
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public Status getStatus() {
		return status;
	}
	public void setStatus(Status status) {
		this.status = status;
	}
	public enum Status{
		TODO,
		DONE
	} 

}


(5)具體處理工廠類

package dataflowjob.process;


public class DataProcessFactory {
	  private static DataProcess dataProcess = new DataProcess();
	    
	    public static DataProcess getDataProcess() {
	        return dataProcess;
	    }

}