1. 程式人生 > >分散式定時任務框架Elastic-Job的使用

分散式定時任務框架Elastic-Job的使用

開發十年,就只剩下這套架構體系了! >>>   

一、前言

    Elastic-Job是一個優秀的分散式作業排程框架。

    Elastic-Job是一個分散式排程解決方案,由兩個相互獨立的子專案Elastic-Job-Lite和Elastic-Job-Cloud組成。

    Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分散式任務的協調服務。

    Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及程序隔離等服務。

1. Elastic-Job-Lite

  • 分散式排程協調

  • 彈性擴容縮容

  • 失效轉移

  • 錯過執行作業重觸發

  • 作業分片一致性,保證同一分片在分散式環境中僅一個執行例項

  • 自診斷並修復分散式不穩定造成的問題

  • 支援並行排程

  • 支援作業生命週期操作

  • 豐富的作業型別

  • Spring整合以及名稱空間提供

  • 運維平臺

2. Elastic-Job-Cloud

  • 應用自動分發

  • 基於Fenzo的彈性資源分配

  • 分散式排程協調

  • 彈性擴容縮容

  • 失效轉移

  • 錯過執行作業重觸發

  • 作業分片一致性,保證同一分片在分散式環境中僅一個執行例項

  • 支援並行排程

  • 支援作業生命週期操作

  • 豐富的作業型別

  • Spring整合

  • 運維平臺

  • 基於Docker的程序隔離(TBD)

二、導讀

    1、Elastic-Job的核心思想

    2、Elastic-Job的基本使用

三、Elastic-Job的核心思想

    對於分散式計算而言,分片是最基本的思想,Elastic-Job也是沿用了這個思想,每個job跑部分資料,所有job執行完成,便是全量資料,官網給出的SimpleJob例子如下:

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

    用switch case迴圈來對應分片的業務邏輯,case分片的index,進入業務邏輯執行。當然這裡也有不適應的場景,類似於MapReduce需要shuffle的場景就不適合了,比方說,要根據某一個欄位全域性分組聚合求結果,這時候怎麼分片都可能會不合理,因為每個分片只能處理N分之一的資料,沒辦法shuffle再聚合,這一點,也要根據具體的業務來使用。

   那麼ShardingContext可以拿到那些資訊呢?原始碼如下

    

public final class ShardingContext {
    
    /**
     * 作業名稱.
     */
    private final String jobName;
    
    /**
     * 作業任務ID.
     */
    private final String taskId;
    
    /**
     * 分片總數.
     */
    private final int shardingTotalCount;
    
    /**
     * 作業自定義引數.
     * 可以配置多個相同的作業, 但是用不同的引數作為不同的排程例項.
     */
    private final String jobParameter;
    
    /**
     * 分配於本作業例項的分片項.
     */
    private final int shardingItem;
    
    /**
     * 分配於本作業例項的分片引數.
     */
    private final String shardingParameter;
    
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}

    以上程式碼,jobParameter和shardingItem是最有用的引數,shardingItem決定switch case迴圈的走向,shardingParameter可以用業務的查詢條件,也可以用字串拼接的方式組裝很複雜的引數用於特定的業務。

四、Elastic-Job的基本使用

    1、Job配置項

public class ElasticJobConfig {
	private static CoordinatorRegistryCenter createRegistryCenter() {

		ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");
		CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
		regCenter.init();
		return regCenter;
	}

	private static LiteJobConfiguration createJobConfiguration() {

		JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3)
				.shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();
		SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
				MyElasticJob.class.getCanonicalName());
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)
				.build();
		return simpleJobRootConfig;
	}

	public static void main(String[] args) {
		new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
	}
}

    幾點說明:

    註冊中心配置項,設定zookeeper叢集地址,我這裡用的本地單節點,所以只有一個,當然可以配置任務名稱,名稱空間(namespace,本質上會在zk裡生成一個目錄),超時時間,最大重試次數等等

    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite引數非常重要,設定這個引數為true,修改過job配置資訊才會覆蓋zookeeper裡的資料,要不然不會生效。

    2、SimpleJob的實現

public class MyElasticJob implements SimpleJob {

	@Override
	public void execute(ShardingContext shardingContext) {
		switch (shardingContext.getShardingItem()) {
		case 0: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 1: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 2: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		default: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "引數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		}
	}
}

    上面設定每5秒鐘執行一次,執行ElasticJobConfig的main方法,執行結果如下:

    

    從上面的結果,可以看出,執行每個分片的任務,其實是放到一個執行緒池去執行的,對應的分片資訊和引數資訊在shardingContext可以拿到,實現業務非常方便。

    最後,如果啟動多個JVM,那麼這些任務就分散到各個節點裡,如果一個節點宕機,下次觸發任務時,將把該分片任務丟到健康機器執行,這裡做到了節點容錯。但是某個分片的任務在執行過程中失敗了,那麼這裡是不會重新觸發改分片任務的執行的。

 

 

&nb