你一定看得懂的 DDD+CQRS+EDA+ES 核心思想與極簡可執行程式碼示例
前言
隨著分散式架構微服務的興起,DDD(領域驅動設計)、CQRS(命令查詢職責分離)、EDA(事件驅動架構)、ES(事件溯源)等概念也一併成為時下的火熱概念,我也在早些時候閱讀了一些大佬的分析文,學習相關概念,不過一直有種霧裡看花、似懂非懂的感覺。經過一段時間的學習和研究大佬的程式碼後,自己設計實現了一套我消化理解後的程式碼。為了突出重點,避免受到大量實現細節的干擾,當然也是懶(這才是主要原因),其中的所有基礎設施都使用了現成的庫。所實現的研究成果也做成了傻瓜式一鍵體驗(我對對著黑框框敲命令沒什麼興趣,能點兩下滑鼠搞定的事我絕不在鍵盤上敲又臭又長的命令,敲命令能敲出優越感的人我覺得應該是抖M)。
正文
DDD(領域驅動設計)
這一定是最群魔亂舞的一個概念,每個大佬都能講出一大篇演講稿,但都或多或少存在差異或分歧,在我初看 DDD 時,我就被整懵了,這到底是咋回事?
現在回過頭來看,DDD 其實是一個高階思想概念,並不能指導開發者如何敲鍵盤,是指導人如何思考領域問題,而不是指導人思考出具體的領域的。正是因為中間隔了一層虛幻飄渺的概念,導致不同的人得出了不同的結論。還好 DDD 存在一些比較具體容易落實的概念,現在就來講下我對這些常見基礎概念的理解和我編碼時的基本原則,希望大家能在看大佬的文章時不用一臉懵逼,也進行下心得交流。
Entity(實體)
實體是一個儲存資料的類,如果類中包含自身的合法性驗證規則之類的方法,一般稱之為充血模型,相對的單純儲存資料的則稱為貧血模型(有時也叫做 POCO 類)。實體有一個重要性質,相等性是由標識屬性決定的,這個標識可以是一個簡單的 int 型的 Id,也可以是多個內部資料的某種組合(類似資料庫表的複合字段主鍵)。除標識外的其他東西均不對兩個實體物件的相等性產生影響。並且實體的資料屬性是可更改的。
有很多大佬認為實體應該是充血的,但在我看來,貧血的似乎更好,因為需求的不穩定性可能導致這些規則並不穩定,或規則本身並不唯一,在不同場合可能需要不同規則。這時候充血模型無論怎麼辦都很彆扭,如果把規則定義和校驗交給外部元件,這些需求就很容易滿足,比如使用 FluentValidate 為一種實體定義多套規則或對內部的規則條目按情況重新組合。
ValueObject(值物件)
值物件也是用來儲存資料的類。與實體相對,值物件沒有標識屬性,其相等性由所有內部屬性決定,當且僅當兩個值物件例項的所有屬性一一相等時,這兩個值物件相等。並且值物件的所有屬性為只讀,僅能在建構函式中進行唯一一次設定,如果希望修改某個值物件的某一屬性,唯一的辦法是使用新的值物件替換舊的值物件。並且值物件經常作為實體的屬性存在。
這個概念看起來和實體特別相似,都是用來儲存資料的,但也有些性質上的根本不同。網上的大佬通常會為值物件編寫基類,但我認為,值物件和實體在程式碼實現上並沒有這麼大的區別。可以看作整數和小數在計算機中表現為不同的資料型別,但在數學概念上他們沒有區別,僅僅只是因為離散的計算機系統無法完美表示連續的數學數字而產生的縫合怪。我傾向於根據類的程式碼定義所表現出來的性質與誰相符就將其視為誰,而不是看實現的介面或繼承的基類。因為需求的不確定性會導致他們可能會發生轉換,根據程式碼進行自我描述來判斷可以避免很多潛在的麻煩。
Aggregate,Aggregate Root(聚合及聚合根)
聚合根表示一個領域所操作的頂級實體型別,其他附屬資料都是聚合根的內部屬性,聚合根和其所屬的其他實體的組合稱為聚合。這是一個純概念性的東西。對領域實體的操作必須從聚合根開始,也就是說確保資料完整性的基本單位是聚合。大佬的程式碼中經常會用一個空介面來表示聚合根,如果某個實體實現了這個介面,就表示這個實體可以是一個聚合根。請注意,聚合根不一定必須是頂級型別,也可以是其他實體的一個屬性。這表示一個實體在,某些情況下是聚合根,而其他情況下是另一個聚合根的內部屬性。也就是說實體之間並非嚴格的樹狀關係,而是一般有向圖狀關係。
我認為定義這樣的空介面實際意不大,反而可能造成一些誤會。如果某個實體由於需求變動導致不再會成為聚合根,那這個實體事實上將不再是聚合根,但人是會犯錯的,很可能忘記去掉聚合根介面,這時程式碼與事實將產生矛盾。所以我認為聚合根應該基於事實而不是程式碼。當一個實體不再會作為聚合根使用時,將相關程式碼刪除,就同時表示它不再是聚合根,閱讀程式碼的人也因為看不到相關程式碼而自動認為它不是聚合根。在程式碼中的體現方式與下一個的概念有關。
Repository(倉儲)
倉儲表示對聚合根的持久化的抽象,在程式碼上可表現為聲明瞭增刪查改的相關方法的介面,而倉儲的實現類負責具體解決如何對聚合根實體進行增刪查改。例如在倉儲內部使用資料庫完成具體工作。
如果一個倉儲負責管理一個聚合根實體的持久化或者說存取,那這個實體就是一個事實上的聚合根。那麼在這裡,就可以在程式碼操作上將看到某個實體被倉儲管理等價為這個實體是聚合根,反之就不是。也就是說,如果將某個實體的倉儲的最後一個實際使用程式碼刪除,這個實體就在事實上不再是聚合根,此時程式碼表現與事實將完美同步,不再會產生矛盾。至於由於沒看到某個實體的倉儲而將實體誤認為不是聚合根,這其實並沒有任何問題。這說明在你所關注的領域中這個實體確實不是聚合根,而這個實體可能作為聚合根使用的領域你根本不關心,所以看不到,那這個實體是否在其他領域作為聚合根使用對你而言其實是無所謂的。
Domain Service(領域服務)
這就涉及到業務程式碼的編寫了。如果一個業務需要由多個聚合根配合完成,也就是需要多個倉儲,那麼就應該將這些對倉儲的呼叫封裝進一個服務,統一對外暴露提供服務。
如果這些倉儲操作需要具有事務性,也可以在這裡進行協調管理。如果某個業務只需要一個倉儲參與,要不要專門封裝一個服務就看你高興了。
CQRS(命令查詢職責分離)
CQRS 本質上是一種指導思想,指導開發者如何設計一個低耦合高可擴充套件架構的思想。傳統的 CURD 將對資料的操作分為 讀、寫、改、刪,將他們封裝在一起導致他們將緊密耦合在相同的資料來源中,不利於擴充套件。CQRS 則將對資料的操作分為會改變資料來源的和不會改變資料來源的,前者稱為命令,後者稱為查詢。將他們分別封裝能讓他們各自使用不同的資料來源,提高可擴充套件性。
其中命令是一個會改變資料來源,但不返回任何值的方法;查詢是會返回值,但絕不會改變資料來源的方法。但是在我的編碼中,命令是可以返回值的,至於要返回什麼,根據實際情況調整。比如最簡單的返回一個 bool 表示操作是否成功以決定接下來的業務流程該走向何方,這是很常見的情況。所以在我的概念裡,一個方法是命令還是查詢實際上只看這個方法是否會改變資料來源,要封裝在一起還是分別封裝都無所謂。建議分開封裝到不同的倉儲中,通過倉儲關聯到具體的資料來源,命令和查詢的倉儲關聯到不同的資料來源的時候,自然就完成了讀寫分離。通過起名來明示方法的目的應該可以輕鬆分辨一個方法屬於命令還是查詢。只要腦子裡有這個概念,要實現擴充套件辦法多的是。
事件驅動架構(EDA)
可以說所有圖形介面(Gui)程式設計都是清一色的事件驅動架構,這東西一點也不稀奇。說白了,EDA 就是一種被動架構,通過某些事情的發生來觸發某些操作的執行,否則系統就隨時待命,按兵不動。
EDA 的實現需要一箇中介才能實現,在 Windows 中,這個東西叫做 Windows 訊息佇列(訊息迴圈)和事件處理器。同樣的,在非 Gui 程式設計中也需要這倆東西,但通常被稱為訊息匯流排和訊息消費者。在分散式系統中,這個中介將不與系統在同一程序甚至不在同一裝置中,稱為分散式訊息匯流排。這樣在開發時可以分成兩撥,一撥負責寫生產併發送事件的程式碼,一撥負責寫接收事件資訊並進行處理的程式碼。他們之間的溝通僅限於交流關心的事件叫什麼以及事件攜帶了什麼資訊。至於產生的訊息是如何送到正確的消費端並觸發消費處理器的,那是訊息匯流排的事。如果一個訊息匯流排需要這兩撥人瞭解中間的過程甚至需要自己去實現,那這個訊息匯流排是個廢品,也起不到什麼解耦的效果,甚至是個拖後腿的東西。
EDA + CQRS
當他們結合在一起,就產生了命令或查詢的發起和實際處理實現可以分離的效果。命令的發起方向命令匯流排傳送一條命令訊息並帶上必要引數,消費方收到訊息後獲取引數完成任務並返回結果。命令可以看作一種特殊的事件,命令只由一個命令處理器處理,並可向傳送方返回一個處理結果;事件由所有對同種事件感興趣的事件處理器處理,不向事件傳送方返回任何結果。
事件處理器的執行順序是不確定的,所以任何事件處理器都必須獨立完成事件處理。如果兩個事件處理之間存在因果依賴,應該在前置事件處理後由事件處理器釋出新事件,並由後置事件處理器去處理前置事件產生的新事件,而不是讓它們處理同一事件。
ES(事件溯源)
事件溯源表示能追查一個事件的源頭,甚至與之相關的其他事件的概念,說句大白話就是刨祖墳。ES 對歷史狀態回溯的需求有著天然的支援,最常見的如撤銷重做。而 ES 一般會配合 EDA 使用,ES 儲存 EDA 產生的事件資訊,並且這些資訊有隻讀性和因果連貫性。這順便能讓我們對系統中的實體究竟是如何一步一步變成現在這個樣子有一個清晰的瞭解。畢竟實體具有可變性,實體資訊一旦改變,舊的資訊就會丟失,ES 剛好彌補了這個缺陷。
程式碼展示說明
此處的事件訊息中介使用 MediatR 實現。
介面
DDD 相關
實體
定義一個實體的基本要素,實現介面的類就是實體,值物件沒有介面或基類,只看程式碼所展現的性質是否符合值物件的定義,聚合根沒有介面或基類,只看實體是否被倉儲使用,領域服務說白了就是個打包封裝,根據情況來決定,例如重構時提取方法即可視為封裝服務。在此處可簡單認為沒有實現實體介面的資料類是值物件:
1 /// <summary> 2 /// 實體介面 3 /// </summary> 4 public interface IEntity {} 5 6 /// <summary> 7 /// 泛型實體介面,約束Id屬性 8 /// </summary> 9 public interface IEntity<TKey> : IEntity 10 where TKey : IEquatable<TKey> 11 { 12 TKey Id { get; set; } 13 }
倉儲介面
倉儲介面細分為可讀倉儲和可寫倉儲,可寫倉儲有一個分支為可批量提交倉儲,表示修改操作會在呼叫提交儲存方法後批量儲存,也就是事務(就是用來替代操作單元的,這東西就有一個提交操作,名字也莫名其妙,我曾經一直無法理解這東西是幹嘛的),介面宣告參考 EF Core,示例實現也基於 EF Core。由於已經公開了查詢介面型別的 Set 屬性,使用者可以任意自定義查詢。
1 public interface IBulkOperableVariableRepository<TResult, TVariableRepository, TEntity> 2 where TEntity : IEntity 3 where TVariableRepository : IVariableRepository<TEntity> 4 { 5 TResult SaveChanges(); 6 Task<TResult> SaveChangesAsync(CancellationToken cancellationToken); 7 } 8 9 public interface IBulkOperableVariableRepository<TVariableRepository, TEntity> 10 where TEntity : IEntity 11 where TVariableRepository : IVariableRepository<TEntity> 12 { 13 void SaveChanges(); 14 Task SaveChangesAsync(CancellationToken cancellationToken); 15 } 16 17 public interface IReadOnlyRepository<TEntity> 18 where TEntity : IEntity 19 { 20 IQueryable<TEntity> Set { get; } 21 TEntity Find(TEntity entity, bool ignoreNullValue); 22 Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue); 23 24 } 25 public interface IReadOnlyRepository<TEntity, TKey> : IReadOnlyRepository<TEntity> 26 where TEntity : IEntity<TKey> 27 where TKey : IEquatable<TKey> 28 { 29 TEntity Find(TKey key); 30 Task<TEntity> FindAsync(TKey key); 31 IQueryable<TEntity> Find(IEnumerable<TKey> keys); 32 } 33 34 public interface IVariableRepository<TEntity> 35 where TEntity : IEntity 36 { 37 void Add(TEntity entity); 38 Task AddAsync(TEntity entity, CancellationToken cancellationToken); 39 void Update(TEntity entity); 40 Task UpdateAsync(TEntity entity, CancellationToken cancellationToken); 41 void Delete(TEntity entity, bool isSoftDelete); 42 Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken); 43 void AddRange(IEnumerable<TEntity> entities); 44 Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken); 45 void UpdateRange(IEnumerable<TEntity> entities); 46 Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken); 47 void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete); 48 Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken); 49 } 50 public interface IVariableRepository<TEntity, TKey> : IVariableRepository<TEntity> 51 where TEntity : IEntity<TKey> 52 where TKey : IEquatable<TKey> 53 { 54 void Delete(TKey key, bool isSoftDelete); 55 Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken); 56 void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete); 57 Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken); 58 } 59 60 public interface IRepository<TEntity> : IVariableRepository<TEntity>, IReadOnlyRepository<TEntity> 61 where TEntity : IEntity 62 { 63 } 64 65 public interface IRepository<TEntity, TKey> : IRepository<TEntity>, IVariableRepository<TEntity, TKey>, IReadOnlyRepository<TEntity, TKey> 66 where TEntity : IEntity<TKey> 67 where TKey : IEquatable<TKey> 68 { 69 }
EF Core 專用特化版倉儲介面
1 public interface IEFCoreRepository<TEntity, TDbContext> : IReadOnlyRepository<TEntity>, IVariableRepository<TEntity>, IBulkOperableVariableRepository<int, IEFCoreRepository<TEntity, TDbContext>, TEntity> 2 where TEntity : class, IEntity 3 where TDbContext : DbContext 4 { } 5 6 public interface IEFCoreRepository<TEntity, TKey, TDbContext> : IEFCoreRepository<TEntity, TDbContext>, IReadOnlyRepository<TEntity, TKey>, IVariableRepository<TEntity, TKey> 7 where TEntity : class, IEntity<TKey> 8 where TKey : IEquatable<TKey> 9 where TDbContext : DbContext 10 { }
CQRS+EDA 相關:
命令介面
分為帶返回值命令和無返回值命令
1 public interface ICommand<out TResult> : ICommand 2 { 3 } 4 5 public interface ICommand : IMessage 6 { 7 }
命令匯流排介面
同樣分為帶返回值和無返回值
1 public interface ICommandBus<in TCommand> 2 where TCommand : ICommand 3 { 4 Task SendCommandAsync(TCommand command, CancellationToken cancellationToken); 5 } 6 7 public interface ICommandBus<in TCommand, TResult> : ICommandBus<TCommand> 8 where TCommand : ICommand<TResult> 9 { 10 new Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken); 11 }
命令處理器介面
同上
1 public interface ICommandHandler<in TCommand> 2 where TCommand : ICommand 3 { 4 Task Handle(TCommand command, CancellationToken cancellationToken); 5 } 6 7 public interface ICommandHandler<in TCommand, TResult> : ICommandHandler<TCommand> 8 where TCommand : ICommand<TResult> 9 { 10 new Task<TResult> Handle(TCommand command, CancellationToken cancellationToken); 11 }
命令儲存介面
可用於歷史命令追溯,返回值可用於返回儲存是否成功或其他必要資訊
1 public interface ICommandStore 2 { 3 void Save(ICommand command); 4 5 Task SaveAsync(ICommand command, CancellationToken cancellationToken); 6 } 7 8 public interface ICommandStore<TResult> : ICommandStore 9 { 10 new TResult Save(ICommand command); 11 12 new Task<TResult> SaveAsync(ICommand command, CancellationToken cancellationToken); 13 }
事件介面
沒有返回值
1 public interface IEvent : IMessage 2 { 3 }
事件匯流排介面
同上
1 public interface IEventBus 2 { 3 void PublishEvent(IEvent @event); 4 5 Task PublishEventAsync(IEvent @event, CancellationToken cancellationToken); 6 } 7 8 public interface IEventBus<TResult> : IEventBus 9 { 10 new TResult PublishEvent(IEvent @event); 11 12 new Task<TResult> PublishEventAsync(IEvent @event, CancellationToken cancellationToken); 13 }
事件處理器介面
同上
1 public interface IEventHandler<in TEvent> 2 where TEvent : IEvent 3 { 4 Task Handle(TEvent @event, CancellationToken cancellationToken); 5 }
事件儲存介面
同命令儲存介面
1 public interface IEventStore 2 { 3 void Save(IEvent @event); 4 5 Task SaveAsync(IEvent @event, CancellationToken cancellationToken = default); 6 } 7 8 public interface IEventStore<TResult> : IEventStore 9 { 10 new TResult Save(IEvent @event); 11 12 new Task<TResult> SaveAsync(IEvent @event, CancellationToken cancellationToken = default); 13 }
(命令、事件)訊息基礎介面
1 public interface IMessage 2 { 3 Guid Id { get; } 4 5 DateTimeOffset Timestamp { get; } 6 }
相關介面定義完畢。
實現
EF Core 泛型倉儲
未知主鍵的實體使用實體物件為條件查詢時,使用動態生成表示式的方法
1 public class EFCoreRepository<TEntity, TKey, TDbContext> : EFCoreRepository<TEntity, TDbContext>, IEFCoreRepository<TEntity, TKey, TDbContext> 2 where TEntity : class, IEntity<TKey> 3 where TKey : IEquatable<TKey> 4 where TDbContext : DbContext 5 { 6 public EFCoreRepository(TDbContext dbContext) : base(dbContext) 7 { 8 } 9 10 public virtual void Delete(TKey key, bool isSoftDelete) 11 { 12 var entity = Find(key); 13 Delete(entity, isSoftDelete); 14 } 15 16 public virtual Task DeleteAsync(TKey key, bool isSoftDelete, CancellationToken cancellationToken = default) 17 { 18 Delete(key, isSoftDelete); 19 return Task.CompletedTask; 20 } 21 22 public virtual void DeleteRange(IEnumerable<TKey> keys, bool isSoftDelete) 23 { 24 var entities = Find(keys).ToArray(); 25 dbSet.AttachRange(entities); 26 DeleteRange(entities, isSoftDelete); 27 } 28 29 public virtual Task DeleteRangeAsync(IEnumerable<TKey> keys, bool isSoftDelete, CancellationToken cancellationToken = default) 30 { 31 DeleteRange(keys, isSoftDelete); 32 return Task.CompletedTask; 33 } 34 35 public virtual TEntity Find(TKey key) 36 { 37 return Set.SingleOrDefault(x => x.Id.Equals(key)); 38 } 39 40 public virtual IQueryable<TEntity> Find(IEnumerable<TKey> keys) 41 { 42 return Set.Where(x => keys.Contains(x.Id)); 43 } 44 45 public override TEntity Find(TEntity entity, bool ignoreNullValue) 46 { 47 return base.Find(entity, ignoreNullValue); 48 } 49 50 public virtual Task<TEntity> FindAsync(TKey key) 51 { 52 return Set.SingleOrDefaultAsync(x => x.Id.Equals(key)); 53 } 54 55 public override Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue) 56 { 57 return base.FindAsync(entity, ignoreNullValue); 58 } 59 } 60 61 public class EFCoreRepository<TEntity, TDbContext> : IEFCoreRepository<TEntity, TDbContext> 62 where TEntity : class, IEntity 63 where TDbContext : DbContext 64 { 65 protected readonly TDbContext dbContext; 66 protected readonly DbSet<TEntity> dbSet; 67 68 protected virtual void ProcessChangedEntity() 69 { 70 var changedEntities = dbContext.ChangeTracker.Entries() 71 .Where(x => x.State == EntityState.Added || x.State == EntityState.Modified); 72 foreach (var entity in changedEntities) 73 { 74 (entity as IOptimisticConcurrencySupported)?.GenerateNewConcurrencyStamp(); 75 } 76 77 var changedEntitiesGroups = changedEntities.GroupBy(x => x.State); 78 foreach (var group in changedEntitiesGroups) 79 { 80 switch (group) 81 { 82 case var entities when entities.Key == EntityState.Added: 83 foreach (var entity in entities) 84 { 85 if (entity is IActiveControllable) 86 { 87 (entity as IActiveControllable).Active ??= true; 88 } 89 } 90 break; 91 case var entities when entities.Key == EntityState.Modified: 92 foreach (var entity in entities) 93 { 94 (entity as IEntity)?.ProcessCreationInfoWhenModified(dbContext); 95 96 if (entity is IActiveControllable && (entity as IActiveControllable).Active == null) 97 { 98 entity.Property(nameof(IActiveControllable.Active)).IsModified = false; 99 } 100 } 101 break; 102 default: 103 break; 104 } 105 } 106 } 107 108 protected virtual void ResetDeletedMark(params TEntity[] entities) 109 { 110 foreach (var entity in entities) 111 { 112 if (entity is ILogicallyDeletable) 113 { 114 (entity as ILogicallyDeletable).IsDeleted = false; 115 } 116 } 117 } 118 119 public EFCoreRepository(TDbContext dbContext) 120 { 121 this.dbContext = dbContext; 122 dbSet = this.dbContext.Set<TEntity>(); 123 } 124 125 public virtual void Add(TEntity entity) 126 { 127 dbSet.Add(entity); 128 } 129 130 public virtual Task AddAsync(TEntity entity, CancellationToken cancellationToken = default) 131 { 132 return dbSet.AddAsync(entity, cancellationToken).AsTask(); 133 } 134 135 public virtual void AddRange(IEnumerable<TEntity> entities) 136 { 137 dbSet.AddRange(entities); 138 } 139 140 public virtual Task AddRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default) 141 { 142 return dbSet.AddRangeAsync(entities, cancellationToken); 143 } 144 145 public virtual void Delete(TEntity entity, bool isSoftDelete) 146 { 147 dbSet.Attach(entity); 148 if (isSoftDelete) 149 { 150 if (entity is ILogicallyDeletable) 151 { 152 (entity as ILogicallyDeletable).IsDeleted = true; 153 } 154 else 155 { 156 throw new InvalidOperationException($"要求軟刪除的實體不實現{nameof(ILogicallyDeletable)}介面。"); 157 } 158 } 159 else 160 { 161 dbSet.Remove(entity); 162 } 163 } 164 165 public virtual Task DeleteAsync(TEntity entity, bool isSoftDelete, CancellationToken cancellationToken = default) 166 { 167 Delete(entity, isSoftDelete); 168 return Task.CompletedTask; 169 } 170 171 public virtual void DeleteRange(IEnumerable<TEntity> entities, bool isSoftDelete) 172 { 173 dbSet.AttachRange(entities); 174 foreach (var entity in entities) 175 { 176 Delete(entity, isSoftDelete); 177 } 178 } 179 180 public virtual Task DeleteRangeAsync(IEnumerable<TEntity> entities, bool isSoftDelete, CancellationToken cancellationToken = default) 181 { 182 DeleteRange(entities, isSoftDelete); 183 return Task.CompletedTask; 184 } 185 186 public virtual TEntity Find(TEntity entity, bool ignoreNullValue) 187 { 188 var exp = GenerateWhere(dbContext, entity, ignoreNullValue); 189 190 return Set.SingleOrDefault(exp); 191 } 192 193 public virtual Task<TEntity> FindAsync(TEntity entity, bool ignoreNullValue) 194 { 195 var exp = GenerateWhere(dbContext, entity, ignoreNullValue); 196 197 return Set.SingleOrDefaultAsync(exp); 198 } 199 200 public virtual int SaveChanges() 201 { 202 ProcessChangedEntity(); 203 return dbContext.SaveChanges(); 204 } 205 206 public virtual Task<int> SaveChangesAsync(CancellationToken cancellationToken = default) 207 { 208 ProcessChangedEntity(); 209 return dbContext.SaveChangesAsync(cancellationToken); 210 } 211 212 public virtual IQueryable<TEntity> Set => dbSet.AsNoTracking(); 213 214 public virtual void Update(TEntity entity) 215 { 216 ResetDeletedMark(entity); 217 dbSet.Update(entity); 218 } 219 220 public virtual Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default) 221 { 222 Update(entity); 223 return Task.CompletedTask; 224 } 225 226 public virtual void UpdateRange(IEnumerable<TEntity> entities) 227 { 228 ResetDeletedMark(entities.ToArray()); 229 dbSet.UpdateRange(entities); 230 } 231 232 public virtual Task UpdateRangeAsync(IEnumerable<TEntity> entities, CancellationToken cancellationToken = default) 233 { 234 UpdateRange(entities); 235 return Task.CompletedTask; 236 } 237 238 static private Expression<Func<TEntity, bool>> GenerateWhere(TDbContext dbContext, TEntity entity, bool ignoreNullValue) 239 { 240 //查詢實體型別主鍵 241 var model = dbContext.Model.FindEntityType(typeof(TEntity)); 242 var key = model.FindPrimaryKey(); 243 244 //查詢所有主鍵屬性,如果沒有主鍵就使用所有實體屬性 245 IEnumerable<PropertyInfo> props; 246 if (key != null) 247 { 248 props = key.Properties.Select(x => x.PropertyInfo); 249 } 250 else 251 { 252 props = model.GetProperties().Select(x => x.PropertyInfo); 253 } 254 255 //生成表示式引數 256 ParameterExpression parameter = Expression.Parameter(typeof(TEntity), "x"); 257 258 //初始化提取實體型別所有屬性資訊生成屬性訪問表示式幷包裝備用 259 var keyValues = props.Select(x => new { key = x, value = x.GetValue(entity), propExp = Expression.Property(parameter, x) }); 260 //初始化儲存由基礎型別組成的屬性資訊(只要個空集合,實際資料在後面的迴圈中填充) 261 var primitiveKeyValues = keyValues.Take(0).Where(x => IsPrimitiveType(x.key.PropertyType)); 262 //初始化基礎型別屬性的相等比較表示式儲存集合(只要個空集合,實際資料在後面的迴圈中填充) 263 var equals = primitiveKeyValues.Take(0).Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value))); 264 //初始化複雜型別屬性儲存集合 265 var notPrimitiveKeyValues = primitiveKeyValues; 266 267 //如果還有元素,說明上次用於提取資訊的複雜屬性內部還存在複雜屬性,接下來用提取到的基礎型別屬性資訊生成相等比較表示式併合併到儲存集合然後繼續提取剩下的複雜型別屬性的內部屬性 268 while (keyValues.Count() > 0) 269 { 270 if (ignoreNullValue) 271 { 272 keyValues = keyValues.Where(x => x.value != null); 273 } 274 //提取由基礎型別組成的屬性資訊 275 primitiveKeyValues = keyValues.Where(x => IsPrimitiveType(x.key.PropertyType)); 276 //生成基礎型別屬性的相等比較表示式 277 equals = equals.Concat(primitiveKeyValues.Select(x => Expression.Equal(x.propExp, Expression.Constant(x.value)))); 278 //提取複雜型別屬性 279 notPrimitiveKeyValues = keyValues.Except(primitiveKeyValues); 280 //分別提取各個複雜型別屬性內部的屬性資訊繼續生成內部屬性訪問表示式 281 keyValues = 282 from kv in notPrimitiveKeyValues 283 from propInfo in kv.value.GetType().GetProperties() 284 select new { key = propInfo, value = propInfo.GetValue(kv.value), propExp = Expression.Property(kv.propExp, propInfo) }; 285 } 286 287 //如果相等比較表示式有多個,將所有相等比較表示式用 && 運算連線起來 288 var and = equals.First(); 289 foreach (var eq in equals.Skip(1)) 290 { 291 and = Expression.AndAlso(and, eq); 292 } 293 294 //生成完整的過濾條件表示式,形如: (TEntity x) => { return x.a == ? && x.b == ? && x.obj1.m == ? && x.obj1.n == ? && x.obj2.u.v == ?; } 295 var exp = Expression.Lambda<Func<TEntity, bool>>(and, parameter); 296 297 //判斷某個型別是否是基礎資料型別 298 static bool IsPrimitiveType(Type type) 299 { 300 var primitiveTypes = new[] { 301 typeof(sbyte) 302 ,typeof(byte) 303 ,typeof(short) 304 ,typeof(ushort) 305 ,typeof(int) 306 ,typeof(uint) 307 ,typeof(long) 308 ,typeof(ulong) 309 ,typeof(float) 310 ,typeof(double) 311 ,typeof(decimal) 312 ,typeof(char) 313 ,typeof(string) 314 ,typeof(bool) 315 ,typeof(DateTime) 316 ,typeof(DateTimeOffset) 317 //,typeof(Enum) 318 ,typeof(Guid)}; 319 320 var tmp = 321 type.IsDerivedFrom(typeof(Nullable<>)) 322 ? Nullable.GetUnderlyingType(type) 323 : type; 324 325 return tmp.IsEnum || primitiveTypes.Contains(tmp); 326 } 327 328 return exp; 329 } 330 }
命令
命令基類
1 public abstract class MediatRCommand : MediatRCommand<Unit>, ICommand, IRequest 2 { 3 } 4 5 public abstract class MediatRCommand<TResult> : ICommand<TResult>, IRequest<TResult> 6 { 7 public Guid Id { get; } 8 9 public DateTimeOffset Timestamp { get; } 10 11 public MediatRCommand() 12 { 13 Id = Guid.NewGuid(); 14 Timestamp = DateTimeOffset.Now; 15 } 16 }
示例具體命令,命令只包含引數資訊,如何使用引數資訊完成任務是命令處理器的事
1 public class ListUserCommand : MediatRCommand<IPagedList<ApplicationUser>> 2 { 3 public PageInfo PageInfo { get; } 4 public QueryFilter QueryFilter { get; } 5 public ListUserCommand(PageInfo pageInfo, QueryFilter queryFilter) 6 { 7 PageInfo = pageInfo; 8 QueryFilter = queryFilter; 9 } 10 }
命令匯流排
1 public class MediatRCommandBus<TCommand, TResult> : ICommandBus<TCommand, TResult> 2 where TCommand : MediatRCommand<TResult> 3 { 4 private readonly IMediator mediator; 5 private readonly ICommandStore commandStore; 6 7 public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) 8 { 9 this.mediator = mediator; 10 this.commandStore = commandStore; 11 } 12 13 public virtual Task<TResult> SendCommandAsync(TCommand command, CancellationToken cancellationToken = default) 14 { 15 commandStore?.SaveAsync(command, cancellationToken); 16 return mediator.Send(command, cancellationToken); 17 } 18 19 Task ICommandBus<TCommand>.SendCommandAsync(TCommand command, CancellationToken cancellationToken) 20 { 21 return SendCommandAsync(command, cancellationToken); 22 } 23 } 24 25 public class MediatRCommandBus<TCommand> : MediatRCommandBus<MediatRCommand<Unit>, Unit> 26 where TCommand : MediatRCommand<Unit> 27 { 28 public MediatRCommandBus(IMediator mediator, ICommandStore commandStore) : base(mediator, commandStore) 29 { 30 } 31 }
命令處理器
命令處理器基類
1 public abstract class MediatRCommandHandler<TCommand, TResult> : ICommandHandler<TCommand, TResult>, IRequestHandler<TCommand, TResult> 2 where TCommand : MediatRCommand<TResult> 3 { 4 public abstract Task<TResult> Handle(TCommand command, CancellationToken cancellationToken = default); 5 6 Task ICommandHandler<TCommand>.Handle(TCommand command, CancellationToken cancellationToken) 7 { 8 return Handle(command, cancellationToken); 9 } 10 } 11 12 public abstract class MediatRCommandHandler<TCommand> : MediatRCommandHandler<TCommand, Unit> 13 where TCommand : MediatRCommand 14 { 15 }
具體命令處理器示例,使用注入的倉儲查詢資料,ApplicationUser 在這裡就是事實上的聚合根實體
1 public class ListUserCommandHandler : MediatRCommandHandler<ListUserCommand, IPagedList<ApplicationUser>> 2 { 3 private IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository; 4 5 public ListUserCommandHandler(IEFCoreRepository<ApplicationUser, int, ApplicationIdentityDbContext> repository) 6 { 7 this.repository = repository; 8 } 9 10 public override Task<IPagedList<ApplicationUser>> Handle(ListUserCommand command, CancellationToken cancellationToken = default) 11 { 12 return repository.Set 13 .OrderBy(x => x.Id) 14 .ToPagedListAsync(command.PageInfo.PageNumber, command.PageInfo.PageSize); 15 } 16 }
命令儲存
什麼都沒幹,實際使用時可以使用資料庫儲存相關資訊
1 public class InProcessCommandStore : ICommandStore<bool> 2 { 3 public bool Save(ICommand command) 4 { 5 return SaveAsync(command).Result; 6 } 7 8 public Task<bool> SaveAsync(ICommand command, CancellationToken cancellationToken = default) 9 { 10 return Task.FromResult(true); 11 } 12 13 void ICommandStore.Save(ICommand command) 14 { 15 Save(command); 16 } 17 18 Task ICommandStore.SaveAsync(ICommand command, CancellationToken cancellationToken) 19 { 20 return SaveAsync(command, cancellationToken); 21 } 22 }
事件部分和命令基本相同,具體程式碼可以到文章末尾下載專案程式碼檢視。
使用
在 Startup.ConfigureServices 方法中註冊相關服務,事件匯流排和命令匯流排都使用 MediatR 實現。.Net Core 內建 DI 支援註冊泛型服務,所以某個實體在實際使用時注入泛型倉儲就表示這個實體是聚合根,不用提前定義具體的聚合根實體倉儲,所以刪除使用程式碼相當於刪除了倉儲定義。
1 services.AddScoped(typeof(ICommandBus<>), typeof(MediatRCommandBus<>)); 2 services.AddScoped(typeof(ICommandBus<,>), typeof(MediatRCommandBus<,>)); 3 services.AddScoped(typeof(ICommandStore), typeof(InProcessCommandStore)); 4 services.AddScoped(typeof(IEventBus), typeof(MediatREventBus)); 5 services.AddScoped(typeof(IEventBus<>), typeof(MediatREventBus<>)); 6 services.AddScoped(typeof(IEventStore), typeof(InProcessEventStore)); 7 services.AddScoped(typeof(IEFCoreRepository<,>), typeof(EFCoreRepository<,>)); 8 services.AddScoped(typeof(IEFCoreRepository<,,>), typeof(EFCoreRepository<,,>)); 9 services.AddMediatR(typeof(ListUserCommandHandler).GetTypeInfo().Assembly);
示例使用比較簡單,就不定義服務了,如果需要定義服務,那麼使用服務的一般是命令處理器,倉儲由服務使用。這裡命令處理器直接使用倉儲。在控制器中注入命令匯流排,向命令匯流排傳送命令就可以獲取結果。MediatR 會自動根據傳送的命令型別查詢匹配的命令處理器去呼叫。
1 [ApiController] 2 [Route("api/[controller]")] 3 public class UsersController : ControllerBase 4 { 5 private readonly ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> _commandBus; 6 private readonly IMapper _mapper; 7 8 public UsersController(ICommandBus<ListUserCommand, IPagedList<ApplicationUser>> commandBus, IMapper mapper) 9 { 10 _commandBus = commandBus; 11 _mapper = mapper; 12 } 13 14 /// <summary> 15 /// 獲取使用者列表 16 /// </summary> 17 /// <param name="page">頁碼</param> 18 /// <param name="size">每頁條目數</param> 19 /// <returns>使用者列表</returns> 20 [HttpGet] 21 [Produces("application/json")] //宣告介面響應 json 資料 22 public async Task<IActionResult> GetAsync(int? page, int? size) 23 { 24 var cmd = new ListUserCommand(new PageInfo(page ?? 1, size ?? 10), new QueryFilter()); 25 var users = await _commandBus.SendCommandAsync(cmd, default); 26 27 return new JsonResult( 28 new 29 { 30 rows = users.Select(u => _mapper.Map<ApplicationUserDto>(u)), 31 total = users.PageCount, //總頁數 32 page = users.PageNumber, //當前頁碼 33 records = users.TotalItemCount //總記錄數 34 } 35 ); 36 } 37 }
使用就是這麼簡單。使用者根本不需要知道命令處理器的存在,把命令傳送到匯流排,等著接收結果就可以了。
事件一般由命令處理器引發,可以改造命令處理器用 DI 注入事件匯流排,然後在命令處理器中向事件匯流排傳送事件,事件匯流排就會自動觸發相應的事件處理器。
結語
完整的流程大概就是:控制器使用注入的服務執行業務流程,業務服務向命令匯流排傳送命令,命令匯流排觸發處理器處理命令,命令處理器向事件匯流排傳送事件,事件匯流排觸發事件處理器處理事件,事件處理器在處理事件後向事件匯流排傳送新的事件觸發後續事件處理器繼續處理新的事件(如果需要),直到最後不傳送事件的事件處理器完成處理。整個流程完結。在此過程中匯流排會自動呼叫注入的匯流排訊息儲存來持久化命令和事件,至此,一個環環相扣的極簡 DDD+CQRS+EDA+ES 架構搭建完成!
想要實際體驗的朋友可以到文章末尾下載專案並執行體驗。啟動除錯後訪問 /swagger 然後嘗試體驗呼叫 api/users 介面。
轉載請完整保留以下內容並在顯眼位置標註,未經授權刪除以下內容進行轉載盜用的,保留追究法律責任的權利!
本文地址:https://www.cnblogs.com/coredx/p/12364960.html
完整原始碼:Github
裡面有各種小東西,這只是其中之一,不嫌棄的話可以Star一