1. 程式人生 > >淺談命令查詢職責分離(CQRS)模式

淺談命令查詢職責分離(CQRS)模式

sel 參考 .html 主庫 mat 再處理 remove 編輯 obj

在常用的三層架構中,通常都是通過數據訪問層來修改或者查詢數據,一般修改和查詢使用的是相同的實體。在一些業務邏輯簡單的系統中可能沒有什麽問題,但是隨著系統邏輯變得復雜,用戶增多,這種設計就會出現一些性能問題。雖然在DB上可以做一些讀寫分離的設計,但在業務上如果在讀寫方面混合在一起的話,仍然會出現一些問題。

本文介紹了命令查詢職責分離模式(Command Query Responsibility Segregation,CQRS),該模式從業務上分離修改 (Command,增,刪,改,會對系統狀態進行修改)和查詢(Query,查,不會對系統狀態進行修改)的行為。從而使得邏輯更加清晰,便於對不同部分進行針對性的優化。文章首先簡要介紹了傳統的CRUD方式存在的問題,接著介紹了CQRS模式,最後以一個簡單的在線日記系統演示了如何實現CQRS模式。要談到讀寫操作,首先我們來看傳統的CRUD的問題。

一 CRUD方式的問題

在以前的管理系統中,命令(Command,通常用來更新數據,操作DB)和查詢(Query)通常使用的是在數據訪問層中Repository中的實體對象(這些對象是對DB中表的映射),這些實體有可能是SQLServer中的一行數據或者多個表。

通常對DB執行的增,刪,改,查(CRUD)都是針對的系統的實體對象。如通過數據訪問層獲取數據,然後通過數據傳輸對象DTO傳給表現層。或者,用戶需要更新數據,通過DTO對象將數據傳給Model,然後通過數據訪問層寫回數據庫,系統中的所有交互都是和數據查詢和存儲有關,可以認為是數據驅動(Data-Driven)的,如下圖:

技術分享圖片

對於一些比較簡單的系統,使用這種CRUD的設計方式能夠滿足要求。特別是通過一些代碼生成工具及ORM等能夠非常方便快速的實現功能。

但是傳統的CRUD方法有一些問題:

  • 使用同一個對象實體來進行數據庫讀寫可能會太粗糙,大多數情況下,比如編輯的時候可能只需要更新個別字段,但是卻需要將整個對象都穿進去,有些字段其實是不需要更新的。在查詢的時候在表現層可能只需要個別字段,但是需要查詢和返回整個實體對象。
  • 使用同一實體對象對同一數據進行讀寫操作的時候,可能會遇到資源競爭的情況,經常要處理的鎖的問題,在寫入數據的時候,需要加鎖。讀取數據的時候需要判斷是否允許臟讀。這樣使得系統的邏輯性和復雜性增加,並且會對系統吞吐量的增長會產生影響。
  • 同步的,直接與數據庫進行交互在大數據量同時訪問的情況下可能會影響性能和響應性,並且可能會產生性能瓶頸。
  • 由於同一實體對象都會在讀寫操作中用到,所以對於安全和權限的管理會變得比較復雜。

這裏面很重要的一個問題是,系統中的讀寫頻率比,是偏向讀,還是偏向寫,就如同一般的數據結構在查找和修改上時間復雜度不一樣,在設計系統的結構時也需要考慮這樣的問題。解決方法就是我們經常用到的對數據庫進行讀寫分離。 讓主數據庫處理事務性的增,刪,改操作(Insert,Update,Delete)操作,讓從數據庫處理查詢操作(Select操作),數據庫復制被用來將事務性操作導致的變更同步到集群中的從數據庫。這只是從DB角度處理了讀寫分離,但是從業務或者系統上面讀和寫仍然是存放在一起的。他們都是用的同一個實體對象。

要從業務上將讀和寫分離,就是接下來要介紹的命令查詢職責分離模式。

二 什麽是CQRS

