1. 程式人生 > 其它 >設計訊息匯流排(對訊息佇列進行二次封裝) 基於ASP.NET Core 5.0使用RabbitMQ訊息佇列實現事件匯流排(EventBus)

設計訊息匯流排(對訊息佇列進行二次封裝) 基於ASP.NET Core 5.0使用RabbitMQ訊息佇列實現事件匯流排(EventBus)

基於ASP.NET Core 5.0使用RabbitMQ訊息佇列實現事件匯流排(EventBus)

 

文章閱讀請前先參考看一下 https://www.cnblogs.com/hudean/p/13858285.html 安裝RabbitMQ訊息佇列軟體與瞭解C#中如何使用RabbitMQ 和 https://www.cnblogs.com/Keep-Ambition/p/8038885.html 新增一個使用者並可以遠端訪問,

 訊息佇列的作用:跨服務通訊、服務之間解耦,削峰、非同步,其實還有一個作用是提高接收者效能

RabbitMQ 官方網站:https://www.rabbitmq.com/

RabbitMQ 中文文件網址:http://rabbitmq.mr-ping.com/

本文程式碼GitHub 地址是: https://github.com/hudean/MQDemo

一、初衷

為什麼要設計訊息匯流排(對訊息佇列進行二次封裝),而不是讓各業務系統直接使用RabbitMQ、Kafka、RocketMQ這樣的成熟的訊息佇列呢? 如果業務系統比較簡單,確實不需要考慮這樣的問題,直接拿最成熟的開源方案是最好的方式,但是在複雜的多系統下、多人分工合作的場景下,直接使用成熟的訊息佇列一般都會面臨以下問題
  • 開發難度大,各系統間分別隔離,需要關注訊息中介軟體的各種複雜繁瑣的配置,關注不同的訊息則需要對接不同的訊息佇列
  • 維護成本高,各系統或團隊需要分別管理訊息中介軟體、處理各種服務異常、(訊息中介軟體的高可用、業務的高可用等)
  • 管理難度大,沒法對訊息的生產和消費進行業務管理,也不方便對訊息中的敏感資料進行許可權管理
  • 擴充套件成本高,無法統一訊息系統擴充套件功能,如路由、延時、重試、消費確認等 總結訊息佇列是一個面向技術的接入,重點關注訊息佇列的配置、介面對接;而訊息匯流排則是通過遮蔽部署、分組和通訊等技術細節,實現一個面向業務的接入,重點關注要接收什麼訊息。

定義

事件匯流排是實現基於事件驅動模式的方式之一,事件傳送者將事件訊息傳送到一個事件總線上,事件訂閱者向事件匯流排訂閱和接收事件,而後再處理接收到的事件。固然,訂閱者不只能夠接收和消費事件,它們自己也能夠建立事件,並將它們傳送到事件總線上。

事件匯流排是對釋出-訂閱模式的一種實現。它是一種集中式事件處理機制,容許不一樣的元件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。

如前所述,使用基於事件的通訊時,當值得注意的事件發生時,微服務會發布事件,例如更新業務實體時。 其他微服務訂閱這些事件。 微服務收到事件時,可以更新其自己的業務實體,這可能會導致釋出更多事件。 這是最終一致性概念的本質。 通常通過使用事件匯流排實現來執行此釋出/訂閱系統。 事件匯流排可以設計為包含 API 的介面,該 API 是訂閱和取消訂閱事件和釋出事件所需的。 它還可以包含一個或多個基於跨程序或訊息通訊的實現,例如支援非同步通訊和釋出/訂閱模型的訊息佇列或服務匯流排。

可以使用事件來實現跨多個服務的業務事務,這可提供這些服務間的最終一致性。 最終一致事務由一系列分散式操作組成。 在每個操作中,微服務會更新業務實體,併發布可觸發下一個操作的事件。 下面的圖 6-18 顯示了通過事件匯流排釋出了 PriceUpdated 事件,因此價格更新傳播到購物籃和其他微服務。

圖 6-18。 基於事件匯流排的事件驅動的通訊

