1. 程式人生 > 程式設計 >Java中批處理框架spring batch詳細介紹

Java中批處理框架spring batch詳細介紹

spring batch簡介

spring batch是spring提供的一個數據處理框架。企業域中的許多應用程式需要批量處理才能在關鍵任務環境中執行業務操作。 這些業務運營包括:

  • 無需使用者互動即可最有效地處理大量資訊的自動化,複雜處理。 這些操作通常包括基於時間的事件(例如月末計算,通知或通訊)。
  • 在非常大的資料集中重複處理複雜業務規則的定期應用(例如,保險利益確定或費率調整)。
  • 整合從內部和外部系統接收的資訊,這些資訊通常需要以事務方式格式化,驗證和處理到記錄系統中。 批處理用於每天為企業處理數十億的交易。

Spring Batch是一個輕量級,全面的批處理框架,旨在開發對企業系統日常運營至關重要的強大批處理應用程式。 Spring Batch構建了人們期望的Spring Framework特性(生產力,基於POJO的開發方法和一般易用性),同時使開發人員可以在必要時輕鬆訪問和利用更高階的企業服務。 Spring Batch不是一個schuedling的框架。

Spring Batch提供了可重用的功能,這些功能對於處理大量的資料至關重要,包括記錄/跟蹤,事務管理,作業處理統計,作業重啟,跳過和資源管理。 它還提供更高階的技術服務和功能,通過優化和分割槽技術實現極高容量和高效能的批處理作業。 Spring Batch可用於兩種簡單的用例(例如將檔案讀入資料庫或執行儲存過程)以及複雜的大量用例(例如在資料庫之間移動大量資料,轉換它等等) 上)。 大批量批處理作業可以高度可擴充套件的方式利用該框架來處理大量資訊。

Spring Batch架構介紹

一個典型的批處理應用程式大致如下:

從資料庫,檔案或佇列中讀取大量記錄。以某種方式處理資料。以修改之後的形式寫回資料。

其對應的示意圖如下:

Java中批處理框架spring batch詳細介紹

spring batch的一個總體的架構如下:

Figure 2.1: Batch Stereotypes

在spring batch中一個job可以定義很多的步驟step,在每一個step裡面可以定義其專屬的ItemReader用於讀取資料,ItemProcesseor用於處理資料,ItemWriter用於寫資料,而每一個定義的job則都在JobRepository裡面,我們可以通過JobLauncher來啟動某一個job。

Spring Batch核心概念介紹

下面是一些概念是Spring batch框架中的核心概念。

什麼是Job

Job和Step是spring batch執行批處理任務最為核心的兩個概念。

其中Job是一個封裝整個批處理過程的一個概念。Job在spring batch的體系當中只是一個最頂層的一個抽象概念,體現在程式碼當中則它只是一個最上層的介面,其程式碼如下:

 
/**
 * Batch domain object representing a job. Job is an explicit abstraction
 * representing the configuration of a job specified by a developer. It should
 * be noted that restart policy is applied to the job as a whole and not to a
 * step.
 */
public interface Job {
 
	String getName();
 
 
	boolean isRestartable();
 
 
	void execute(JobExecution execution);
 
 
	JobParametersIncrementer getJobParametersIncrementer();
 
 
	JobParametersValidator getJobParametersValidator();
 
}

在Job這個介面當中定義了五個方法,它的實現類主要有兩種型別的job,一個是simplejob,另一個是flowjob。在spring batch當中,job是最頂層的抽象,除job之外我們還有JobInstance以及JobExecution這兩個更加底層的抽象。

一個job是我們執行的基本單位,它內部由step組成。job本質上可以看成step的一個容器。一個job可以按照指定的邏輯順序組合step,並提供了我們給所有step設定相同屬性的方法,例如一些事件監聽,跳過策略。

Spring Batch以SimpleJob類的形式提供了Job介面的預設簡單實現,它在Job之上建立了一些標準功能。一個使用java config的例子程式碼如下:

@Bean
public Job footballJob() {
 return this.jobBuilderFactory.get("footballJob")
 .start(playerLoad())
 .next(gameLoad())
 .next(playerSummarization())
 .end()
 .build();
}

這個配置的意思是:首先給這個job起了一個名字叫footballJob,接著指定了這個job的三個step,他們分別由方法,playerLoad,gameLoad,playerSummarization實現。

什麼是JobInstance

我們在上文已經提到了JobInstance,他是Job的更加底層的一個抽象,他的定義如下:

public interface JobInstance {
	/**
	 * Get unique id for this JobInstance.
	 * @return instance id
	 */
	public long getInstanceId();
	/**
	 * Get job name.
	 * @return value of 'id' attribute from <job>
	 */
	public String getJobName();	
}

他的方法很簡單,一個是返回Job的id,另一個是返回Job的名字。

JobInstance指的是job運行當中,作業執行過程當中的概念。Instance本就是例項的意思。

比如說現在有一個批處理的job,它的功能是在一天結束時執行行一次。我們假定這個批處理job的名字為'EndOfDay'。在這個情況下,那麼每天就會有一個邏輯意義上的JobInstance,而我們必須記錄job的每次執行的情況。

什麼是JobParameters

在上文當中我們提到了,同一個job每天執行一次的話,那麼每天都有一個jobIntsance,但他們的job定義都是一樣的,那麼我們怎麼來區別一個job的不同jobinstance了。 不妨先做個猜想,雖然jobinstance的job定義一樣,但是他們有的東西就不一樣,例如執行時間。

spring batch中提供的用來標識一個jobinstance的東西是:JobParameters。 JobParameters物件包含一組用於啟動批處理作業的引數,它可以在執行期間用於識別或甚至用作參考資料。我們假設的執行時間,就可以作為一個JobParameters。

例如,我們前面的'EndOfDay'的job現在已經有了兩個例項,一個產生於1月1日,另一個產生於1月2日,那麼我們就可以定義兩個JobParameter物件:一個的引數是01-01,另一個的引數是01-02。 因此,識別一個JobInstance的方法可以定義為:

Java中批處理框架spring batch詳細介紹

因此,我麼可以通過Jobparameter來操作正確的JobInstance

什麼是JobExecution

JobExecution指的是單次嘗試執行一個我們定義好的Job的程式碼層面的概念。 job的一次執行可能以失敗也可能成功。只有當執行成功完成時,給定的與執行相對應的JobInstance才也被視為完成。

還是以前面描述的EndOfDay的job作為示例,假設第一次執行01-01-2019的JobInstance結果是失敗。 那麼此時如果使用與第一次執行相同的Jobparameter引數(即01-01-2019)作業引數再次執行,那麼就會建立一個對應於之前jobInstance的一個新的JobExecution例項,JobInstance仍然只有一個。

JobExecution的介面定義如下:

public interface JobExecution {
	/**
	 * Get unique id for this JobExecution.
	 * @return execution id
	 */
	public long getExecutionId();
	/**
	 * Get job name.
	 * @return value of 'id' attribute from <job>
	 */
	public String getJobName(); 
	/**
	 * Get batch status of this execution.
	 * @return batch status value.
	 */
	public BatchStatus getBatchStatus();
	/**
	 * Get time execution entered STARTED status. 
	 * @return date (time)
	 */
	public Date getStartTime();
	/**
	 * Get time execution entered end status: COMPLETED,STOPPED,FAILED 
	 * @return date (time)
	 */
	public Date getEndTime();
	/**
	 * Get execution exit status.
	 * @return exit status.
	 */
	public String getExitStatus();
	/**
	 * Get time execution was created.
	 * @return date (time)
	 */
	public Date getCreateTime();
	/**
	 * Get time execution was last updated updated.
	 * @return date (time)
	 */
	public Date getLastUpdatedTime();
	/**
	 * Get job parameters for this execution.
	 * @return job parameters 
	 */
	public Properties getJobParameters();
	
}