CQRS最早來自於Betrand Meyer(Eiffel語言之父,開-閉原則OCP提出者)在 Object-Oriented Software Construction 這本書中提到的一種 命令查詢分離 (Command Query Separation,CQS) 的概念。其基本思想在於,任何一個對象的方法可以分為兩大類:

  • 命令(Command):不返回任何結果(void),但會改變對象的狀態。
  • 查詢(Query):返回結果,但是不會改變對象的狀態,對系統沒有副作用。

根據CQS的思想,任何一個方法都可以拆分為命令和查詢兩部分,比如:

private int i = 0;
private int Increase(int value)
{
    i += value;
    return i;
}

這個方法,我們執行了一個命令即對變量i進行相加,同時又執行了一個Query,即查詢返回了i的值,如果按照CQS的思想,該方法可以拆成Command和Query兩個方法,如下:

private void IncreaseCommand(int value)
{
    i += value;
}
private int QueryValue()
{
    return i;
}

操作和查詢分離使得我們能夠更好的把握對象的細節,能夠更好的理解哪些操作會改變系統的狀態。當然CQS也有一些缺點,比如代碼需要處理多線程的情況。

CQRS是對CQS模式的進一步改進成的一種簡單模式。 它由Greg Young在CQRS, Task Based UIs, Event Sourcing agh! 這篇文章中提出。“CQRS只是簡單的將之前只需要創建一個對象拆分成了兩個對象,這種分離是基於方法是執行命令還是執行查詢這一原則來定的(這個和CQS的定義一致)”。

CQRS使用分離的接口將數據查詢操作(Queries)和數據修改操作(Commands)分離開來,這也意味著在查詢和更新過程中使用的數據模型也是不一樣的。這樣讀和寫邏輯就隔離開來了。

技術分享圖片

使用CQRS分離了讀寫職責之後,可以對數據進行讀寫分離操作來改進性能,可擴展性和安全。如下圖:

技術分享圖片

主數據庫處理CUD,從庫處理R,從庫的的結構可以和主庫的結構完全一樣,也可以不一樣,從庫主要用來進行只讀的查詢操作。在數量上從庫的個數也可以根據查詢的規模進行擴展,在業務邏輯上,也可以根據專題從主庫中劃分出不同的從庫。從庫也可以實現成ReportingDatabase,根據查詢的業務需求,從主庫中抽取一些必要的數據生成一系列查詢報表來存儲。

技術分享圖片

使用ReportingDatabase的一些優點通常可以使得查詢變得更加簡單高效:

  • ReportingDatabase的結構和數據表會針對常用的查詢請求進行設計。
  • ReportingDatabase數據庫通常會去正規化,存儲一些冗余而減少必要的Join等聯合查詢操作,使得查詢簡化和高效,一些在主數據庫中用不到的數據信息,在ReportingDatabase可以不用存儲。
  • 可以對ReportingDatabase重構優化,而不用去改變操作數據庫。
  • 對ReportingDatabase數據庫的查詢不會給操作數據庫帶來任何壓力。
  • 可以針對不同的查詢請求建立不同的ReportingDatabase庫。

當然這也有一些缺點,比如從庫數據的更新。如果使用SQLServer,本身也提供了一些如故障轉移和復制機制來方便部署。

三 什麽時候可以考慮CQRS

CQRS模式有一些優點:

  1. 分工明確,可以負責不同的部分
  2. 將業務上的命令和查詢的職責分離能夠提高系統的性能、可擴展性和安全性。並且在系統的演化中能夠保持高度的靈活性,能夠防止出現CRUD模式中,對查詢或者修改中的某一方進行改動,導致另一方出現問題的情況。
  3. 邏輯清晰,能夠看到系統中的那些行為或者操作導致了系統的狀態變化。
  4. 可以從數據驅動(Data-Driven) 轉到任務驅動(Task-Driven)以及事件驅動(Event-Driven).