本部分介紹如何使用通用事件匯流排介面(如圖 6-18 所示)實現這種與 .NET 的通訊。 存在多種可能的實現,每種實現使用不同的技術或基礎結構,例如 RabbitMQ、Azure 服務匯流排或任何其他第三方開源或商用服務匯流排。

三、整合事件

整合事件用於跨多個微服務或外部系統保持域狀態同步。 此功能可通過在微服務外發布整合事件來實現。 將事件釋出到多個接收方微服務(訂閱到整合事件的儘可能多個微服務)時,每個接收方微服務中的相應事件處理程式會處理該事件。

整合事件基本上是資料保持類,如以下示例所示:

 View Code  View Code

 

 

事件匯流排

事件匯流排可實現釋出/訂閱式通訊,無需元件之間相互顯式識別,如圖 6-19 所示。

圖 6-19。 事件匯流排的釋出/訂閱基礎知識

上圖顯示了微服務 A 釋出到事件匯流排,這會分發到訂閱微服務 B 和 C,釋出伺服器無需知道訂閱伺服器。 事件匯流排與觀察者模式和釋出-訂閱模式相關。

觀察者模式

觀察者模式中,主物件(稱為可觀察物件)將相關資訊(事件)告知其他感興趣的物件(稱為觀察者)。

釋出-訂閱(釋出/訂閱)模式

釋出/訂閱模式的用途與觀察者模式相同:某些事件發生時,需要告知其他服務。 但觀察者模式與釋出/訂閱模式之間存在重要區別。 在觀察者模式中,直接從可觀察物件廣播到觀察者,因此它們“知道”彼此。 但在釋出/訂閱模式中,存在稱為中轉站、訊息中轉站或事件匯流排的第三個元件,釋出伺服器和訂閱伺服器都知道第三個元件。 因此,使用釋出/訂閱模式時,釋出伺服器和訂閱伺服器通過所述的事件匯流排或訊息中轉站精確分離。

中轉站或事件匯流排

如何實現釋出伺服器和訂閱伺服器之間的匿名? 一個簡單方法是讓中轉站處理所有通訊。 事件匯流排是一個這樣的中轉站。

事件匯流排通常由兩部分組成:

  • 抽象或介面。

  • 一個或多個實現。

在圖 6-19 中,從應用程式角度看,會發現事件匯流排實際上是一個釋出/訂閱通道。 實現此非同步通訊的方式可能會有差異。 它可以具有多個實現,以便你進行交換,具體取決於環境要求(例如,生產和開發環境)。

在圖 6-20 中,可看到事件匯流排的抽象,包含基於 RabbitMQ、Azure 服務匯流排或其他事件/訊息中轉站等基礎結構訊息技術的多個實現。

圖 6- 20。 事件匯流排的多個實現

最好通過介面定義事件匯流排,以便它可使用多種技術(例如 RabbitMQ、Azure 服務匯流排等)來實現。 但是,如前所述,僅當需要由你的抽象支援的基本事件匯流排功能時,才適合使用你自己的抽象(事件匯流排介面)。 如果需要更豐富的服務匯流排功能,應使用你喜歡的商用服務匯流排提供的 API 和抽象,而不是你自己的抽象。

定義事件匯流排介面

首先,讓我們瞭解一下事件匯流排介面的一些實現程式碼和可能的實現。 介面應是通用和簡單的,如下所示介面。

 View Code

藉助 RabbitMQ 的事件匯流排實現,微服務可訂閱事件、釋出事件和接收事件,如圖 6-21 所示。

圖 6-21。 事件匯流排的 RabbitMQ 實現

RabbitMQ 充當訊息釋出伺服器和訂閱者之間的中介,處理分發。 在程式碼中,EventBusRabbitMQ 類實現了泛型 IEventBus 介面。 此實現基於依賴項注入,以便可以從此開發/測試版本交換到生產版本。

 View Code