每一個方法的註釋已經解釋的很清楚,這裡不再多做解釋。只提一下BatchStatus,JobExecution當中提供了一個方法getBatchStatus用於獲取一個job某一次特地執行的一個狀態。BatchStatus是一個代表job狀態的列舉類,其定義如下:

public enum BatchStatus {STARTING,STARTED,STOPPING,FAILED,COMPLETED,ABANDONED }

這些屬性對於一個job的執行來說是非常關鍵的資訊,並且spring batch會將他們持久到資料庫當中. 在使用Spring batch的過程當中spring batch會自動建立一些表用於儲存一些job相關的資訊,用於儲存JobExecution的表為batch_job_execution,下面是一個從資料庫當中截圖的例項:

Java中批處理框架spring batch詳細介紹

什麼是Step

每一個Step物件都封裝了批處理作業的一個獨立的階段。 事實上,每一個Job本質上都是由一個或多個步驟組成。 每一個step包含定義和控制實際批處理所需的所有資訊。 任何特定的內容都由編寫Job的開發人員自行決定。 一個step可以非常簡單也可以非常複雜。 例如,一個step的功能是將檔案中的資料載入到資料庫中,那麼基於現在spring batch的支援則幾乎不需要寫程式碼。 更復雜的step可能具有複雜的業務邏輯,這些邏輯作為處理的一部分。 與Job一樣,Step具有與JobExecution類似的StepExecution,如下圖所示:

Figure 2.1: Job Hierarchy With Steps

什麼是StepExecution

StepExecution表示一次執行Step,每次執行一個Step時都會建立一個新的StepExecution,類似於JobExecution。 但是,某個步驟可能由於其之前的步驟失敗而無法執行。 且僅當Step實際啟動時才會建立StepExecution。

一次step執行的例項由StepExecution類的物件表示。 每個StepExecution都包含對其相應步驟的引用以及JobExecution和事務相關的資料,例如提交和回滾計數以及開始和結束時間。 此外,每個步驟執行都包含一個ExecutionContext,其中包含開發人員需要在批處理執行中保留的任何資料,例如重新啟動所需的統計資訊或狀態資訊。下面是一個從資料庫當中截圖的例項:

Java中批處理框架spring batch詳細介紹

什麼是ExecutionContext

ExecutionContext即每一個StepExecution的執行環境。它包含一系列的鍵值對。我們可以用如下程式碼獲取ExecutionContext

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

什麼是JobRepository

JobRepository是一個用於將上述job,step等概念進行持久化的一個類。 它同時給Job和Step以及下文會提到的JobLauncher實現提供CRUD操作。 首次啟動Job時,將從repository中獲取JobExecution,並且在執行批處理的過程中,StepExecution和JobExecution將被儲存到repository當中。

@EnableBatchProcessing註解可以為JobRepository提供自動配置。

< id="joblauncher">什麼是JobLauncher

JobLauncher這個介面的功能非常簡單,它是用於啟動指定了JobParameters的Job,為什麼這裡要強調指定了JobParameter,原因其實我們在前面已經提到了,jobparameter和job一起才能組成一次job的執行。下面是程式碼例項:

public interface JobLauncher {
 
public JobExecution run(Job job,JobParameters jobParameters)
 throws JobExecutionAlreadyRunningException,JobRestartException,JobInstanceAlreadyCompleteException,JobParametersInvalidException;
}

上面run方法實現的功能是根據傳入的job以及jobparamaters從JobRepository獲取一個JobExecution並執行Job。

什麼是Item Reader

ItemReader是一個讀資料的抽象,它的功能是為每一個Step提供資料輸入。 當ItemReader以及讀完所有資料時,它會返回null來告訴後續操作資料已經讀完。Spring Batch為ItemReader提供了非常多的有用的實現類,比如JdbcPagingItemReader,JdbcCursorItemReader等等。