在下場景中,可以考慮使用CQRS模式:

  1. 當在業務邏輯層有很多操作需要相同的實體或者對象進行操作的時候。CQRS使得我們可以對讀和寫定義不同的實體和方法,從而可以減少或者避免對某一方面的更改造成沖突
  2. 對於一些基於任務的用戶交互系統,通常這類系統會引導用戶通過一系列復雜的步驟和操作,通常會需要一些復雜的領域模型,並且整個團隊已經熟悉領域驅動設計技術。寫模型有很多和業務邏輯相關的命令操作的堆,輸入驗證,業務邏輯驗證來保證數據的一致性。讀模型沒有業務邏輯以及驗證堆,僅僅是返回DTO對象為視圖模型提供數據。讀模型最終和寫模型相一致。
  3. 適用於一些需要對查詢性能和寫入性能分開進行優化的系統,尤其是讀/寫比非常高的系統,橫向擴展是必須的。比如,在很多系統中讀操作的請求時遠大於寫操作。為適應這種場景,可以考慮將寫模型抽離出來單獨擴展,而將寫模型運行在一個或者少數幾個實例上。少量的寫模型實例能夠減少合並沖突發生的情況
  4. 適用於一些團隊中,一些有經驗的開發者可以關註復雜的領域模型,這些用到寫操作,而另一些經驗較少的開發者可以關註用戶界面上的讀模型。
  5. 對於系統在將來會隨著時間不段演化,有可能會包含不同版本的模型,或者業務規則經常變化的系統
  6. 需要和其他系統整合,特別是需要和事件溯源Event Sourcing進行整合的系統,這樣子系統的臨時異常不會影響整個系統的其他部分。

但是在以下場景中,可能不適宜使用CQRS:

  1. 領域模型或者業務邏輯比較簡單,這種情況下使用CQRS會把系統搞復雜。
  2. 對於簡單的,CRUD模式的用戶界面以及與之相關的數據訪問操作已經足夠的話,沒必要使用CQRS,這些都是一個簡單的對數據進行增刪改查。
  3. 不適合在整個系統中到處使用該模式。在整個數據管理場景中的特定模塊中CQRS可能比較有用。但是在有些地方使用CQRS會增加系統不必要的復雜性。

四 CQRS與Event Sourcing的關系

在CQRS中,查詢方面,直接通過方法查詢數據庫,然後通過DTO將數據返回。在操作(Command)方面,是通過發送Command實現,由CommandBus處理特定的Command,然後由Command將特定的Event發布到EventBus上,然後EventBus使用特定的Handler來處理事件,執行一些諸如,修改,刪除,更新等操作。這裏,所有與Command相關的操作都通過Event實現。這樣我們可以通過記錄Event來記錄系統的運行歷史記錄,並且能夠方便的回滾到某一歷史狀態。Event Sourcing就是用來進行存儲和管理事件的。這裏不展開介紹。

五 CQRS的簡單實現

CQRS模式在思想上比較簡單,但是實現上還是有些復雜。它涉及到DDD,以及Event Sourcing,這裏使用codeproject上的 Introduction to CQRS 這篇文章的例子來說明CQRS模式。這個例子是一個簡單的在線記日誌(Diary)系統,實現了日誌的增刪改查功能。整體結構如下:

技術分享圖片

上圖很清晰的說明了CQRS在讀寫方面的分離,在讀方面,通過QueryFacade到數據庫裏去讀取數據,這個庫有可能是ReportingDB。在寫方面,比較復雜,操作通過Command發送到CommandBus上,然後特定的CommandHandler處理請求,產生對應的Event,將Eevnt持久化後,通過EventBus特定的EevntHandler對數據庫進行修改等操作。

例子代碼可以到codeproject上下載,整體結構如下:

技術分享圖片

由三個項目構成,Diary.CQRS包含了所有的Domain和消息對象。Configuration通過使用一個名為StructMap的IOC來初始化一些變量方便Web調用,Web是一個簡單的MVC3項目,在Controller中有與CQRS交互的代碼。

下面分別看Query和Command方面的實現:

Query方向的實現

查詢方面很簡單,日誌列表和明細獲取就是簡單的查詢。下面先看列表查詢部分的代碼。

public ActionResult Index()
{
    ViewBag.Model = ServiceLocator.ReportDatabase.GetItems();
    return View();
}

