1. 程式人生 > 實用技巧 >.NET 開源工作流: Slickflow流程引擎高階開發(七)--訊息佇列(RabbitMQ)的整合使用

.NET 開源工作流: Slickflow流程引擎高階開發(七)--訊息佇列(RabbitMQ)的整合使用

前言:工作流流程過程中,除了正常的人工審批型別的節點外,事件型別的節點處理也尤為重要。比如比較常見的事件型別的節點有:Timer/Message/Signal等。本文重點闡述訊息型別的節點處理,以及實現訊息驅動流程過程中對訊息佇列(RabbitMQ)的整合使用方式。

1. 節點間訊息傳遞

1.1 MessageThrow

訊息丟擲節點,當執行到這個節點時,特定訊息主題的訊息記錄將會新增到訊息佇列,然後等待被訂閱的訊息消費者來啟用訊息處理服務。

1.2 MessageCatch

訊息接收節點,訊息接收節點上定義了訊息主題,並且在訊息佇列中訂閱了該主題。當有特定主題的訊息被髮布時候,訊息消費者會捕獲到訊息主題,同時訊息處理服務中,將會定位到流程的該節點位置,然後通過流程服務來判定下一步的流轉流轉。

1.3 單一流程內的節點訊息傳遞

該流程只進行訊息的釋出或者接收,而訊息的另外一方則可能是業務系統。兩者之間的關聯是靠訊息主題來識別。如下圖所示:

1.4 跨流程間的節點訊息傳遞

通常可以用泳道流程來表示跨流程間的訊息通知。泳道流程中,有多個流程,其中一個是泳道主流程,其它的則為泳道附屬流程。將會在下面的章節中具體描述。

2. 跨流程訊息傳遞

2.1 泳道流程

泳道流程中,使用多個泳道來區分不同的流程,流程之間可以通過訊息來傳遞資訊,如下圖所示:

2.2 訊息節點型別

訊息節點在流程中的位置主要有:開始節點(Start)、中間節點(Internmediate)和結束節點(End)

3. 訊息佇列(RabbitMQ)整合使用

3.1 訊息佇列介紹

RabbitMQ是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中介軟體)。RabbitMQ伺服器是用Erlang語言編寫的,而群集和故障轉移是構建在開放電信平臺框架上的。所有主要的程式語言均有與代理介面通訊的客戶端庫。

來源:https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin

3.2 訊息佇列整合引擎

Slickflow引擎整合RabbitMQ訊息佇列,其特點是:已經經過大量使用者案例檢驗,證明效能能夠滿足日常業務需求,同時檔案豐富,便於開發人員學習上手。

4. 泳道流程訊息佇列整合案例

下圖是泳道流程的具體示例,包含了一個主流程,和一個泳道附屬流程,兩個流程間的觸發處理,是靠訊息來傳遞。

4.1 流程協作說明

1) 泳道附屬流程的啟動

泳道附屬流程的開始節點(StartCatch)是一個訊息接收節點,其接收內容來自主流程的中間訊息節點(IntermediateThrow)。當訂單主流程的“同步訂單”節點完成後,將會發布訊息,此時,生產附屬流程因為訂閱了該訊息主題,所以根據開始節點的型別為訊息觸發型別,生產附屬流程將會被啟動,生成新的流程例項。

2) 中間訊息節點接收訊息

流程執行到到中間節點位置後,需要等待特殊主體的訊息記錄後,然後才能繼續流轉。常見的場景:比如電子商務系統接入第三方支付系統時,當商城客戶下單支付後,從第三方回傳支付成功的訊息後,當前的訂購流程財主繼續向下流轉。這種訊息等待接收的處理,就可以使用訊息佇列來完成。

在泳道流程中,當"生產計劃"流程完成後,將會發布訊息給主流程,泳道主流程接收到訊息後,可以繼續處理當前的節點,並且向下進行流程流轉到"客戶反饋"節點。

4.2 訊息驅動過程說明

1) 訊息釋出程式碼示例

        /// <summary>
/// 訊息釋出
/// </summary>
/// <param name="topic">主題</param>
/// <param name="line">內容</param>
public void Publish(string topic, string line)
{
var channel = MQClientFactory.CreatePublishChannel();
channel.QueueDeclare(queue: topic,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null); var body = Encoding.UTF8.GetBytes(line);
channel.BasicPublish(exchange: "",
routingKey: topic,
basicProperties: null,
body: body);
}

  

2) 訊息訂閱程式碼示例

        /// <summary>
/// 訊息訂閱
/// </summary>
/// <param name="topic">主題</param>
/// <returns></returns>
public void Subscribe(string topic)
{
var channel = MQClientFactory.CreateRecieveChannel();
channel.QueueDeclare(queue: topic,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null); var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var line = Encoding.UTF8.GetString(body);
var msgMediator = new MessageMediator();
msgMediator.InvokeFromMessage(ConsumeMessageFunction, topic, line);
};
channel.BasicConsume(queue: topic,
autoAck: true,
consumer: consumer);
}

5. 訊息佇列資料監控

RabbitMQ帶有後檯面板監控,可以看到訊息佇列中的資料情況,可以幫助開發人員除錯訊息處理程式的正常工作。

6. 總結

流程引擎整合訊息佇列RabbitMQ的方式,可以保障業務系統跟流程關聯絡統整合關聯的可靠性。存在訊息佇列中的訊息主題,可以被訊息訂閱方在後期處理,使得系統的部署更加靈活,降低了系統之間的耦合性。其次,跨流程直接的訊息通訊比較常見,通過整合訊息佇列來儲存訊息和分發訊息,可以使得業務系統處理的能力加強。從單一流程的流轉過度到多流程直接的集體協作模式。