示例開發/測試事件匯流排的 RabbitMQ 實現是樣板程式碼。 它必須處理與 RabbitMQ 伺服器的連線,並提供用於將訊息事件釋出到佇列的程式碼。 它還必須為每個事件型別實現收集整合事件處理程式的字典;這些事件型別可以對每個接收器微服務具有不同的例項化和不同的訂閱,如圖 6-21 所示。

四、使用 RabbitMQ 實現一個簡單的釋出方法

下面的程式碼是 RabbitMQ 的事件匯流排實現的簡化版,用以展示整個方案。 你真的不必以這種方式處理連線。 要檢視完整的實現,在後面

 View Code

五、使用 RabbitMQ API 實現訂閱程式碼

與釋出程式碼一樣,下面的程式碼是 RabbitMQ 事件匯流排實現的簡化部分。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }
}

每個事件型別都有一個相關的通道,以獲取 RabbitMQ 中的事件。 然後,可以根據需要在每個通道和事件型別中擁有儘可能多的事件處理程式。

訂閱方法接受一個 IIntegrationEventHandler 物件,該物件相當於當前微服務中的回撥方法,以及其相關的 IntegrationEvent 物件。 然後,程式碼將該事件處理程式新增到事件處理程式列表,每個客戶端微服務的每個整合事件型別都可具有事件處理程式。 如果客戶端程式碼尚未訂閱事件,該程式碼將為事件型別建立一個通道,以便在從任何其他服務中釋出事件時,它可以從 RabbitMQ 以推送方式接收事件。

六、使用 RabbitMQ 完整實現事件匯流排程式碼

結構圖如下:

 

 

動態整合事件處理器介面

 View Code

事件匯流排介面

 View Code

整合事件處理器介面

 View Code

整合事件

 View Code

GenericTypeExtensions

 View Code

事件匯流排訂閱管理器介面

 View Code

記憶體中事件匯流排訂閱管理器

 View Code

SubscriptionInfo

 View Code

EventBusRabbitMQ裡的程式碼

預設 RabbitMQ 持久連線

 View Code

使用RabbitMQ的事件匯流排

 View Code

RabbitMQ持續連線介面

 View Code

完整程式碼 地址 : https://github.com/hudean/EventBusTest.git

執行訂閱服務和釋出服務結果如圖

 

 

 

文章閱讀請前先參考看一下 https://www.cnblogs.com/hudean/p/13858285.html 安裝RabbitMQ訊息佇列軟體與瞭解C#中如何使用RabbitMQ 和 https://www.cnblogs.com/Keep-Ambition/p/8038885.html 新增一個使用者並可以遠端訪問,

 訊息佇列的作用:跨服務通訊、服務之間解耦,削峰、非同步,其實還有一個作用是提高接收者效能

RabbitMQ 官方網站:https://www.rabbitmq.com/

RabbitMQ 中文文件網址:http://rabbitmq.mr-ping.com/

本文程式碼GitHub 地址是: https://github.com/hudean/MQDemo

一、初衷

為什麼要設計訊息匯流排(對訊息佇列進行二次封裝),而不是讓各業務系統直接使用RabbitMQ、Kafka、RocketMQ這樣的成熟的訊息佇列呢? 如果業務系統比較簡單,確實不需要考慮這樣的問題,直接拿最成熟的開源方案是最好的方式,但是在複雜的多系統下、多人分工合作的場景下,直接使用成熟的訊息佇列一般都會面臨以下問題
  • 開發難度大,各系統間分別隔離,需要關注訊息中介軟體的各種複雜繁瑣的配置,關注不同的訊息則需要對接不同的訊息佇列
  • 維護成本高,各系統或團隊需要分別管理訊息中介軟體、處理各種服務異常、(訊息中介軟體的高可用、業務的高可用等)
  • 管理難度大,沒法對訊息的生產和消費進行業務管理,也不方便對訊息中的敏感資料進行許可權管理
  • 擴充套件成本高,無法統一訊息系統擴充套件功能,如路由、延時、重試、消費確認等 總結訊息佇列是一個面向技術的接入,重點關注訊息佇列的配置、介面對接;而訊息匯流排則是通過遮蔽部署、分組和通訊等技術細節,實現一個面向業務的接入,重點關注要接收什麼訊息。