public ActionResult Edit(Guid id)
{
    var item = ServiceLocator.ReportDatabase.GetById(id);
    var model = new DiaryItemDto()
    {
        Description = item.Description,
        From = item.From,
        Id = item.Id,
        Title = item.Title,
        To = item.To,
        Version = item.Version
    };
    return View(model);
}

ReportDatabase的GetItems和GetById(id)方法就是簡單的查詢,從命名可以看出他是ReportDatabase。

public class ReportDatabase : IReportDatabase
{
    static List<DiaryItemDto> items = new List<DiaryItemDto>();

    public DiaryItemDto GetById(Guid id)
    {
        return items.Where(a => a.Id == id).FirstOrDefault();
    }

    public void Add(DiaryItemDto item)
    {
        items.Add(item);
    }

    public void Delete(Guid id)
    {
        items.RemoveAll(i => i.Id == id);
    }

    public List<DiaryItemDto> GetItems()
    {
        return items;
    } 
}

ReportDataBase只是在內部維護了一個List的DiaryItemDto列表。在使用的時候,是通過IRepositoryDatabase對其進行操作的,這樣便於mock代碼。

Query方面的代碼很簡單。在實際的應用中,這一塊就是直接對DB進行查詢,然後通過DTO對象返回,這個DB可能是應對特定場景的報表數據庫,這樣可以提升查詢性能。

下面來看Command方向的實現:

Command方向的實現

Command的實現比較復雜,下面以簡單的創建一個新的日誌來說明。

在MVC的Control中,可以看到Add的Controller中只調用了一句話:

[HttpPost]
public ActionResult Add(DiaryItemDto item)
{
    ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To));

    return RedirectToAction("Index");
}

首先聲明了一個CreateItemCommand,這個Command只是保存了一些必要的信息。

public class CreateItemCommand:Command
{
    public string Title { get; internal set; }
    public string Description { get;internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }

    public CreateItemCommand(Guid aggregateId, string title, 
        string description,int version,DateTime from, DateTime to)
        : base(aggregateId,version)
    {
        Title = title;
        Description = description;
        From = from;
        To = to;
    }
}

然後將Command發送到了CommandBus上,其實就是讓CommandBus來選擇合適的CommandHandler來處理。

public class CommandBus:ICommandBus
{
    private readonly ICommandHandlerFactory _commandHandlerFactory;

    public CommandBus(ICommandHandlerFactory commandHandlerFactory)
    {
        _commandHandlerFactory = commandHandlerFactory;
    }

    public void Send<T>(T command) where T : Command
    {
        var handler = _commandHandlerFactory.GetHandler<T>();
        if (handler != null)
        {
            handler.Execute(command);
        }
        else
        {
            throw new UnregisteredDomainCommandException("no handler registered");
        }
    }        
}

這個裏面需要值得註意的是CommandHandlerFactory這個類型的GetHandler方法,他接受一個類型為T的泛型,這裏就是我們之前傳入的CreateItemCommand。來看他的GetHandler方法。

public class StructureMapCommandHandlerFactory : ICommandHandlerFactory
{
    public ICommandHandler<T> GetHandler<T>() where T : Command
    {
        var handlers = GetHandlerTypes<T>().ToList();

        var cmdHandler = handlers.Select(handler => 
            (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault();
            
        return cmdHandler;
    }
        
    private IEnumerable<Type> GetHandlerTypes<T>() where T : Command
    {
        var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) ))
                .Where(h=>h.GetInterfaces()
                    .Any(ii=>ii.GetGenericArguments()
                        .Any(aa=>aa==typeof(T)))).ToList();

           
        return handlers;
    }

}

這裏可以看到,他首先查找當前的程序集中(ICommandHandler)所在的程序集中的所有的實現了ICommandHandler的接口的類型,然後在所有的類型找查找實現了該泛型接口並且泛型的類型參數類型為T類型的所有類型。以上面的代碼為例,就是要找出實現了ICommandHandler<CreateItemCommand>接口的類型。可以看到就是CreateItemCommandHandler類型。