ItemReader支援的讀入的資料來源也是非常豐富的,包括各種型別的資料庫,檔案,資料流,等等。幾乎涵蓋了我們的所有場景。

下面是一個JdbcPagingItemReader的例子程式碼:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource,PagingQueryProvider queryProvider) {
 Map<String,Object> parameterValues = new HashMap<>();
 parameterValues.put("status","NEW");
 
 return new JdbcPagingItemReaderBuilder<CustomerCredit>()
  .name("creditReader")
  .dataSource(dataSource)
  .queryProvider(queryProvider)
  .parameterValues(parameterValues)
  .rowMapper(customerCreditMapper())
  .pageSize(1000)
  .build();
}
 
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
 SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
 
 provider.setSelectClause("select id,name,credit");
 provider.setFromClause("from customer");
 provider.setWhereClause("where status=:status");
 provider.setSortKey("id");
 
 return provider;
}

JdbcPagingItemReader必須指定一個PagingQueryProvider,負責提供SQL查詢語句來按分頁返回資料。

下面是一個JdbcCursorItemReader的例子程式碼:

 private JdbcCursorItemReader<Map<String,Object>> buildItemReader(final DataSource dataSource,String tableName,String tenant) {
 
 JdbcCursorItemReader<Map<String,Object>> itemReader = new JdbcCursorItemReader<>();
 itemReader.setDataSource(dataSource);
 itemReader.setSql("sql here");
 itemReader.setRowMapper(new RowMapper());
 return itemReader;
 }

什麼是Item Writer

既然ItemReader是讀資料的一個抽象,那麼ItemWriter自然就是一個寫資料的抽象,它是為每一個step提供資料寫出的功能。寫的單位是可以配置的,我們可以一次寫一條資料,也可以一次寫一個chunk的資料,關於chunk下文會有專門的介紹。ItemWriter對於讀入的資料是不能做任何操作的。

Spring Batch為ItemWriter也提供了非常多的有用的實現類,當然我們也可以去實現自己的writer功能。

什麼是Item Processor

ItemProcessor對專案的業務邏輯處理的一個抽象,當ItemReader讀取到一條記錄之後,ItemWriter還未寫入這條記錄之前,I我們可以藉助temProcessor提供一個處理業務邏輯的功能,並對資料進行相應操作。如果我們在ItemProcessor發現一條資料不應該被寫入,可以通過返回null來表示。ItemProcessor和ItemReader以及ItemWriter可以非常好的結合在一起工作,他們之間的資料傳輸也非常方便。我們直接使用即可。

chunk 處理流程

spring batch提供了讓我們按照chunk處理資料的能力,一個chunk的示意圖如下:

Java中批處理框架spring batch詳細介紹

它的意思就和圖示的一樣,由於我們一次batch的任務可能會有很多的資料讀寫操作,因此一條一條的處理並向資料庫提交的話效率不會很高,因此spring batch提供了chunk這個概念,我們可以設定一個chunk size,spring batch 將一條一條處理資料,但不提交到資料庫,只有當處理的資料數量達到chunk size設定的值得時候,才一起去commit.

java的例項定義程式碼如下:

Java中批處理框架spring batch詳細介紹

在上面這個step裡面,chunk size被設為了10,當ItemReader讀的資料數量達到10的時候,這一批次的資料就一起被傳到itemWriter,同時transaction被提交。

skip策略和失敗處理

一個batch的job的step,可能會處理非常大數量的資料,難免會遇到出錯的情況,出錯的情況雖出現的概率較小,但是我們不得不考慮這些情況,因為我們做資料遷移最重要的是要保證資料的最終一致性。spring batch當然也考慮到了這種情況,並且為我們提供了相關的技術支援,請看如下bean的配置:

Java中批處理框架spring batch詳細介紹

我們需要留意這三個方法,分別是skipLimit(),skip(),noSkip(),

