微服務實戰(二):落地微服務架構到直銷系統(構建訊息匯流排框架介面)
從上一篇文章大家可以看出,實現一個自己的訊息匯流排框架是非常重要的內容,訊息匯流排可以將界限上下文之間進行解耦,也可以為大併發訪問提供必要的支援。
訊息匯流排的作用:
1.界限上下文解耦:在DDD第一波文章中,當更新了訂單資訊後,我們通過呼叫經銷商界限上下文的領域模型和倉儲,進行了經銷商資訊的更新,這造成了耦合。通過一個訊息匯流排,可以在訂單界限上下文的WebApi服務(來源微服務-生產者)更新了訂單資訊後,釋出一個事件訊息到訊息匯流排的某個佇列中,經銷商界限上下文的WebApi服務(消費者)訂閱這個事件訊息,然後交給自己的Handler進行訊息處理,更新自己的經銷商資訊。這樣就實現了訂單界限上下文與經銷商界限上下文解耦。
2.大併發支援:可以通過訊息匯流排進一步提升下單的效能。我們可以將使用者下單的操作直接交給一個下單命令WebApi接收,下單命令WebApi接收到命令後,直接丟給一個訊息匯流排的佇列,然後立即給前端返回下單結果。這樣使用者就不用等待後續的複雜訂單業務邏輯,加快速度。後續訂單的一系列處理交給訊息的Handler進行後續的處理與訊息的進一步投遞。
訊息匯流排設計重點:
1.定義訊息(事件)的介面:所有需要投遞與處理的訊息,都從這個訊息介面繼承,因為需要約束訊息中必須包含的內容,比如訊息的ID、訊息產生的時間等。
public interface IEvent { Guid Id {get; set; } DateTime CreateDate { get; set; } }
2.定義訊息(事件)處理器介面:當訊息投遞到訊息匯流排佇列中後,一定有消費者WebApi接收並處理這個訊息,具體的處理方法邏輯在訂閱方處理器中實現,這裡先需要定義處理器的介面,便於在訊息匯流排框架中使用。
public interface IEventHandler { Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面程式碼可以看出,訊息(事件)處理器處理的型別就是從IEvent介面繼承的訊息類。
3.定義訊息(事件)與訊息(事件)處理器關聯介面:一種型別的訊息被投遞後,一定要在訂閱方找到這種訊息的處理器進行處理,所以一定要定義二者的關聯介面,這樣才能將訊息與訊息處理器對應起來,才能實現訊息被訂閱後的處理。
public interface IEventHandlerExecutionContext { void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent; }
RegisterEventHandler方法就是建立訊息與訊息處理器的關聯,這個方法其實是在訂閱方使用,訂閱方告訴訊息匯流排,什麼樣的訊息應該交給我的哪個處理器進行處理。
IsRegisterEventHandler方法是判斷訊息與處理器之間是否已經存在關聯。
HandleAsync方法是通過查詢到訊息對應的處理器後,然後呼叫處理器自己的Handle方法進行訊息的處理.
4.定義訊息釋出、訂閱與訊息匯流排介面:訊息匯流排至少要支援兩個功能,一個是生產者能夠釋出訊息到我的訊息匯流排,另一個是訂閱方需要能夠從我這個訊息匯流排訂閱訊息。
public interface IEventPublisher { void Publish<TEvent>(TEvent @event) where TEvent : IEvent; }
從上面程式碼可以看出,生產者釋出的訊息仍然要從IEvent繼承的型別。
public interface IEventSubscriber { void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
上面程式碼是訂閱方用於從訊息匯流排訂閱訊息,從程式碼中可以看出,它的最終的實現其實就是建立訊息與處理器之間的關聯。
public interface IEventBus:IEventPublisher,IEventSubscriber { }
訊息(事件)匯流排從兩個介面繼承下來,同時支援訊息的釋出與訊息的訂閱。
5.實現事件基類:上面已經訂閱了訊息(事件)的介面,這裡來實現事件的基類,其實就是實現訊息ID與產生的時間:
public class BaseEvent : IEvent { public Guid Id { get; set; } public DateTime CreateDate { get; set; } public BaseEvent() { this.Id = Guid.NewGuid(); this.CreateDate = DateTime.Now; } }
6.實現訊息匯流排基類:訊息匯流排底層的依賴可以是各種訊息代理產品,比如RabbitMq、Kafaka或第三方雲平臺提供的訊息代理產品,通常我們要封裝這些訊息代理產品。在封裝之前,我們需要定義頂層的訊息匯流排基類實現,主要的目的是未來依賴於它的具體實現可替換,另外也將訊息與訊息處理器的關聯介面傳遞進來,便於訂閱方使用。
public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract void Publish<TEvent>(TEvent @event) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler; }
7.實現訊息與處理器關聯:訊息必須與處理器關聯,訂閱方收到特定型別的訊息後,才知道交給哪個處理器處理。
public class EventHandlerExecutionContext : IEventHandlerExecutionContext { private readonly IServiceCollection registry; private readonly IServiceProvider serviceprovider; private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>(); public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null) { this.registry = registry; this.serviceprovider = this.registry.BuildServiceProvider(); } //查詢訊息關聯的處理器,然後呼叫處理器的處理方法 public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent { var eventtype = @event.GetType(); if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0) { using(var childscope = this.serviceprovider.CreateScope()) { foreach(var handlertype in handlertypes) { var handler = Activator.CreateInstance(handlertype) as IEventHandler; await handler.HandleAsync(@event); } } } } //判斷訊息與處理器之間是否有關聯 public bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist)) { return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler)); } return false; } //將訊息與處理器關聯起來,可以在記憶體中建立關聯,也可以建立在資料庫單獨表中 public void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler { Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations); } }
上面我們基本上就將訊息匯流排的架子搭建起來了,也實現了基本的功能,下一章我們基於它來實現RabbitMq的訊息匯流排。
QQ討論群:309287205
微服務實戰視訊請關注微信公眾號:
相關推薦
微服務實戰(二):落地微服務架構到直銷系統(構建訊息匯流排框架介面)
從上一篇文章大家可以看出,實現一個自己的訊息匯流排框架是非常重要的內容,訊息匯流排可以將界限上下文之間進行解耦,也可以為大併發訪問提供必要的支援。 訊息匯流排的作用: 1.界限上下文解耦:在DDD第一波文章中,當更新了訂單資訊後,我們通過呼叫經銷商界限上下文的領域模型和倉儲,進行了經銷商資訊的更新,這造成了
微服務實戰(九):落地微服務架構到直銷系統(回顧總結)
這個系列我們大概寫了八篇文章,將微服務的最重要的內容過了一遍。當然其中有些內容還沒有涉及到,比如Docker(不是微服務架構風格中必須的)等,關於Docker我們自己可以在網上找找其他文章。 這篇文章就來回顧下微服務架構風格是如何落地的,如果你對以下回顧的內容都很清楚並已經有一些實踐的經驗,那麼恭喜你,你已
微服務實戰(七):落地微服務架構到直銷系統(實現命令與命令處理器)
我們先來看看CQRS架構,你對下圖的架構還有印象嗎?每個元件的功能都還清楚嗎?如果有疑問,請查考文章《微服務實戰(五):落地微服務架構到直銷系統(構建高效能大併發系統)》。 前一篇文章已經實現了Event Store的基礎功能部分,本篇文章我們通過C端的標準方式,實現一個下單的高併發命令端,來看看需要實現
微服務實戰(八):落地微服務架構到直銷系統(服務高可用性)
在微服務架構風格的系統中,如果單個微服務垮掉或地址不可訪問,雖然對系統的影響是有限的,但我們也必須採取一定的手段來保證每個微服務儘量可用;並且在大併發的情況下,雖然可以通過EDA訊息佇列處理的方式提高吞吐量,但仍然需要WebApi能夠更加高效的偵聽使用者請求,處理訊息,即使在某個服務短暫不可用的情況下。本篇文
微服務實戰(一):落地微服務架構到直銷系統(什麼是微服務)
網上有很多關於微服務的文章,從不同的維度對微服務進行了相關的講述;有些高屋建瓴,有些涉及細節,有些側重理論,有些側重程式碼,都是非常不錯的瞭解微服務的文章。 我們這個系列的文章的維度主要是實戰落地,也就是我們在平常工作以及產品開發過程中,考慮為什麼選擇微服務架構風格,以及如何將微服務的架構風格落地到我們實際的
微服務實戰(六):落地微服務架構到直銷系統(事件儲存)
在CQRS架構中,一個比較重要的內容就是當命令處理器從命令佇列中接收到相關的命令資料後,通過呼叫領域物件邏輯,然後將當前事件的物件資料持久化到事件儲存中。主要的用途是能夠快速持久化物件此次的狀態,另外也可以通過未來最終一致性的需求,通過事件資料將物件還原到一個特定的狀態,這個狀態通常是通過物件事件的版本來進行
微服務實戰(三):落地微服務架構到直銷系統(構建基於RabbitMq的訊息匯流排)
從前面文章可以看出,訊息匯流排是EDA(事件驅動架構)與微服務架構的核心部件,沒有訊息匯流排,就無法很好的實現微服務之間的解耦與通訊。通常我們可以利用現有成熟的訊息代理產品或雲平臺提供的訊息服務來構建自己的訊息匯流排;也可以自己完全寫一個訊息代理產品,然後基於它構建自己的訊息匯流排。通常我們不用重複造輪子(除
微服務實戰(五):落地微服務架構到直銷系統(構建高效能大併發系統)
在現代系統中,特別是網際網路軟體,通常會涉及到大量使用者的併發訪問,我們的系統一定要在架構上支援高效能、大併發的訪問。一個高效能的系統通常由很多的方面組成,包括資料庫高效能、Web伺服器高效能、負載均衡、快取、軟體架構等。我們這篇文章先從軟體開發架構的角度作為切入點來介紹如何構建高效能的系統。 傳統架構效能
微服務實戰(四):落地微服務架構到直銷系統(將生產者與消費者接入訊息匯流排)
前一篇文章我們已經完成了基於RabbitMq實現的的訊息匯流排,這篇文章就來看看生產者(訂單微服務)與消費者(經銷商微服務)如何接入訊息匯流排實現訊息的傳送與訊息的接收處理。 定義需要傳送的訊息: 下單訊息要被髮送到訊息匯流排,並被經銷商微服務的處理器處理。經銷商微服務處理時,需要知道要對哪個經銷商處理多少
Spring Cloud構建微服務架構(七)訊息匯流排
先回顧一下,在之前的Spring Cloud Config的介紹中,我們還留了一個懸念:如何實現對配置資訊的實時更新。雖然,我們已經能夠通過/refresh介面和Git倉庫的Web Hook來實現Git倉庫中的內容修改觸發應用程式的屬性更新。但是,若所有觸發操作均需要我
Spring Cloud構建微服務架構(三)訊息匯流排
注:此文不適合0基礎學習者直接閱讀,請先完整的將作者關於微服務的博文全部閱讀一遍,如果還有疑問,可以再來閱讀此文,地址:http://blog.csdn.net/sosfnima/article/details/53178157,推薦讀者去找作者的書籍《Spring C
《Spring Cloud微服務實戰》讀書筆記之服務治理:Spring Cloud Eureka
摘要 服務治理是微服務架構最為核心和基礎的模組,用於實現各個微服務例項的自動化註冊與發現。Spring Cloud Eureka 是對Netflix Eureka的二次封裝,負責服務的治理。 關鍵詞:服務治理 一、服務治理介紹 服務治理是微服務架構最為核心和基礎
Spring cloud微服務實戰(二)——Zuul整合Swagger2及許可權校驗
一、前言 ##二、 整合Swagger2 Swagger2大家肯定都用過,為什麼我在這裡還要提到呢,因為各個微服務都會提供自己的API文件,我們總不能一個地址一個地址去查吧,能不能有個統一的入口呢。這就是這節要講的。 2.1 Zuul整合Swagger2 其他
圖靈學院:【微服務架構】SpringCloud之Eureka(服務註冊和服務發現基礎篇)(二)
一:Eureka簡介 Eureka是Spring Cloud Netflix的一個子模組,也是核心模組之一。用於雲端服務發現,一個基於REST的服務,用於定位服務,以實現雲端中間層服務發現和故障轉移。服務註冊與發現對於微服務系統來說非常重要。有了服務發現與註冊,你就不需要
多研究些架構,少談些框架(1):論微服務架構的核心概念
定位 dubbo spring 提供服務 電信 cor res gate 虛擬 微服務架構和SOA區別 微服務現在辣麽火,業界流行的對比的卻都是所謂的Monolithic單體應用,而大量的系統在十幾年前都是已經是分布式系統了,那麽微服務作為新的理念和原來的分布式系統,或者說
圖靈學院:【微服務架構】SpringCloud之Ribbon(四)
SpringCloud Ribbon 一:Ribbon是什麼? Ribbon是Netflix釋出的開源專案,主要功能是提供客戶端的軟體負載均衡演算法,將Netflix的中間層服務連線在一起。Ribbon客戶端元件提供一系列完善的配置項如連線超時,重試等。簡單的說,就是
《基於SpringCloud微服務架構廣告系統設計與實現》筆記
img span 設計與實現 微服務 課程 png 1-1 分享圖片 bubuko 1-1 課程導學 什麽是廣告系統? 2-1 廣告系統概覽 2-2 廣告系統架構 2-3 準備工作與系統目錄結構 《基於SpringCl
微服務實戰(六):選擇微服務部署策略
因此 區別 嚴重 http 虛擬化 one rose 精確 命名空間 微服務實戰(一):微服務架構的優勢與不足 微服務實戰(二):使用API Gateway 微服務實戰(三):深入微服務架構的進程間通信 微服務實戰(四):服務發現的可行方案以及實踐案例 微服務實踐(五)
微服務實戰(六):選擇微服務部署策略 - DockOne.io
利用 -- 的區別 box imp 通信 標準 應用打包 email 原文:微服務實戰(六):選擇微服務部署策略 - DockOne.io 【編者的話】這篇博客是用微服務建應用的第六篇,第一篇介紹了微服務架構
微服務實戰(四):服務發現的可行方案以及實踐案例
mesos aws ec2 動態配置 load 顯示 一個 cer c118 分布 微服務實戰(四):服務發現的可行方案以及實踐案例 這是關於使用微服務架構創建應用系列的第四篇文章。第一篇介紹了微服務架構的模式,討論了使用微服務架構的優缺點。第二和第三篇描