public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand>
{
    private IRepository<DiaryItem> _repository;

    public CreateItemCommandHandler(IRepository<DiaryItem> repository)
    {
        _repository = repository;
    }

    public void Execute(CreateItemCommand command)
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }
        if (_repository == null)
        {
            throw new InvalidOperationException("Repository is not initialized.");
        }
        var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To);
        aggregate.Version = -1;
        _repository.Save(aggregate, aggregate.Version);
    }
}

找到之後然後使用IOC實例化了該對象返回。

現在CommandBus中,找到了處理特定Command的Handler。然後執行該類型的Execute方法。

可以看到在該類型中實例化了一個名為aggregate的DiaryItem對象。這個和我們之前查詢所用到的DiaryItemDto有所不同,這個一個領域對象,裏面包含了一系列事件。

public class DiaryItem : AggregateRoot, 
    IHandle<ItemCreatedEvent>,
    IHandle<ItemRenamedEvent>,
    IHandle<ItemFromChangedEvent>, 
    IHandle<ItemToChangedEvent>,
    IHandle<ItemDescriptionChangedEvent>,
    IOriginator
{
    public string Title { get; set; }

    public DateTime From { get; set; }
    public DateTime To { get; set; }
    public string Description { get; set; }

    public DiaryItem()
    {
            
    }

    public DiaryItem(Guid id,string title, string description,  DateTime from, DateTime to)
    {
        ApplyChange(new ItemCreatedEvent(id, title,description, from, to));
    }

    public void ChangeTitle(string title)
    {
        ApplyChange(new ItemRenamedEvent(Id, title));
    }

    public void Handle(ItemCreatedEvent e)
    {
        Title = e.Title;
        From = e.From;
        To = e.To;
        Id = e.AggregateId;
        Description = e.Description;
        Version = e.Version;
    }

    public void Handle(ItemRenamedEvent e)
    {
        Title = e.Title;
    }
    ...
}

ItemCreatedEvent 事件的定義如下,其實就是用來存儲傳輸過程中需要用到的數據。

public class ItemCreatedEvent:Event
{
    public string Title { get; internal set; }
    public DateTime From { get; internal set; }
    public DateTime To { get; internal set; }
    public string Description { get;internal set; }

    public ItemCreatedEvent(Guid aggregateId, string title ,
        string description, DateTime from, DateTime to)
    {
        AggregateId = aggregateId;
        Title = title;
        From = from;
        To = to;
        Description = description;
    }
}

可以看到在Domain對象中,除了定義基本的字段外,還定義了一些相應的事件,比如在構造函數中,實際上是發起了一個名為ItemCreateEvent的事件,同時還定義了處理時間的邏輯,這些邏輯都放在名為Handle的接口方法發,例如ItemCerateEvent的處理方法為Handle(ItemCreateEvent)方法。

ApplyChange方法在AggregateRoot對象中,他是聚集根,這是DDD中的概念。通過這個根可以串起所有對象。 該類實現了IEventProvider接口,他保存了所有在_changes中的所有沒有提交的變更,其中的ApplyChange的用來為特定的Event查找Eventhandler的方法:

public abstract class AggregateRoot : IEventProvider
{
    private readonly List<Event> _changes;

    public Guid Id { get; internal set; }
    public int Version { get; internal set; }
    public int EventVersion { get; protected set; }

    protected AggregateRoot()
    {
        _changes = new List<Event>();
    }

    public IEnumerable<Event> GetUncommittedChanges()
    {
        return _changes;
    }

    public void MarkChangesAsCommitted()
    {
        _changes.Clear();
    }

    public void LoadsFromHistory(IEnumerable<Event> history)
    {
        foreach (var e in history) ApplyChange(e, false);
        Version = history.Last().Version;
        EventVersion = Version;
    }

    protected void ApplyChange(Event @event)
    {
        ApplyChange(@event, true);
    }

    private void ApplyChange(Event @event, bool isNew)
    {
        dynamic d = this;

        d.Handle(Converter.ChangeTo(@event, @event.GetType()));
        if (isNew)
        {
            _changes.Add(@event);
        }
    }
}

