1. 程式人生 > >大資料批處理框架Spring Batch 的全面解析

大資料批處理框架Spring Batch 的全面解析

如今微服務架構討論的如火如荼。但在企業架構裡除了大量的OLTP交易外,還存在海量的批處理交易。在諸如銀行的金融機構中,每天有3-4萬筆的批處理作業需要處理。針對OLTP,業界有大量的開源框架、優秀的架構設計給予支撐;但批處理領域的框架確鳳毛麟角。是時候和我們一起來了解下批處理的世界哪些優秀的框架和設計了,今天我將以Spring Batch為例,和大家一起探祕批處理的世界。

批處理典型業務場景

對賬是典型的批處理業務處理場景,各個金融機構的往來業務和跨主機系統的業務都會涉及到對賬的過程,如大小額支付、銀聯交易、人行往來、現金管理、POS業務、ATM業務、證券公司資金賬戶、證券公司與證券結算公司。

下面是某行網銀的部分日終跑批例項場景需求。

涉及到的需求點包括:

批量的每個單元都需要錯誤處理和回退;

每個單元在不同平臺中執行;

需要有分支選擇;

每個單元需要監控和獲取單元處理日誌;

提供多種觸發規則,按日期,日曆,週期觸發;

除此之外典型的批處理適用於如下的業務場景:

定期提交批處理任務(日終處理)

並行批處理:並行處理任務

企業訊息驅動處理

大規模的並行處理

手動或定時重啟

按順序處理依賴的任務(可擴充套件為工作流驅動的批處理)

部分處理:忽略記錄(例如在回滾時)

完整的批處理事務

與OLTP型別交易不同,批處理作業兩個典型特徵是批量執行與自動執行(需要無人值守):前者能夠處理大批量資料的匯入、匯出和業務邏輯計算;後者無需人工干預,能夠自動化執行批量任務。

在關注其基本功能之外,還需要關注如下的幾點:

健壯性:不會因為無效資料或錯誤資料導致程式崩潰;

可靠性:通過跟蹤、監控、日誌及相關的處理策略(重試、跳過、重啟)實現批作業的可靠執行;

擴充套件性:通過併發或者並行技術實現應用的縱向和橫向擴充套件,滿足海量資料處理的效能需求;

苦於業界真的缺少比較好的批處理框架,Spring Batch是業界目前為數不多的優秀批處理框架(Java語言開發),SpringSource和Accenture(埃森哲)共同貢獻了智慧。

Accenture在批處理架構上有著豐富的工業級別的經驗,貢獻了之前專用的批處理體系框架(這些框架歷經數十年研發和使用,為Spring Batch提供了大量的參考經驗)。

SpringSource則有著深刻的技術認知和Spring框架程式設計模型,同時借鑑了JCL(Job Control Language)和COBOL的語言特性。2013年JSR-352將批處理納入規範體系,並被包含在了JEE7之中。這意味著,所有的JEE7應用伺服器都會有批處理的能力,目前第一個實現此規範的應用伺服器是Glassfish 4。當然也可以在Java SE中使用。

但最為關鍵的一點是:JSR-352規範大量借鑑了Spring Batch框架的設計思路,從上圖中的核心模型和概念中可以看出究竟,核心的概念模型完全一致。完整的JSR-252規範可以從https://jcp.org/aboutJava/communityprocess/final/jsr352/index.html下載。

通過Spring Batch框架可以構建出輕量級的健壯的並行處理應用,支援事務、併發、流程、監控、縱向和橫向擴充套件,提供統一的介面管理和任務管理。

框架提供了諸如以下的核心能力,讓大家更關注在業務處理上。更是提供瞭如下的豐富能力:

明確分離批處理的執行環境和應用

將通用核心的服務以介面形式提供

提供“開箱即用” 的簡單的預設的核心執行介面

提供Spring框架中配置、自定義、和擴充套件服務

所有預設實現的核心服務能夠容易的被擴充套件與替換,不會影響基礎層