skipLimit方法的意思是我們可以設定一個我們允許的這個step可以跳過的異常數量,假如我們設定為10,則當這個step執行時,只要出現的異常數目不超過10,整個step都不會fail。注意,若不設定skipLimit,則其預設值是0.

skip方法我們可以指定我們可以跳過的異常,因為有些異常的出現,我們是可以忽略的。

noSkip方法的意思則是指出現這個異常我們不想跳過,因此這種異常出現一次時,計數器就會加一,直到達到上限。

批處理操作指南

本部分是一些使用spring batch時的值得注意的點

< id="%E6%89%B9%E5%A4%84%E7%90%86%E5%8E%9F%E5%88%99">

批處理原則

在構建批處理解決方案時,應考慮以下關鍵原則和注意事項。

  • 批處理體系結構通常會影響體系結構
  • 儘可能簡化並避免在單批應用程式中構建複雜的邏輯結構
  • 保持資料的處理和儲存在物理上靠得很近(換句話說,將資料儲存在處理過程中)。
  • 最大限度地減少系統資源的使用,尤其是I / O. 在internal memory中執行儘可能多的操作。
  • 檢視應用程式I / O(分析SQL語句)以確保避免不必要的物理I / O. 特別是,需要尋找以下四個常見缺陷:

當資料可以被讀取一次並快取或儲存在工作儲存中時,讀取每個事務的資料。
重新讀取先前在同一事務中讀取資料的事務的資料。
導致不必要的表或索引掃描。
未在SQL語句的WHERE子句中指定鍵值。

在批處理執行中不要做兩次一樣的事情。 例如,如果需要資料彙總以用於報告目的,則應該(如果可能)在最初處理資料時遞增儲存的總計,因此您的報告應用程式不必重新處理相同的資料。

在批處理應用程式開始時分配足夠的記憶體,以避免在此過程中進行耗時的重新分配。

總是假設資料完整性最差。 插入適當的檢查和記錄驗證以維護資料完整性。

儘可能實施校驗和以進行內部驗證。 例如,對於一個檔案裡的資料應該有一個數據條數紀錄,告訴檔案中的記錄總數以及關鍵欄位的彙總。

在具有真實資料量的類似生產環境中儘早計劃和執行壓力測試。

在大批量系統中,資料備份可能具有挑戰性,特別是如果系統以24-7線上的情況執行。 資料庫備份通常在線上設計中得到很好的處理,但檔案備份應該被視為同樣重要。 如果系統依賴於檔案,則檔案備份過程不僅應該到位並記錄在案,還應定期進行測試。

如何預設不啟動job

在使用java config使用spring batch的job時,如果不做任何配置,專案在啟動時就會預設去跑我們定義好的批處理job。那麼如何讓專案在啟動時不自動去跑job呢?

spring batch的job會在專案啟動時自動run,如果我們不想讓他在啟動時run的話,可以在application.properties中新增如下屬性:

spring.batch.job.enabled=false

在讀資料時記憶體不夠

在使用spring batch做資料遷移時,發現在job啟動後,執行到一定時間點時就卡在一個地方不動了,且log也不再列印,等待一段時間之後,得到如下錯誤:

Java中批處理框架spring batch詳細介紹

紅字的資訊為:Resource exhaustion event:the JVM was unable to allocate memory from the heap.

翻譯過來的意思就是專案發出了一個資源耗盡的事件,告訴我們java虛擬機器無法再為堆分配記憶體。

造成這個錯誤的原因是: 這個專案裡的batch job的reader是一次性拿回了資料庫裡的所有資料,並沒有進行分頁,當這個資料量太大時,就會導致記憶體不夠用。解決的辦法有兩個:

  • 調整reader讀資料邏輯,按分頁讀取,但實現上會麻煩一些,且執行效率會下降
  • 增大service記憶體

更多資訊請參考部落格:spring batch使用reader讀資料的記憶體容量問題

到此這篇關於Java中批處理框架spring batch詳細介紹的文章就介紹到這了,更多相關Java 批處理框架spring batch內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!