在ApplyChange的實現中,this其實就是對應的實現了AggregateRoot的DiaryItem的Domain對象,調用的Handle方法就是我們之前在DiaryItem中定義的行為。然後將該event保存在內部的未提交的事件列表中。相關的信息及事件都保存在了定義的aggregate對象中並返回。

然後Command繼續執行,然後調用了_repository.Save(aggregate, aggregate.Version);這個方法。先看這個Repository對象。

public class Repository<T> : IRepository<T> where T : AggregateRoot, new()
{
    private readonly IEventStorage _storage;
    private static object _lockStorage = new object();

    public Repository(IEventStorage storage)
    {
        _storage = storage;
    } 

    public void Save(AggregateRoot aggregate, int expectedVersion)
    {
        if (aggregate.GetUncommittedChanges().Any())
        {
            lock (_lockStorage)
            {
                var item = new T();

                if (expectedVersion != -1)
                {
                    item = GetById(aggregate.Id);
                    if (item.Version != expectedVersion)
                    {
                        throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",
                                                                        item.Id));
                    }
                }

                _storage.Save(aggregate);
            }
        }
    }

    public T GetById(Guid id)
    {
        IEnumerable<Event> events;
        var memento = _storage.GetMemento<BaseMemento>(id);
        if (memento != null)
        {
            events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version);
        }
        else
        {
            events = _storage.GetEvents(id);
        }
        var obj = new T();
        if(memento!=null)
            ((IOriginator)obj).SetMemento(memento);
            
        obj.LoadsFromHistory(events);
        return obj;
    }
}

這個方法主要是用來對事件進行持久化的。 所有的聚合的變動都會存在該Repository中,首先,檢查當前的聚合是否和之前存儲在storage中的聚合一致,如果不一致,則表示對象在其他地方被更改過,拋出ConcurrencyException,否則將該變動保存在Event Storage中。

IEventStorage用來存儲所有的事件,其實現類型為InMemoryEventStorage。

public class InMemoryEventStorage:IEventStorage
{
    private List<Event> _events;
    private List<BaseMemento> _mementos;

    private readonly IEventBus _eventBus;

    public InMemoryEventStorage(IEventBus eventBus)
    {
        _events = new List<Event>();
        _mementos = new List<BaseMemento>();
        _eventBus = eventBus;
    }

    public IEnumerable<Event> GetEvents(Guid aggregateId)
    {
        var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p);
        if (events.Count() == 0)
        {
            throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found", aggregateId));
        }
        return events;
    }

    public void Save(AggregateRoot aggregate)
    {
        var uncommittedChanges = aggregate.GetUncommittedChanges();
        var version = aggregate.Version;
            
        foreach (var @event in uncommittedChanges)
        {
            version++;
            if (version > 2)
            {
                if (version % 3 == 0)
                {
                    var originator = (IOriginator)aggregate;
                    var memento = originator.GetMemento();
                    memento.Version = version;
                    SaveMemento(memento);
                }
            }
            @event.Version=version;
            _events.Add(@event);
        }
        foreach (var @event in uncommittedChanges)
        {
            var desEvent = Converter.ChangeTo(@event, @event.GetType());
            _eventBus.Publish(desEvent);
        }
    }

    public T GetMemento<T>(Guid aggregateId) where T : BaseMemento
    {
        var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault();
        if (memento != null)
            return (T) memento;
        return null;
    }

    public void SaveMemento(BaseMemento memento)
    {
        _mementos.Add(memento);
    }
}

在GetEvent方法中,會找到所有的聚合根Id相關的事件。在Save方法中,將所有的事件保存在內存中,然後每隔三個事件建立一個快照。可以看到這裏面使用了備忘錄模式。

然後在foreach循環中,對於所有的沒有提交的變更,EventBus將該事件發布出去。

現在,所有的發生變更的事件已經記錄下來了。事件已經被發布到EventBus上,然後對應的EventHandler再處理對應的事件,然後與DB交互。現在來看EventBus的Publish方法。

public class EventBus:IEventBus
{
    private IEventHandlerFactory _eventHandlerFactory;