提供一個簡單的部署模式,使用Maven進行編譯

批處理關鍵領域模型及關鍵架構

先來個Hello World示例,一個典型的批處理作業。

典型的一個作業分為3部分:作業讀、作業處理、作業寫,也是典型的三步式架構。整個批處理框架基本上圍繞Read、Process、Writer來處理。除此之外,框架提供了作業排程器、作業倉庫(用以存放Job的元資料資訊,支援記憶體、DB兩種模式)。

完整的領域概念模型參加下圖:

Job Launcher(作業排程器)是Spring Batch框架基礎設施層提供的執行Job的能力。通過給定的Job名稱和作Job Parameters,可以通過Job Launcher執行Job。

通過Job Launcher可以在Java程式中呼叫批處理任務,也可以在通過命令列或者其它框架(如定時排程框架Quartz)中呼叫批處理任務。

Job Repository來儲存Job執行期的元資料(這裡的元資料是指Job Instance、Job Execution、Job Parameters、Step Execution、Execution Context等資料),並提供兩種預設實現。

一種是存放在記憶體中;另一種將元資料存放在資料庫中。通過將元資料存放在資料庫中,可以隨時監控批處理Job的執行狀態。Job執行結果是成功還是失敗,並且使得在Job失敗的情況下重新啟動Job成為可能。Step表示作業中的一個完整步驟,一個Job可以有一個或者多個Step組成。

批處理框架執行期的模型也非常簡單:

Job Instance(作業例項)是一個執行期的概念,Job每執行一次都會涉及到一個Job Instance。

Job Instance來源可能有兩種:一種是根據設定的Job Parameters從Job Repository(作業倉庫)中獲取一個;如果根據Job Parameters從Job Repository沒有獲取Job Instance,則新建立一個新的Job Instance。

Job Execution表示Job執行的控制代碼,一次Job的執行可能成功也可能失敗。只有Job執行成功後,對應的Job Instance才會被完成。因此在Job執行失敗的情況下,會有一個Job Instance對應多個Job Execution的場景發生。

總結下批處理的典型概念模型,其設計非常精簡的十個概念,完整支撐了整個框架。

Job提供的核心能力包括作業的抽象與繼承,類似面向物件中的概念。對於執行異常的作業,提供重啟的能力。

框架在Job層面,同樣提供了作業編排的概念,包括順序、條件、並行作業編排。

在一個Job中配置多個Step。不同的Step間可以順序執行,也可以按照不同的條件有選擇的執行(條件通常使用Step的退出狀態決定),通過next元素或者decision元素來定義跳轉規則;

為了提高多個Step的執行效率,框架提供了Step並行執行的能力(使用split進行宣告,通常該情況下需要Step之間沒有任何的依賴關係,否則容易引起業務上的錯誤)。Step包含了一個實際執行的批處理任務中的所有必需的資訊,其實現可以是非常簡單的業務實現,也可以是非常複雜的業務處理,Step的複雜程度通常是業務決定的。

每個Step由ItemReader、ItemProcessor、ItemWriter組成,當然根據不同的業務需求,ItemProcessor可以做適當的精簡。同時框架提供了大量的ItemReader、ItemWriter的實現,提供了對FlatFile、XML、Json、DataBase、Message等多種資料型別的支援。

框架還為Step提供了重啟、事務、重啟次數、併發數;以及提交間隔、異常跳過、重試、完成策略等能力。基於Step的靈活配置,可以完成常見的業務功能需求。其中三步走(Read、Processor、Writer)是批處理中的經典抽象。

作為面向批的處理,在Step層提供了多次讀、處理,一次提交的能力。

在Chunk的操作中,可以通過屬性commit-interval設定read多少條記錄後進行一次提交。通過設定commit-interval的間隔值,減少提交頻次,降低資源使用率。Step的每一次提交作為一個完整的事務存在。預設採用Spring提供的宣告式事務管理模式,事務編排非常方便。如下是一個宣告事務的示例:

框架對於事務的支援能力包括:

Chunk支援事務管理,通過commit-interval設定每次提交的記錄數;

支援對每個Tasklet設定細粒度的事務配置:隔離界別、傳播行為、超時;

支援rollback和no rollback,通過skippable-exception-classes和no-rollback-exception-classes進行支撐;

支援JMS Queue的事務級別配置;

另外,在框架資深的模型抽象方面,Spring Batch也做了極為精簡的抽象。

僅僅使用六張業務表儲存了所有的元資料資訊(包括Job、Step的例項,上下文,執行器資訊,為後續的監控、重啟、重試、狀態恢復等提供了可能)。

BATCH_JOB_INSTANCE:作業例項表,用於存放Job的例項資訊

BATCH_JOB_EXECUTION_PARAMS:作業引數表,用於存放每個Job執行時候的引數資訊,該引數實際對應Job例項的。

BATCH_JOB_EXECUTION:作業執行器表,用於存放當前作業的執行資訊,比如建立時間,執行開始時間,執行結束時間,執行的那個Job例項,執行狀態等。

BATCH_JOB_EXECUTION_CONTEXT:作業執行上下文表,用於存放作業執行器上下文的資訊。

BATCH_STEP_EXECUTION:作業步執行器表,用於存放每個Step執行器的資訊,比如作業步開始執行時間,執行完成時間,執行狀態,讀寫次數,跳過次數等資訊。

實現作業的健壯性與擴充套件性

批處理要求Job必須有較強的健壯性,通常Job是批量處理資料、無人值守的,這要求在Job執行期間能夠應對各種發生的異常、錯誤,並對Job執行進行有效的跟蹤。

一個健壯的Job通常需要具備如下的幾個特性:

  1. 容錯性

在Job執行期間非致命的異常,Job執行框架應能夠進行有效的容錯處理,而不是讓整個Job執行失敗;通常只有致命的、導致業務不正確的異常才可以終止Job的執行。

  1. 可追蹤性

Job執行期間任何發生錯誤的地方都需要進行有效的記錄,方便後期對錯誤點進行有效的處理。例如在Job執行期間任何被忽略處理的記錄行需要被有效的記錄下來,應用程式維護人員可以針對被忽略的記錄後續做有效的處理。

  1. 可重啟性

Job執行期間如果因為異常導致失敗,應該能夠在失敗的點重新啟動Job;而不是從頭開始重新執行Job。

框架提供了支援上面所有能力的特性,包括Skip(跳過記錄處理)、Retry(重試給定的操作)、Restart(從錯誤點開始重新啟動失敗的Job):

Skip,在對資料處理期間,如果資料的某幾條的格式不能滿足要求,可以通過Skip跳過該行記錄的處理,讓Processor能夠順利的處理其餘的記錄行。

Retry,將給定的操作進行多次重試,在某些情況下操作因為短暫的異常導致執行失敗,如網路連線異常、併發處理異常等,可以通過重試的方式避免單次的失敗,下次執行操作時候網路恢復正常,不再有併發的異常,這樣通過重試的能力可以有效的避免這類短暫的異常。

Restart,在Job執行失敗後,可以通過重啟功能來繼續完成Job的執行。在重啟時候,批處理框架允許在上次執行失敗的點重新啟動Job,而不是從頭開始執行,這樣可以大幅提高Job執行的效率。

對於擴充套件性,框架提供的擴充套件能力包括如下的四種模式 :

Multithreaded Step 多執行緒執行一個Step;

Parallel Step 通過多執行緒並行執行多個Step;

Remote Chunking 在遠端節點上執行分散式Chunk操作;

Partitioning Step 對資料進行分割槽,並分開執行;

我們先來看第一種的實現Multithreaded Step:

批處理框架在Job執行時預設使用單個執行緒完成任務的執行,同時框架提供了執行緒池的支援(Multithreaded Step模式),可以在Step執行時候進行並行處理,這裡的並行是指同一個Step使用執行緒池進行執行,同一個Step被並行的執行。使用tasklet的屬性task-executor可以非常容易的將普通的Step變成多執行緒Step。