定義

事件匯流排是實現基於事件驅動模式的方式之一,事件傳送者將事件訊息傳送到一個事件總線上,事件訂閱者向事件匯流排訂閱和接收事件,而後再處理接收到的事件。固然,訂閱者不只能夠接收和消費事件,它們自己也能夠建立事件,並將它們傳送到事件總線上。

事件匯流排是對釋出-訂閱模式的一種實現。它是一種集中式事件處理機制,容許不一樣的元件之間進行彼此通訊而又不須要相互依賴,達到一種解耦的目的。

如前所述,使用基於事件的通訊時,當值得注意的事件發生時,微服務會發布事件,例如更新業務實體時。 其他微服務訂閱這些事件。 微服務收到事件時,可以更新其自己的業務實體,這可能會導致釋出更多事件。 這是最終一致性概念的本質。 通常通過使用事件匯流排實現來執行此釋出/訂閱系統。 事件匯流排可以設計為包含 API 的介面,該 API 是訂閱和取消訂閱事件和釋出事件所需的。 它還可以包含一個或多個基於跨程序或訊息通訊的實現,例如支援非同步通訊和釋出/訂閱模型的訊息佇列或服務匯流排。

可以使用事件來實現跨多個服務的業務事務,這可提供這些服務間的最終一致性。 最終一致事務由一系列分散式操作組成。 在每個操作中,微服務會更新業務實體,併發布可觸發下一個操作的事件。 下面的圖 6-18 顯示了通過事件匯流排釋出了 PriceUpdated 事件,因此價格更新傳播到購物籃和其他微服務。

圖 6-18。 基於事件匯流排的事件驅動的通訊

本部分介紹如何使用通用事件匯流排介面(如圖 6-18 所示)實現這種與 .NET 的通訊。 存在多種可能的實現,每種實現使用不同的技術或基礎結構,例如 RabbitMQ、Azure 服務匯流排或任何其他第三方開源或商用服務匯流排。

三、整合事件

整合事件用於跨多個微服務或外部系統保持域狀態同步。 此功能可通過在微服務外發布整合事件來實現。 將事件釋出到多個接收方微服務(訂閱到整合事件的儘可能多個微服務)時,每個接收方微服務中的相應事件處理程式會處理該事件。

整合事件基本上是資料保持類,如以下示例所示:

 View Code  View Code

 

 

事件匯流排

事件匯流排可實現釋出/訂閱式通訊,無需元件之間相互顯式識別,如圖 6-19 所示。

圖 6-19。 事件匯流排的釋出/訂閱基礎知識

上圖顯示了微服務 A 釋出到事件匯流排,這會分發到訂閱微服務 B 和 C,釋出伺服器無需知道訂閱伺服器。 事件匯流排與觀察者模式和釋出-訂閱模式相關。

觀察者模式

觀察者模式中,主物件(稱為可觀察物件)將相關資訊(事件)告知其他感興趣的物件(稱為觀察者)。

釋出-訂閱(釋出/訂閱)模式

釋出/訂閱模式的用途與觀察者模式相同:某些事件發生時,需要告知其他服務。 但觀察者模式與釋出/訂閱模式之間存在重要區別。 在觀察者模式中,直接從可觀察物件廣播到觀察者,因此它們“知道”彼此。 但在釋出/訂閱模式中,存在稱為中轉站、訊息中轉站或事件匯流排的第三個元件,釋出伺服器和訂閱伺服器都知道第三個元件。 因此,使用釋出/訂閱模式時,釋出伺服器和訂閱伺服器通過所述的事件匯流排或訊息中轉站精確分離。

中轉站或事件匯流排

如何實現釋出伺服器和訂閱伺服器之間的匿名? 一個簡單方法是讓中轉站處理所有通訊。 事件匯流排是一個這樣的中轉站。

事件匯流排通常由兩部分組成:

  • 抽象或介面。

  • 一個或多個實現。

