微服務業務生命週期流程管控引擎
如題,這裡介紹我最近開源的進行業務生命週期管控的流程引擎(OSS.EventFlow),當然現在開源的工作流框架很多,無心比較異同,僅表達個人感受:
典型的OA工作流程引擎,這也是當前很多工作流引擎的重要功能,只是OA場景對於系統來說,其業務特點比較突出,抽象歸納相對容易很多。但是這個對於很多業務場景特別是複雜場景能夠觸達的有限,大多複雜業務流程進度的推進是糅合在業務的具體操作程式碼之中,不能有效的把動作執行邏輯和業務的流動邏輯切割,如果開發人員沒有較高的技術能力和一定的業務深度,新人員需要遍覽系統角角落落才能拼湊出產品的概覽圖,甚至可能是個夾雜很多廢棄邏輯的畸形流程。同樣,對於新的產品業務流程開發人員又將其拆解粘合在系統中各處。
在另一方面,在實際的程式碼開發過程中,當一個模組分給開發人員之後,不管其水平高低,這個模組的質量主管基本很難有效控制,好點的團隊有程式碼評審,但終究只能算是事後補救。這些基本帶來以下三個問題,也是我在這個框架編寫過程中一直思考的:
1. 微服務的劃分,在單個業務點上已經做到了獨立,可是站在整個產品的流程上看,多個服務可能的交叉依賴呼叫,特別是複雜邏輯,產品流程如何落實到程式碼中進行清晰管控
2. 業務單元的邊界和銜接處理,常見的是在當前功能方法的底部,直接呼叫下一步的方法,又如業務之間新增訊息佇列的處理,都直接侵入業務程式碼中,當前方法的的單元化和可複用性,直接受限於實施工程師水平的高低。同時這個又直接影響開發負責人或開發主管的管控能力。
3. 隨著產品的迭代,業務流程的快速變化,如何能夠不動已有業務單元程式碼本身的情況下,快速在當前業務流程中排除或新增業務操作單元。
要減少以上的問題的發生,關鍵在於如何解決業務操作單元之間的關聯性。回到現實,對於任何產品,它一定存在一個業務生命週期,在這個週期內,圍繞某個物件,隨著時間推進,執行一系列的動作,最終得到某個業務結果。既然存在時序性,那麼就可以嘗試抽象一個時序輪廓的路徑圖,在這個輪廓之下,再去填充具體動作實現,進度由這個時序輪廓路徑圖根據具體的動作結果決定如何推進,這個路徑圖則可以和產品流程圖進行對映,形成系統裡的流程管控中樞,在系統編碼層級將事件內呼叫的”隱式導向”轉變為事件外的“顯式導向”。
OSS.EventFlow是以BPMN 2.0 流程管理為思路,設計的輕量級業務生命週期流程引擎基礎框架,將業務領域物件的流程管控和事件功能抽象剝離,切斷事件功能方法內的鏈式呼叫,提權至流程引擎統一協調管控,事件功能作為獨立處理單元嵌入業務流程之中,由流程引擎處理事件的觸發與訊息傳遞,達成事件處理單元的有效隔離。由此流程的銜接變成可獨立程式設計的部分,同時向上層提供業務動作的獨立擴充套件,保證業務單元的絕對獨立和可複用性, 目的是可以像搭積木一樣來完成不同功能程式碼的整合,系統向真正的低程式碼平臺過渡。
OSS.EventFlow引擎的原理是將整個業務流當做一個流程管道,結合流程流轉的特性,此引擎抽象了三類核心流程的管道元件:
1. 事件活動元件
這類元件主要是處理任務的具體內容,如傳送簡訊,執行下單,扣減庫存等實際業務操作
2. 閘道器元件
這類元件主要負責業務流程方向性的邏輯規則處理,如分支,合併流程
3. 聯結器元件
這類元件主要負責其他元件之間的訊息傳遞與轉化
一. 事件活動元件
這個元件就是業務的動作本身,根據任務觸發的特性,如關聯自動執行,中斷觸發(如使用者觸發,或訊息佇列等),根據這兩種情形,提供了兩個抽象基類:
1. BaseActivity - 直接執行活動元件
常見如自動稽核功能,或者支付成功後自動觸發郵件傳送等,最簡單也是最基本的一種動作處理。 繼承此基類,重寫Executing方法實現活動內容,同一個流體下實現自動關聯執行。 如果Executing方法返回False,則觸發Block,業務流不再向後續管道傳遞 返回True,則流體自動流入後續管道
2.BaseActionActivity<TContext, TResult> - 使用者觸發活動元件
繼承此基類 ,重寫Executing方法(自定義返回結果型別)實現活動內容。 當業務流流入當前元件時,觸發呼叫Notice(虛方法可重寫),之後業務流動停止, 當用戶觸發時,顯式呼叫 Action 方法(內部呼叫Executing返回自定義結果型別),流程繼續向後流動執行。
二. 閘道器元件
此元件主要負責邏輯的規則處理,業務的走向邏輯無非分與合,這裡給出兩個基類:
1.BaseAggregateGateway - 聚合業務分支流程活動元件
將多條業務分支聚合到當前閘道器元件下,由當前閘道器統一控制是否將業務流程向後傳遞,只需要繼承此基類重寫IfMatchCondition 方法即可
2.BaseBranchGateway - 分支閘道器元件
此元件將業務分流處理,定義流體時通過AddBranchPipe新增多個分支,至於如何分流,只需要繼承此基類重寫FilterNextPipes方法即可,你也可以在此之上實現BPMN中的幾種閘道器型別(並行,排他,和包含)。
三. 聯結器元件
此元件主要負責訊息的傳遞和轉化處理,在訊息的傳遞過程中又支援直接傳遞和非同步緩衝(IBufferTunnel)傳遞,根據是否需要轉化,或者非同步定義三個基類如下:
1.BaseConnector<InContext, OutContext> - 轉化連線元件
業務流經過此元件,直接執行Convert方法(需重寫),轉化成對應的下個元件執行引數,自動進入下個元件。
2.BaseBufferConnector<TContext> - 非同步緩衝連線元件
繼承IBufferTunnel介面 繼承此元件後,必須重寫Push方法,實現非同步緩衝儲存的處理,業務流進入此元件後,呼叫Push方法儲存,之後業務流動停止, 訊息喚醒時,需顯式呼叫Pop方法,業務流繼續向後執行
3.BaseBufferConnector<InContext, OutContext> - 非同步緩衝+轉化 連線元件
繼承此元件後,必須重寫Push,Convert方法,實現非同步緩衝儲存和轉化的處理,業務流進入此元件後,同樣呼叫Push方法儲存,之後業務流動停止, 訊息喚醒時,需顯式呼叫Pop方法(內部呼叫Convert方法,完成引數轉化),業務流繼續向後執行
四. 簡單示例場景
首先我們先假設當前有一個進貨管理的場景,,需經歷 進貨申請,申請審批,購買支付,入庫(同時郵件通知申請人) 幾個環節,流程圖如下:
根據此流程圖,每個環節對應一個事件活動,這裡以申請活動我們定義如下:
public class ApplyActivity : BaseActivity<ApplyContext> { protected override Task<bool> Executing(ApplyContext data) { // ...... LogHelper.Info("這裡剛才發生了一個採購申請"); return Task.FromResult(true); } }
這裡為了方便觀察,直接繼承 BaseActivity,即多個活動連線後,自動執行。 相同的處理方式我們定義剩下幾個環節事件,列表如下:
ApplyActivity - 申請事件 (引數:ApplyContext) AutoAuditActivity - 稽核事件 (引數:ApplyContext) PayActivity - 購買事件 (引數:PayContext) StockActivity - 入庫事件 (引數:StockContext) EmailActivity - 傳送郵件事件 (引數:SendEmailContext)
以上五個事件活動,其具體實現和引數完全獨立,同時因為購買支付後郵件和入庫是相互獨立的事件,定義分支閘道器做分流(規則)處理,程式碼如下:
public class PayGateway:BaseBranchGateway<PayContext> { protected override IEnumerable<BasePipe<PayContext>> FilterNextPipes(List<BasePipe<PayContext>> branchItems, PayContext context) { // ...... LogHelper.Info("這裡進行支付通過後的分流"); return branchItems; } }
這裡的意思相對簡單,即傳入的所有的分支不用過濾,直接全部分發。
同樣因為五個事件的方法引數不盡相同,中間的我們新增訊息聯結器,作為訊息的中轉和轉化處理(也可以在建立流體時表示式處理),以支付引數到郵件的引數轉化示例:
public class PayEmailConnector : BaseConnector<PayContext, SendEmailContext> { protected override SendEmailContext Convert(PayContext inContextData) { // ...... return new SendEmailContext() { id = inContextData.id }; } }
通過以上,申購流程的元件定義完畢,串聯使用如下(這裡是單元測試類,實際業務我們可以建立一個Service處理):
public readonly ApplyActivity ApplyActivity = new ApplyActivity(); public readonly AuditActivity AuditActivity = new AuditActivity(); public readonly PayActivity PayActivity = new PayActivity(); public readonly PayGateway PayGateway = new PayGateway(); public readonly StockConnector StockConnector = new StockConnector(); public readonly StockActivity StockActivity = new StockActivity(); public readonly PayEmailConnector EmailConnector = new PayEmailConnector(); public readonly SendEmailActivity EmailActivity = new SendEmailActivity(); // 建構函式內定義流體關聯 public BuyFlowTests() { ApplyActivity .Append(AuditActivity) .AppendConvert(applyContext => new PayContext() {id = applyContext.id})// 表示式方式的轉化器 .Append(PayActivity) .Append(PayGateway); // 閘道器分支 - 傳送郵件分支 PayGateway.AddBranchPipe(EmailConnector) .Append(EmailActivity); // 閘道器分支- 入庫分支 PayGateway.AddBranchPipe(StockConnector) .Append(StockActivity); //.Append(後續事件) } [TestMethod] public async Task FlowTest() { await ApplyActivity.Start(new ApplyContext() { id = "test_business_id" }); }
執行單元測試,結果如下:
xxx Detail:這裡剛才發生了一個採購申請 xxx Detail:管理員稽核通過 xxx Detail:發起支付處理 xxx Detail:這裡進行支付通過後的分流 xxx Detail:分流-1.郵件傳送 xxx Detail:分流-2.庫存儲存
如果你已經看到這裡,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公眾號(見二維碼)
_________________________________________