Multithreaded Step的實現示例:

需要注意的是Spring Batch框架提供的大部分的ItemReader、ItemWriter等操作都是執行緒不安全的。

可以通過擴充套件的方式顯現執行緒安全的Step。

下面為大家展示一個擴充套件的實現:

需求:針對資料表的批量處理,實現執行緒安全的Step,並且支援重啟能力,即在執行失敗點可以記錄批處理的狀態。

對於示例中的資料庫讀取元件JdbcCursorItemReader,在設計資料庫表時,在表中增加一個欄位Flag,用於標識當前的記錄是否已經讀取並處理成功,如果處理成功則標識Flag=true,等下次重新讀取的時候,對於已經成功讀取且處理成功的記錄直接跳過處理。

Multithreaded Step(多執行緒步)提供了多個執行緒執行一個Step的能力,但這種場景在實際的業務中使用的並不是非常多。

更多的業務場景是Job中不同的Step沒有明確的先後順序,可以在執行期並行的執行。

Parallel Step:提供單個節點橫向擴充套件的能力

使用場景:Step A、Step B兩個作業步由不同的執行緒執行,兩者均執行完畢後,Step C才會被執行。

框架提供了並行Step的能力。可以通過Split元素來定義並行的作業流,並制定使用的執行緒池。

Parallel Step模式的執行效果如下:

每個作業步並行處理不同的記錄,示例中三個作業步,處理同一張表中的不同資料。

並行Step提供了在一個節點上橫向處理,但隨著作業處理量的增加,有可能一臺節點無法滿足Job的處理,此時我們可以採用遠端Step的方式將多個機器節點組合起來完成一個Job的處理。

Remote Chunking:遠端Step技術本質上是將對Item讀、寫的處理邏輯進行分離;通常情況下讀的邏輯放在一個節點進行操作,將寫操作分發到另外的節點執行。

遠端分塊是一個把step進行技術分割的工作,不需要對處理資料的結構有明確瞭解。

任何輸入源能夠使用單程序讀取並在動態分割後作為"塊"傳送給遠端的工作程序。

遠端程序實現了監聽者模式,反饋請求、處理資料最終將處理結果非同步返回。請求和返回之間的傳輸會被確保在傳送者和單個消費者之間。

在Master節點,作業步負責讀取資料,並將讀取的資料通過遠端技術傳送到指定的遠端節點上,進行處理,處理完畢後Master負責回收Remote端執行的情況。

在Spring Batch框架中通過兩個核心的介面來完成遠端Step的任務,分別是ChunkProvider與ChunkProcessor。

ChunkProvider:根據給定的ItemReader操作產生批量的Chunk操作;

ChunkProcessor:負責獲取ChunkProvider產生的Chunk操作,執行具體的寫邏輯;

Spring Batch中對遠端Step沒有預設的實現,但我們可以藉助SI或者AMQP實現來實現遠端通訊能力。

基於SI實現Remote Chunking模式的示例:

Step本地節點負責讀取資料,並通過MessagingGateway將請求傳送到遠端Step上;遠端Step提供了佇列的監聽器,當請求佇列中有訊息時候獲取請求資訊並交給ChunkHander負責處理。

接下來我們看下最後一種分割槽模式;Partitioning Step:分割槽模式需要對資料的結構有一定的瞭解,如主鍵的範圍、待處理的檔案的名字等。

這種模式的優點在於分割槽中每一個元素的處理器都能夠像一個普通Spring Batch任務的單步一樣執行,也不必去實現任何特殊的或是新的模式,來讓他們能夠更容易配置與測試。

通過分割槽可以實現以下的優點:

分割槽實現了更細粒度的擴充套件;

基於分割槽可以實現高效能的資料切分;

分割槽比遠端通常具有更高的擴充套件性;

分割槽後的處理邏輯,支援本地與遠端兩種模式;

