1. 程式人生 > >微服務業務生命週期流程管控引擎

微服務業務生命週期流程管控引擎

  如題,這裡介紹我最近開源的進行業務生命週期管控的流程引擎(OSS.EventFlow),當然現在開源的工作流框架很多,無心比較異同,僅表達個人感受:

  典型的OA工作流程引擎,這也是當前很多工作流引擎的重要功能,只是OA場景對於系統來說,其業務特點比較突出,抽象歸納相對容易很多。但是這個對於很多業務場景特別是複雜場景能夠觸達的有限,大多複雜業務流程進度的推進是糅合在業務的具體操作程式碼之中,不能有效的把動作執行邏輯和業務的流動邏輯切割,如果開發人員沒有較高的技術能力和一定的業務深度,新人員需要遍覽系統角角落落才能拼湊出產品的概覽圖,甚至可能是個夾雜很多廢棄邏輯的畸形流程。同樣,對於新的產品業務流程開發人員又將其拆解粘合在系統中各處。

   在另一方面,在實際的程式碼開發過程中,當一個模組分給開發人員之後,不管其水平高低,這個模組的質量主管基本很難有效控制,好點的團隊有程式碼評審,但終究只能算是事後補救。這些基本帶來以下三個問題,也是我在這個框架編寫過程中一直思考的:

  1. 微服務的劃分,在單個業務點上已經做到了獨立,可是站在整個產品的流程上看,多個服務可能的交叉依賴呼叫,特別是複雜邏輯,產品流程如何落實到程式碼中進行清晰管控

  2. 業務單元的邊界和銜接處理,常見的是在當前功能方法的底部,直接呼叫下一步的方法,又如業務之間新增訊息佇列的處理,都直接侵入業務程式碼中,當前方法的的單元化和可複用性,直接受限於實施工程師水平的高低。同時這個又直接影響開發負責人或開發主管的管控能力。

  3. 隨著產品的迭代,業務流程的快速變化,如何能夠不動已有業務單元程式碼本身的情況下,快速在當前業務流程中排除或新增業務操作單元。

   要減少以上的問題的發生,關鍵在於如何解決業務操作單元之間的關聯性。回到現實,對於任何產品,它一定存在一個業務生命週期,在這個週期內,圍繞某個物件,隨著時間推進,執行一系列的動作,最終得到某個業務結果。既然存在時序性,那麼就可以嘗試抽象一個時序輪廓的路徑圖,在這個輪廓之下,再去填充具體動作實現,進度由這個時序輪廓路徑圖根據具體的動作結果決定如何推進,這個路徑圖則可以和產品流程圖進行對映,形成系統裡的流程管控中樞,在系統編碼層級將事件內呼叫的”隱式導向”轉變為事件外的“顯式導向”。

   OSS.EventFlow是以BPMN 2.0 流程管理為思路,設計的輕量級業務生命週期流程引擎基礎框架,將業務領域物件的流程管控和事件功能抽象剝離,切斷事件功能方法內的鏈式呼叫,提權至流程引擎統一協調管控,事件功能作為獨立處理單元嵌入業務流程之中,由流程引擎處理事件的觸發與訊息傳遞,達成事件處理單元的有效隔離。由此流程的銜接變成可獨立程式設計的部分,同時向上層提供業務動作的獨立擴充套件,保證業務單元的絕對獨立和可複用性, 目的是可以像搭積木一樣來完成不同功能程式碼的整合,系統向真正的低程式碼平臺過渡。

   OSS.EventFlow引擎的原理是將整個業務流當做一個流程管道,結合流程流轉的特性,此引擎抽象了三類核心流程的管道元件:

  1. 事件活動元件

   這類元件主要是處理任務的具體內容,如傳送簡訊,執行下單,扣減庫存等實際業務操作

  2. 閘道器元件

  這類元件主要負責業務流程方向性的邏輯規則處理,如分支,合併流程

  3. 聯結器元件

  這類元件主要負責其他元件之間的訊息傳遞與轉化

 

一. 事件活動元件

  這個元件就是業務的動作本身,根據任務觸發的特性,如關聯自動執行,中斷觸發(如使用者觸發,或訊息佇列等),根據這兩種情形,提供了兩個抽象基類:

  1. BaseActivity - 直接執行活動元件

  常見如自動稽核功能,或者支付成功後自動觸發郵件傳送等,最簡單也是最基本的一種動作處理。 繼承此基類,重寫Executing方法實現活動內容,同一個流體下實現自動關聯執行。 如果Executing方法返回False,則觸發Block,業務流不再向後續管道傳遞 返回True,則流體自動流入後續管道

  2.BaseActionActivity<TContext, TResult> - 使用者觸發活動元件

  繼承此基類 ,重寫Executing方法(自定義返回結果型別)實現活動內容。 當業務流流入當前元件時,觸發呼叫Notice(虛方法可重寫),之後業務流動停止, 當用戶觸發時,顯式呼叫 Action 方法(內部呼叫Executing返回自定義結果型別),流程繼續向後流動執行。

二. 閘道器元件

  此元件主要負責邏輯的規則處理,業務的走向邏輯無非分與合,這裡給出兩個基類:

  1.BaseAggregateGateway - 聚合業務分支流程活動元件

   將多條業務分支聚合到當前閘道器元件下,由當前閘道器統一控制是否將業務流程向後傳遞,只需要繼承此基類重寫IfMatchCondition 方法即可

  2.BaseBranchGateway - 分支閘道器元件

   此元件將業務分流處理,定義流體時通過AddBranchPipe新增多個分支,至於如何分流,只需要繼承此基類重寫FilterNextPipes方法即可,你也可以在此之上實現BPMN中的幾種閘道器型別(並行,排他,和包含)。