在圖 6-19 中,從應用程式角度看,會發現事件匯流排實際上是一個釋出/訂閱通道。 實現此非同步通訊的方式可能會有差異。 它可以具有多個實現,以便你進行交換,具體取決於環境要求(例如,生產和開發環境)。

在圖 6-20 中,可看到事件匯流排的抽象,包含基於 RabbitMQ、Azure 服務匯流排或其他事件/訊息中轉站等基礎結構訊息技術的多個實現。

圖 6- 20。 事件匯流排的多個實現

最好通過介面定義事件匯流排,以便它可使用多種技術(例如 RabbitMQ、Azure 服務匯流排等)來實現。 但是,如前所述,僅當需要由你的抽象支援的基本事件匯流排功能時,才適合使用你自己的抽象(事件匯流排介面)。 如果需要更豐富的服務匯流排功能,應使用你喜歡的商用服務匯流排提供的 API 和抽象,而不是你自己的抽象。

定義事件匯流排介面

首先,讓我們瞭解一下事件匯流排介面的一些實現程式碼和可能的實現。 介面應是通用和簡單的,如下所示介面。

 View Code

藉助 RabbitMQ 的事件匯流排實現,微服務可訂閱事件、釋出事件和接收事件,如圖 6-21 所示。

圖 6-21。 事件匯流排的 RabbitMQ 實現

RabbitMQ 充當訊息釋出伺服器和訂閱者之間的中介,處理分發。 在程式碼中,EventBusRabbitMQ 類實現了泛型 IEventBus 介面。 此實現基於依賴項注入,以便可以從此開發/測試版本交換到生產版本。

 View Code

示例開發/測試事件匯流排的 RabbitMQ 實現是樣板程式碼。 它必須處理與 RabbitMQ 伺服器的連線,並提供用於將訊息事件釋出到佇列的程式碼。 它還必須為每個事件型別實現收集整合事件處理程式的字典;這些事件型別可以對每個接收器微服務具有不同的例項化和不同的訂閱,如圖 6-21 所示。

四、使用 RabbitMQ 實現一個簡單的釋出方法

下面的程式碼是 RabbitMQ 的事件匯流排實現的簡化版,用以展示整個方案。 你真的不必以這種方式處理連線。 要檢視完整的實現,在後面

 View Code

五、使用 RabbitMQ API 實現訂閱程式碼

與釋出程式碼一樣,下面的程式碼是 RabbitMQ 事件匯流排實現的簡化部分。

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Member objects and other methods ...
    // ...

    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName,
                                    exchange: BROKER_NAME,
                                    routingKey: eventName);
            }
        }

        _subsManager.AddSubscription<T, TH>();
    }
}

每個事件型別都有一個相關的通道,以獲取 RabbitMQ 中的事件。 然後,可以根據需要在每個通道和事件型別中擁有儘可能多的事件處理程式。

訂閱方法接受一個 IIntegrationEventHandler 物件,該物件相當於當前微服務中的回撥方法,以及其相關的 IntegrationEvent 物件。 然後,程式碼將該事件處理程式新增到事件處理程式列表,每個客戶端微服務的每個整合事件型別都可具有事件處理程式。 如果客戶端程式碼尚未訂閱事件,該程式碼將為事件型別建立一個通道,以便在從任何其他服務中釋出事件時,它可以從 RabbitMQ 以推送方式接收事件。

六、使用 RabbitMQ 完整實現事件匯流排程式碼

結構圖如下:

 

 

動態整合事件處理器介面

 View Code

事件匯流排介面

 View Code

整合事件處理器介面

 View Code

整合事件

 View Code

GenericTypeExtensions

 View Code

事件匯流排訂閱管理器介面

 View Code

記憶體中事件匯流排訂閱管理器

 View Code

SubscriptionInfo

 View Code

EventBusRabbitMQ裡的程式碼

預設 RabbitMQ 持久連線

 View Code

使用RabbitMQ的事件匯流排

 View Code

RabbitMQ持續連線介面

 View Code

完整程式碼 地址 : https://github.com/hudean/EventBusTest.git

執行訂閱服務和釋出服務結果如圖