    public EventBus(IEventHandlerFactory eventHandlerFactory)
    {
        _eventHandlerFactory = eventHandlerFactory;
    }
        
    public void Publish<T>(T @event) where T : Event
    {
        var handlers = _eventHandlerFactory.GetHandlers<T>();
        foreach (var eventHandler in handlers)
        {
            eventHandler.Handle(@event);
        }
    }
}

可以看到EventBus的Publish和CommandBus中的Send方法很相似,都是首先通過EventHandlerFactory查找對應Event的Handler,然後調用其Handler方法。比如

public class StructureMapEventHandlerFactory : IEventHandlerFactory
{
    public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event
    {
        var handlers = GetHandlerType<T>();
            
        var lstHandlers = handlers.Select(handler => (IEventHandler<T>) ObjectFactory.GetInstance(handler)).ToList();
        return lstHandlers;
    }

    private static IEnumerable<Type> GetHandlerType<T>() where T : Event
    {
           
        var handlers = typeof(IEventHandler<>).Assembly.GetExportedTypes()
            .Where(x => x.GetInterfaces()
                .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(IEventHandler<>)))
                .Where(h => h.GetInterfaces()
                    .Any(ii => ii.GetGenericArguments()
                        .Any(aa => aa == typeof(T))))
                 .ToList();
        return handlers;
    }
}

然後返回並實例化了ItemCreatedEventHandler 對象,該對象的實現如下:

public class ItemCreatedEventHandler : IEventHandler<ItemCreatedEvent>
{
    private readonly IReportDatabase _reportDatabase;
    public ItemCreatedEventHandler(IReportDatabase reportDatabase)
    {
        _reportDatabase = reportDatabase;
    }
    public void Handle(ItemCreatedEvent handle)
    {
        DiaryItemDto item = new DiaryItemDto()
            {
                Id = handle.AggregateId,
                Description =  handle.Description,
                From = handle.From,
                Title = handle.Title,
                To=handle.To,
                Version =  handle.Version
            };

        _reportDatabase.Add(item);
    }
}

可以看到在Handler方法中,從事件中獲取參數,然後新建DTO對象,然後將該對象更新到DB中。

到此,整個Command執行完成。

六 結語

CQRS是一種思想很簡單清晰的設計模式,他通過在業務上分離操作和查詢來使得系統具有更好的可擴展性及性能,使得能夠對系統的不同部分進行擴展和優化。在CQRS中,所有的涉及到對DB的操作都是通過發送Command,然後特定的Command觸發對應事件來完成操作,這個過程是異步的,並且所有涉及到對系統的變更行為都包含在具體的事件中,結合Eventing Source模式,可以記錄下所有的事件,而不是以往的某一點的數據信息,這些信息可以作為系統的操作日誌,可以來對系統進行回退或者重放。

CQRS 模式在實現上有些復雜,很多地方比如AggregationRoot、Domain Object都涉及到DDD中的相關概念,本人對DDD不太懂。這裏僅為了演示CQRS模式,所以使用的例子是codeproject上的,末尾列出了一些參考文章,如果您想了解更多,可以有針對性的閱讀。

最後,希望CQRS模式能讓您在設計高性能,可擴展性的程序時能夠多一種選擇和考慮。

七 參考文獻

  1. Introduction to CQRS http://www.codeproject.com/Articles/555855/Introduction-to-CQRS
  2. CQRS http://martinfowler.com/bliki/CQRS.html
  3. CQRS Journey http://msdn.microsoft.com/en-us/library/jj554200.aspx
  4. Command and Query Responsibility Segregation (CQRS) Pattern http://msdn.microsoft.com/en-us/library/dn568103.aspx
  5. EntityFramework之領域驅動設計實踐:CQRS體系結構模式 http://www.cnblogs.com/daxnet/archive/2010/08/02/1790299.html
  6. Event Sourcing Pattern http://msdn.microsoft.com/en-us/library/dn589792.aspx

https://www.cnblogs.com/yangecnu/p/Introduction-CQRS.html

淺談命令查詢職責分離(CQRS)模式