三. 聯結器元件

  此元件主要負責訊息的傳遞和轉化處理,在訊息的傳遞過程中又支援直接傳遞和非同步緩衝(IBufferTunnel)傳遞,根據是否需要轉化,或者非同步定義三個基類如下:

  1.BaseConnector<InContext, OutContext> - 轉化連線元件

  業務流經過此元件,直接執行Convert方法(需重寫),轉化成對應的下個元件執行引數,自動進入下個元件。

  2.BaseBufferConnector<TContext> - 非同步緩衝連線元件

  繼承IBufferTunnel介面 繼承此元件後,必須重寫Push方法,實現非同步緩衝儲存的處理,業務流進入此元件後,呼叫Push方法儲存,之後業務流動停止, 訊息喚醒時,需顯式呼叫Pop方法,業務流繼續向後執行

  3.BaseBufferConnector<InContext, OutContext> - 非同步緩衝+轉化 連線元件

   繼承此元件後,必須重寫Push,Convert方法,實現非同步緩衝儲存和轉化的處理,業務流進入此元件後,同樣呼叫Push方法儲存,之後業務流動停止, 訊息喚醒時,需顯式呼叫Pop方法(內部呼叫Convert方法,完成引數轉化),業務流繼續向後執行

四. 簡單示例場景

  首先我們先假設當前有一個進貨管理的場景,,需經歷 進貨申請,申請審批,購買支付,入庫(同時郵件通知申請人) 幾個環節,流程圖如下: 

  

 根據此流程圖,每個環節對應一個事件活動,這裡以申請活動我們定義如下:

    public class ApplyActivity : BaseActivity<ApplyContext>
    {
        protected override Task<bool> Executing(ApplyContext data)
        {
            // ......
            LogHelper.Info("這裡剛才發生了一個採購申請"); 
            return Task.FromResult(true);
        }
    }

這裡為了方便觀察,直接繼承 BaseActivity,即多個活動連線後,自動執行。 相同的處理方式我們定義剩下幾個環節事件,列表如下:

    ApplyActivity      - 申請事件    (引數:ApplyContext)
    AutoAuditActivity  - 稽核事件    (引數:ApplyContext)
    PayActivity        - 購買事件    (引數:PayContext)
    StockActivity      - 入庫事件    (引數:StockContext)
    EmailActivity      - 傳送郵件事件    (引數:SendEmailContext)

以上五個事件活動,其具體實現和引數完全獨立,同時因為購買支付後郵件和入庫是相互獨立的事件,定義分支閘道器做分流(規則)處理,程式碼如下:

    public class PayGateway:BaseBranchGateway<PayContext>
    {
        protected override IEnumerable<BasePipe<PayContext>> FilterNextPipes(List<BasePipe<PayContext>> branchItems, PayContext context)
        {
            // ......
            LogHelper.Info("這裡進行支付通過後的分流");
            return branchItems;
        }
    }

這裡的意思相對簡單,即傳入的所有的分支不用過濾,直接全部分發。

同樣因為五個事件的方法引數不盡相同,中間的我們新增訊息聯結器,作為訊息的中轉和轉化處理(也可以在建立流體時表示式處理),以支付引數到郵件的引數轉化示例:

    public class PayEmailConnector : BaseConnector<PayContext, SendEmailContext>
    {
        protected override SendEmailContext Convert(PayContext inContextData)
        {
            // ......
            return new SendEmailContext() { id = inContextData.id };
        }
    }

通過以上,申購流程的元件定義完畢,串聯使用如下(這裡是單元測試類,實際業務我們可以建立一個Service處理):

        public readonly ApplyActivity ApplyActivity = new ApplyActivity();
        public readonly AuditActivity AuditActivity = new AuditActivity();

        public readonly PayActivity PayActivity = new PayActivity();

        public readonly PayGateway PayGateway = new PayGateway();

        public readonly StockConnector StockConnector = new StockConnector();
        public readonly StockActivity  StockActivity  = new StockActivity();

        public readonly PayEmailConnector EmailConnector = new PayEmailConnector();
        public readonly SendEmailActivity EmailActivity  = new SendEmailActivity();

          //  建構函式內定義流體關聯
        public BuyFlowTests()
        {
            ApplyActivity
            .Append(AuditActivity)
            .AppendConvert(applyContext => new PayContext() {id = applyContext.id})// 表示式方式的轉化器
            .Append(PayActivity)
            .Append(PayGateway);

            // 閘道器分支 - 傳送郵件分支
            PayGateway.AddBranchPipe(EmailConnector)
            .Append(EmailActivity);

            // 閘道器分支- 入庫分支
            PayGateway.AddBranchPipe(StockConnector)
            .Append(StockActivity);
            //.Append(後續事件)
        }


        [TestMethod]
        public async Task FlowTest()
        {
            await ApplyActivity.Start(new ApplyContext()
            {
                id = "test_business_id"
            });
        }

執行單元測試,結果如下:

xxx Detail:這裡剛才發生了一個採購申請

xxx Detail:管理員稽核通過

xxx Detail:發起支付處理

xxx Detail:這裡進行支付通過後的分流

xxx Detail:分流-1.郵件傳送

xxx Detail:分流-2.庫存儲存

 

如果你已經看到這裡,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公眾號(見二維碼) 

_________________________________________