分割槽作業典型的可以分成兩個處理階段,資料分割槽、分割槽處理;

資料分割槽:根據特殊的規則(例如:根據檔名稱,資料的唯一性標識,或者雜湊演算法)將資料進行合理的資料切片,為不同的切片生成資料執行上下文Execution Context、作業步執行器Step Execution。可以通過介面Partitioner生成自定義的分割槽邏輯,Spring Batch批處理框架預設實現了對多檔案的實現org.springframework.batch.core.partition.support.MultiResourcePartitioner;也可以自行擴充套件介面Partitioner來實現自定義的分割槽邏輯。

分割槽處理:通過資料分割槽後,不同的資料已經被分配到不同的作業步執行器中,接下來需要交給分割槽處理器進行作業,分割槽處理器可以本地執行也可以遠端執行被劃分的作業。介面PartitionHandler定義了分割槽處理的邏輯,Spring Batch批處理框架預設實現了本地多執行緒的分割槽處理org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;也可以自行擴充套件介面PartitionHandler來實現自定義的分割槽處理邏輯。

Spring Batch框架提供了對檔案分割槽的支援,實現類org.springframework.batch.core.partition.support.MultiResourcePartitioner提供了對檔案分割槽的預設支援,根據檔名將不同的檔案處理進行分割槽,提升處理的速度和效率,適合有大量小檔案需要處理的場景。

示例展示了將不同檔案分配到不同的作業步中,使用MultiResourcePartitioner進行分割槽,意味著每個檔案會被分配到一個不同的分割槽中。如果有其它的分割槽規則,可以通過實現介面Partitioner來進行自定義的擴充套件。有興趣的TX,可以自己實現基於資料庫的分割槽能力哦。

總結一下,批處理框架在擴充套件性上提供了4中不同能力,每種都是各自的使用場景,我們可以根據實際的業務需要進行選擇。

批處理框架的不足與增強

Spring Batch批處理框架雖然提供了4種不同的監控方式,但從目前的使用情況來看,都不是非常的友好。

通過DB直接檢視,對於管理人員來講,真的不忍直視;

通過API實現自定義的查詢,這是程式設計師的天堂,確實運維人員的地獄;

提供了Web控制檯,進行Job的監控和操作,目前提供的功能太裸露,無法直接用於生產;

提供JMX查詢方式,對於非開發人員太不友好;

但在企業級應用中面對批量資料處理,僅僅提供批處理框架僅能滿足批處理作業的快速開發、執行能力。

企業需要統一的批處理平臺來處理複雜的企業批處理應用,批處理平臺需要解決作業的統一排程、批處理作業的集中管理和管控、批處理作業的統一監控等能力。

那完美的解決方案是什麼呢?

企業級批處理平臺需要在Spring Batch批處理框架的基礎上,整合排程框架,通過排程框架可以將任務按照企業的需求進行任務的定期執行;

豐富目前Spring Batch Admin(Spring Batch的管理監控平臺,目前能力比較薄弱)框架,提供對Job的統一管理功能,增強Job作業的監控、預警等能力;

通過與企業的組織機構、許可權管理、認證系統進行合理的整合,增強平臺對Job作業的許可權控制、安全管理能力。

結語

感謝您的觀看,如有不足之處,歡迎批評指正。

為了幫助大家讓學習變得輕鬆、高效,給大家免費分享一大批資料,幫助大家在成為大資料工程師,乃至架構師的路上披荊斬棘。在這裡給大家推薦一個大資料學習交流圈:658558542 歡迎大家進×××流討論,學習交流,共同進步。

當真正開始學習的時候難免不知道從哪入手,導致效率低下影響繼續學習的信心。

但最重要的是不知道哪些技術需要重點掌握,學習時頻繁踩坑,最終浪費大量時間,所以有有效資源還是很有必要的。

最後祝福所有遇到瓶疾且不知道怎麼辦的大資料程式設計師們,祝福大家在往後的工作與面試中一切順利。