1. 程式人生 > >CQRS之旅——旅程7(增加彈性和優化效能)

CQRS之旅——旅程7(增加彈性和優化效能)

旅程7:增加彈性和優化效能

到達旅程的終點:最後的任務。
“你不能飛的像一隻長著鷦鷯翅膀的老鷹那樣。”亨利·哈德遜

我們旅程的最後階段的三個主要目標是使系統對故障更具彈性,提高UI的響應能力,並確保我們的設計是可伸縮的。加強系統的工作主要集中在訂單和註冊限界上下文中的RegistrationProcessManager類。效能改進工作的重點是當訂單建立時UI與領域域模型的互動方式。

本章的工作術語定義:

本章使用了一些術語,我們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。

  • 命令(Command):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自使用者發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。命令由單個接收方只處理一次,命令要麼通過命令匯流排(command bus)傳遞給接收方,要麼直接在程序中傳遞。如果是通過匯流排傳遞的,則該命令是非同步傳送的,在程序中傳遞,命令將同步傳送。

  • 事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件釋出到事件匯流排。處理程式在事件總線上註冊特定型別的事件,然後將事件傳遞給訂閱伺服器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。

  • 快照(Snapshots):快照是一種可以應用於事件源的優化。在重新還原(rehydrated)聚合時,不需要重播與聚合相關的所有持久化事件,而是載入聚合狀態的最新副本,然後只重播儲存快照後持久的事件。通過這種方式,可以減少必須從事件儲存里加載的資料量。

  • 冪等性(Idempotency):冪等性是一個操作的特性,這意味著該操作可以多次應用而不改變結果。例如,“將x的值設定為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在訊息傳遞環境中,如果訊息可以多次傳遞而不改變結果,則訊息是冪等的:這可能是因為訊息本身的性質,也可能是因為系統處理訊息的方式。

  • 最終一致性(Eventual consistency):最終一致性是一個一致性模型,它不能保證立即訪問更新的值。對資料物件進行更新後,儲存系統不保證對該物件的後續訪問將返回更新後的值。然而,儲存系統確實保證,如果在足夠長的時間內沒有對物件進行新的更新,那麼最終所有訪問都可以返回最後更新的值。

架構

該應用程式旨在部署到Microsoft Azure。在旅程的這個階段,應用程式由兩個角色組成,一個包含ASP.Net MVC Web應用程式的web角色和一個包含訊息處理程式和領域物件的工作角色。應用程式在寫端和讀端都使用Azure SQL DataBase例項進行資料儲存。在某些地方,應用程式還在寫端使用Azure table,在讀端使用Azure blob來儲存資料。應用程式使用Azure服務匯流排來提供其訊息傳遞基礎設施。下圖展示了這個高階體系結構。

在研究和測試解決方案時,可以在本地執行它,可以使用Azure compute emulator,也可以直接執行MVC web應用程式,並執行承載訊息處理程式和領域域物件的控制檯應用程式。在本地執行應用程式時,可以使用本地SQL Server Express資料庫,並使用一個在SQL Server Express資料庫實現的簡單的訊息傳遞基礎設施。

有關執行應用程式的選項的更多資訊,請參見附錄1“釋出說明”。

增加彈性

在旅程的這個階段,團隊研究了加強RegistrationProcessManager類的方法。該類負責管理訂單和註冊上下文中的聚合之間的互動,並確保它們彼此一致。如果要將限界上下文作為一個整體來維護其一致狀態,那麼流程管理器必須能夠適應各種各樣的故障條件。

通常,流程管理器接收傳入的事件,然後在限界上下文內部基於流程管理器的狀態發出一個或多個命令到聚合。當流程管理器發出命令時,它通常會更改自己的狀態。

訂單和註冊限界上下文包含RegistrationProcessManager類。此流程管理器在此限界上下文中和支付限界上下文中負責通過路由事件和命令協調聚合的活動。因此,流程管理器負責確保這些限界上下文中的聚合正確地彼此同步。

Gary(CQRS專家)發言:

一個聚合決定了寫模型中的一致性邊界,這個邊界和系統持久儲存資料的一致性相關。流程管理器管理不同聚合(可能在不同的限界上下文中)之間的關係,並確保聚合最終彼此一致。

註冊過程的失敗可能對系統產生不利後果:聚合可能彼此不同步,這可能導致系統中出現不可預測的行為,或者一些程序可能最終成為殭屍程序,繼續執行並使用資源,但永遠不會完成。團隊確定了以下與RegistrationProcessManager流程管理器相關的特定故障場景。流程管理器也許會:

  • 崩潰或者無法在傳送任何命令之前和接收事件之後持久化其狀態。這樣訊息處理器可能無法將事件標記為完成,因此在超時之後,將事件放回Topic訂閱並重新處理。
  • 在傳送任何命令之前和持久化其狀態之後崩潰。這將使系統處於不一致的狀態,因為流程管理器儲存了其新的狀態,但沒有傳送預期的命令。原始事件被放回Topic訂閱並重新處理。
  • 未能標記事件已被處理。流程管理器將第二次處理該事件,因此在超時之後,系統將把該事件重新放到服務匯流排Topic訂閱中。
  • 在等待它所期望的特定事件時超時。流程管理器無法繼續處理並達到預期的最終狀態。
  • 接收到一個流程管理器處於特定狀態時不期望接收的事件。這可能表明在其他地方存在問題,這意味著流程管理器繼續工作是不安全的。

這些設想可歸納為兩個具體的問題:

  • RegistrationProcessManager成功地處理了一個事件,但是沒有將其標記為完成。在事件自動返回到Azure服務匯流排Topic訂閱之後,RegistrationProcessManager將再次處理該事件。
  • RegistrationProcessManager成功地處理一個事件,並將其標記為完成,但隨後未能傳送命令。

使系統能彈性的重新處理事件

如果流程管理器本身的行為是冪等的,那麼如果它第二次接收並處理一個事件,則不會導致系統中的不一致。使流程管理器的行為具有冪等性,可以避免前三種故障條件中固有的問題。崩潰之後,您可以簡單地重新啟動流程管理器,並第二次重新處理傳入的事件。

您可以讓流程管理器傳送的所有命令都是冪等的,來替代讓流程管理器冪等。重新啟動流程管理器可能會導致第二次傳送命令,但如果這些命令是冪等的,則不會對流程或系統產生不利影響。要使此方法生效,您仍然需要修改流程管理器,以確保它至少傳送一次所有命令。如果命令是冪等的,那麼多次傳送它們並不重要,但是如果根本不傳送就很重要。

在V1版本中,大多數訊息處理要麼已經是冪等的,要麼系統檢測到重複的訊息並將它們傳送到dead-letter佇列。例外情況是OrderPlaced事件和SeatsReserved事件,因此團隊修改了系統V3版本處理這兩個事件的方式,以解決這個問題。

確保始終傳送命令

需要事務行為來確保當RegistrationProcessManager類儲存其狀態時,系統始終會發送命令。這要求團隊實現一個偽事務,因為將Azure服務匯流排和SQL資料庫表一起放到分散式事務中既不可取也不可行。

團隊為V3版本所採用的解決方案是確保系統持久儲存RegistrationProcessManager生成的所有命令,同時持久儲存RegistrationProcessManager例項的狀態。然後,系統嘗試傳送命令,並在成功傳送之後將它們從儲存中刪除。每當從儲存中載入RegistrationProcessManager例項時,系統還檢查未傳送的訊息。

效能優化

在這個階段,我們使用Visual Studio執行效能和壓力測試,以分析響應時間並確定瓶頸。團隊使用Visual Studio Load Test來模擬訪問應用程式的不同使用者數量,並在程式碼中添加了額外的跟蹤,以記錄時間資訊,以便進行詳細分析。團隊在Azure中建立了效能測試環境,在Azure VM角色例項中執行測試控制器和測試代理。這使我們能夠通過使用測試代理模擬不同數量的虛擬使用者來測試Contoso會議管理系統在不同負載下的執行情況。

作為這項工作的結果,團隊對系統進行了許多更改,以優化其效能。

Gary(CQRS專家)發言:

儘管在旅程中,團隊在專案結束時進行了效能測試和優化工作,但通常在你想做的時候就做這個工作是有意義的,這可以解決可伸縮性問題並儘快加固程式碼。如果您正在構建自己的基礎設施,並且需要能夠處理高吞吐量,則尤其如此。

Markus(軟體開發人員)發言:

因為實現CQRS模式會導致對組成系統的許多不同部分的職責進行非常清晰的分離,所以我們發現新增優化和加強相對容易,因為許多必要的更改在系統中都非常容易定位。

優化前的UI流程

當註冊者建立一個訂單時,她將訪問UI中的以下頁面序列。

  1. 註冊頁面。該頁面根據最終一致的讀模型顯示會議的門票型別和當前可用的座位數量。註冊者選擇她想購買的每種座位型別的數量。
  2. 付款的頁面。此頁面顯示訂單摘要,其中包括一個總價和一個倒計時計時器,它告訴註冊者座位將保留多久。註冊者輸入她的詳細資訊和首選的付款方式。
  3. 支付頁面。這裡模擬了一個第三方支付處理器。
  4. 註冊成功頁面。這將顯示支付是否成功。它向註冊者顯示一個訂單定位器程式碼,並連結到另一個頁面,該頁面允許註冊者為參會者分配座位。

有關UI中的螢幕和流程的更多資訊,請參閱第5章“準備釋出V1版本”中的“基於任務的UI”一節。

在V2版本中,系統必須在註冊頁面付款頁面之間處理以下命令和事件:

  • RegisterToConference
  • OrderPlaced
  • MakeSeatReservation
  • SeatsReserved
  • MarkSeatsAsReserved
  • OrderReservationCompleted
  • OrderTotalsCalculated

此外,MVC控制器在傳送初始RegisterToConference命令之前通過查詢讀模型來填充訂單,從而驗證是否有足夠的座位可用。

當團隊使用Visual Studio Load Test和不同的使用者負載模式來對應用程式做負載測試時,我們注意到高負載常常發生在UI等待領域完成其處理時和讀模型接收寫模型資料時。這樣無法顯示下一個頁面。特別是,隨著V2版本部署到中型的web和工作角色例項後,我們發現:

  • 對於每秒少於5個訂單的恆定負載模式,所有訂單都在5秒的視窗內處理。
  • 對於每秒8到10個訂單之間的恆定負載模式,許多訂單不能在5秒的視窗內處理。
  • 對於每秒8到10個訂單之間的恆定負載模式,角色例項使用得不夠理想(例如CPU使用率很低)。

備註:從UI在服務總線上傳送初始命令到讀模型中出現定價訂單,從而使UI能夠向用戶顯示下一個螢幕。5秒是我們希望看到的最大等待時間。

為了解決這個問題,團隊確定了兩個優化目標:UI和領域之間的互動,以及基礎設施。我們決定首先處理UI和領域之間的互動。當這不能充分提高效能時,我們還進行了基礎設施優化。

優化UI

團隊與領域專家討論了在UI向領域傳送RegisterToConference命令之前,是否總是需要驗證座位可用性。

Gary(CQRS專家)發言:

這個場景說明了與最終一致性相關的一些實際問題。讀端(在本例中是定價訂單檢視模型)最終與寫端保持一致。通常,當您實現CQRS模式時,您應該能夠接受最終的一致性,而不需要在UI中等待更改傳播到讀取端。然而,在這種情況下,UI必須等待寫模型傳播到與特定順序相關的讀端資訊。這可能表明原系統這一部分的分析和設計存在問題。

領域專家明確表示,系統應該在接受付款之前確認座位是否可用。Contoso不希望出售座位之後向註冊人解釋,這些座位是不可用的。因此,該團隊尋找了簡化流程的方法,直到註冊者看到付款螢幕為止。

Beth(業務經理)發言:

這種謹慎的策略並不適用於所有情況。在某些情況下,即使不能立即完成訂單,企業也可能寧願接受這筆錢。企業可能知道庫存很快就會補充,或者客戶很樂意等待。在我們的場景中,儘管Contoso可以在沒有票的情況下將錢退還給註冊者,註冊者也許仍然會購買機票,因為他以為系統已經確認過,這筆錢是沒法退還的。所以這很明顯是一個業務和領域專家要做的決策。

團隊確定了對UI流的以下兩個優化。

UI優化1

大多數情況下,會議有足夠的座位,註冊者不必相互爭奪來預訂座位。隨著大會的門票接近售罄,只有很短的一段時間內,報名者才會爭奪最後幾個座位。

如果會議有足夠的可用座位,那麼註冊者到達付款介面卻發現系統無法預訂座位的風險就很小。在這種情況下,V2版本里,在到達付款頁面之前執行的一些處理可以在付款頁面上當使用者輸入資訊的時候非同步發生,這樣就減少了註冊者在看到付款頁面前經歷延遲的機會。

Jana(軟體架構師)發言:

從本質上講,我們所依賴的事實是,預訂會成功,所以避免了耗時的檢查。但我們仍然要在註冊人付款之前執行檢查,以確保座位是可用的。

但是,如果控制器在傳送RegisterToConference命令之前就檢查並發現沒有足夠的座位來完成訂單,則可以重新顯示註冊螢幕,使註冊者能夠根據當前可用性更新其訂單。

Jana(軟體架構師)發言:

對這一戰略的一個可能改進是,在傳送RegisterToConference命令之前,看看是否可能有足夠的座位可用。這可以減少註冊者在最後幾個座位售罄時調整訂單的次數。然而,這種場景發生的頻率很低,可能不值得實現。

UI優化2

在V2版本中,MVC控制器不顯示付款頁面,直到領域釋出OrderTotalsCalculated事件,並且系統更新了price-order檢視模型。此事件是控制器顯示螢幕之前發生的最後一個事件。

如果系統更早地計算總數並更新價格訂單檢視模型,控制器就可以更早地顯示付款頁面。團隊確定,訂單聚合可以在訂單下單時計算總數,而不是在預訂完成時計算總數。這將使UI流比V2版本更快的走到付款頁面。

優化基礎設施

“每天都有一些新的事實浮現,一些新的障礙那些威脅著我們的最嚴重的障礙。我想這就是為什麼這款遊戲如此值得一玩的原因。” 羅伯特·弗爾肯·斯科特

團隊在旅程的這個階段新增的第二組優化與系統的基礎設施相關。這些更改同時處理了系統的效能和可伸縮性。下面的部分描述了我們在這裡所做的最重要的更改。

非同步傳送和接收命令和事件

作為優化過程的一部分,團隊更新了系統,以確保在服務總線上傳送的所有訊息都是非同步傳送的。這種優化旨在提高應用程式的總體響應能力,並提高訊息的吞吐量。作為此更改的一部分,團隊還使用了Transient Fault Handling Application Block來處理使用服務匯流排時遇到的任何瞬時錯誤。

Markus(軟體開發人員)發言:

這種優化導致了對基礎設施程式碼的重大更改。將非同步呼叫與Transient Fault Handling Application Block相結合是複雜的,我們將受益於c# 4.5中的一些新的簡化語法!

Jana(軟體架構師)發言:

有關在使用Azure服務匯流排時幫助優化效能的其他經過驗證的實踐,請參閱本指南:Best Practices for Performance Improvements Using Service Bus Brokered Messaging

優化命令處理

V2版本對命令和事件使用相同的訊息傳遞基礎設施——Azure服務匯流排。團隊評估了Contoso會議管理系統是否需要使用相同的基礎設施傳送所有命令訊息。

在決定是否繼續使用Azure服務匯流排傳輸所有命令訊息時,我們考慮了許多因素。

  • 哪些命令(如果有的話)可以在程序中處理?
  • 如果處理一些程序中的命令,系統的彈性會降低嗎?
  • 如果在程序中處理一些命令,會有顯著的效能提升嗎?

我們確定了一組命令,系統可以從會議web應用程式在程序中同步地傳送這些命令。為了實現這種優化,我們必須向會議web應用程式新增一些基礎設施元素(事件儲存庫、事件匯流排和事件釋出者)。以前,這些基礎設施元素只在系統的工作角色中。

非同步命令是不存在的,它實際上是另一個事件。如果我必須接受一個你發給我的訊息並且如果我不同意必須發出一個事件。那這就不是你要我做什麼,而是你告訴我什麼已經做完了。乍一看,這似乎只有一點點不同,但它有很多含義。

Why do lots of developers use one-way command messaging (async handling) when it's not needed? Greg Young - DDD/CQRS Group

對事件源使用快照

效能測試還發現了使用可用座位(SeatsAvailability)聚合的瓶頸,我們使用快照的形式解決了這個瓶頸。

Jana(軟體架構師)發言:

一旦團隊確定了這個瓶頸,就很容易實現和測試這個解決方案。我們在實現CQRS模式時所遵循的方法的優點之一是:我們可以在系統中進行小的區域性更改。更新不需要我們去跨系統的多個部分進行復雜的更改。

當系統從事件儲存中重新還原(rehydrates)聚合例項時,它必須載入並重播與該聚合例項關聯的所有事件。這裡可能的優化是儲存聚合狀態在最近某個時間點的滾動快照,以便系統只需要載入快照和後續事件,從而減少必須重新載入和重播的事件數量。在Contoso會議管理系統中,隨著時間的推移,唯一可能會累積大量事件的聚合是可用座位(SeatsAvailability)聚合。我們決定使用Memento模式作為快照解決方案的基礎,以便與可用座位(SeatsAvailability)聚合一起使用。我們實現的解決方案使用一個memento來捕獲座位可用性聚合的狀態,然後在快取中儲存一個memento的副本。然後,系統嘗試處理快取的資料,而不是總是從事件儲存中重新載入聚合。

Gary(CQRS專家)發言:

通常,在事件源上下文中,快照是持久化的,而不是我們在專案中實現的臨時本地快取。

並行釋出事件

就提高系統中事件訊息的吞吐量而言,並行釋出事件被證明是最重要的優化之一。為了得到最好的結果,團隊進行了多次迭代:

  • 迭代1:這種方法使用並行。使用Parallel.ForEach方法和自定義分割槽(把訊息分配到分割槽中),並設定並行度的上限。它還使用同步的Azure服務匯流排API呼叫來發布訊息。
  • 迭代2:這種方法使用了一些非同步API呼叫。它需要使用基於自定義訊號量的節流來正確處理非同步回撥。
  • 迭代3:這種方法使用動態節流,它考慮到順時故障,這些故障表明向特定Topic傳送了太多的訊息。這種方法使用非同步的Azure服務匯流排API呼叫。

Jana(軟體架構師)發言:

當系統從服務匯流排檢索訊息時,我們在SubscriptionReceiver和SessionSubscriptionReceiver類中採用了相同的動態節流方法。

在訂閱中過濾訊息

另一個優化是向Azure服務匯流排Topic訂閱新增過濾器,以避免讀取那些稍後將被與訂閱關聯的處理程式忽略的訊息。

Markus(軟體開發人員)發言:

這裡我們利用了Azure服務匯流排提供的特性。

為可用座位(SeatsAvailability)聚合建立專用接收器

這使可用座位(SeatsAvailability)聚合的接收者能夠使用支援會話的訂閱。這是為了確保每個聚合例項只有一個寫入者,因為可用座位(SeatsAvailability)聚合是一個高爭用的聚合。這阻止了我們在擴充套件時接收大量併發異常。

Jana(軟體架構師)發言:

在其他地方,我們使用帶有會話的訂閱來保證事件的順序。在本例中,我們使用會話是出於不同的原因——以確保每個聚合例項只有一個寫入者。

快取會議資訊

這個優化快取了會議web網站到處使用的幾個讀模型。它包含邏輯來決定如何基於特定會議的可用座位的數量來保持快取中的資料:如果有很多空位,系統可以快取資料很長一段時間,但是如果很少有空位就不快取資料。

劃分服務匯流排

團隊還對服務匯流排進行了劃分,以使應用程式更具可伸縮性,並避免在系統傳送的訊息量接近服務匯流排能夠處理的最大吞吐量時進行節流。每個服務匯流排Topic可以由Azure中的不同節點處理,因此通過使用多個Topic,我們可以增加潛在的吞吐量。我們考慮了以下分割槽方案:

  • 為不同的訊息型別使用不同的Topic。
  • 使用多個相似的Topic,並以迴圈方式監聽和讀取它們,以分散負載。

有關這些劃分方案的詳細討論,請參閱Martin L. Abbott和Michael T. Fisher所寫的《可伸縮性規則:Web站點伸縮的50個原則》(Addison-Wesley, 2011)中的第11章“非同步通訊和訊息匯流排”。

我們決定為訂單聚合和可用聚合釋出的事件使用單獨的Topic,因為這些聚合負責了通過服務匯流排流動的大多數事件。

Gary(CQRS專家)發言:

並不是所有的資訊都具有相同的重要性。您還可以使用訊息匯流排來處理單獨的、按優先順序排列的不同的訊息型別,甚至可以考慮不為某些訊息使用訊息匯流排。

Jana(軟體架構師)發言:

將服務匯流排與系統的任何其他關鍵元件一樣對待。這意味著您應該確保您的服務匯流排可以伸縮。此外,請記住,並非所有資料對您的業務都具有相同的價值。僅僅因為您有一個服務匯流排,並不意味著所有東西都必須經過它。明智的做法是消除低價值、高成本的流量。

其他的一些優化

團隊還執行了一些額外的優化,這些優化在下面的實現細節部分中列出。團隊在這一階段的主要目標是優化系統,以確保UI呈現對使用者有足夠好的響應。我們還可以執行其他優化,這將有助於進一步提高效能,並優化系統使用資源的方式。例如,團隊考慮的進一步優化是擴充套件檢視模型生成器,該生成器填充系統中的各種讀取模型。每個承載檢視模型生成器例項的web角色都必須通過建立對Azure服務匯流排主題的訂閱來處理寫端釋出的事件。

提高效能的進一步更改

除了在提高應用程式效能的旅程的最後階段所做的更改之外,團隊還確定了一些其他更改,這些更改將導致進一步的改進。但是,這個旅程的可用時間有限,所以不可能在V3版本中進行這些更改。

  • 我們嚮應用程式的許多地方添加了非同步行為,特別是在應用程式對Azure服務匯流排的呼叫中。然而,應用程式還有其他地方仍然執行阻塞呼叫。我們可以把那些同步呼叫改成非同步:例如,當系統訪問資料儲存時。此外,我們將使用新的語言特性,如async和await。
  • 通過採用儲存轉發設計,可以批量處理訊息,並減少往返於資料儲存的次數。例如,利用Azure服務匯流排會話將使我們能夠從服務匯流排接收一個會話,從資料儲存區讀取多個條目、處理多個訊息、一次儲存到資料儲存區,然後完成所有訊息。

Markus(軟體開發人員)發言:

通過接受一個服務匯流排會話,只要您保持鎖,就只有一個會話的寫入者和監聽者。這減少了樂觀併發異常。這種設計特別適合可用座位聚合的讀和寫模型。對於具有非常小分割槽的訂單聚合關聯的讀模型,您可以從服務匯流排獲取多個小會話,並在每個會話上使用儲存轉發方法。儘管系統中的讀和寫模型都可以從這種方法中受益,但是在我們期望資料最終是一致的、而不是完全一致的讀模型中實現起來更容易。

  • 該網站已經快取了一些經常訪問的讀模型資料,但是我們可以將快取的使用擴充套件到系統的其他區域。CQRS模式意味著我們可以將快取視為最終一致的讀模型的一部分,如果需要,還可以使用不同的快取或根本不使用快取來訪問來自系統不同部分的讀模型資料。
  • 我們可以改進可用座位(SeatsAvailability)聚合的快取快照實現。本章稍後將詳細描述當前實現,其目的是始終檢查事件儲存,以查詢在系統建立最新快取快照之後到達的事件。當我們接收到要處理的新命令時,如果我們可以檢查是否仍然使用與系統建立最新快取快照時相同的服務匯流排會話,那麼我們就可以知道事件儲存中是否還有其他事件。如果會話沒有更改,那麼我們就知道自己是惟一的寫入者,因此沒有必要檢查事件儲存。如果會話已經更改,那麼其他人可能已經將與聚合相關的事件寫入到儲存中,我們需要進行檢查。
  • 應用程式當前使用相同的優先順序監聽所有服務匯流排訂閱上的所有訊息。在實踐中,有些資訊比其他資訊更重要。因此,當應用程式處於壓力之下時,我們應該優先處理一些訊息,以最小化對核心應用程式功能的影響。例如,我們可以識別某些願意接受更多延遲的讀模型。

    Poe(IT運維人員)發言:

    我們還可以在負載增加時使用自動縮放來擴充套件應用程式(例如使用Autoscaling Application Block),但是新增新例項需要時間。通過確定某些訊息型別的優先順序,我們可以在自動縮放新增資源的同時,繼續在應用程式的關鍵領域提供效能。

  • 當前實現使用隨機生成的Guid作為儲存在SQL資料庫例項中的所有實體的鍵。當系統處於高負載下時,如果使用順序Guid,特別是與聚集索引相關的Guid,它的效能可能會更好。有關順序Guid的討論,請參見The Cost of GUIDs as Primary Keys。
  • 作為系統優化的一部分,我們現在在程序中處理一些命令,而不是通過服務匯流排傳送它們。我們可以將此擴充套件到其他命令,並可能擴充套件到流程管理器。
  • 在當前實現中,流程管理器處理傳入訊息,然後儲存庫嘗試同步傳送傳出訊息(如果服務匯流排由於節流行為引發任何異常,則使用 Transient Fault Handling Application Block重試傳送命令)。我們可以替代使用一種類似於EventStoreBusPublisher類的機制以讓流程管理器儲存一個訊息列表,這些訊息必須在一個事務裡連同它的狀態一起傳送,然後通知系統的另一部分,這個部分的職責是當有一些新訊息準備好要傳送的時候負責來發送訊息。

    Markus(軟體開發人員)發言:

    負責傳送訊息的系統部分可以非同步傳送訊息。它還可以為傳送訊息實現動態節流,並動態控制要使用多少個並行傳送器。

  • 我們當前的事件儲存實現是:為儲存在事件儲存裡的每一個事件釋出一個單獨的,小的訊息到訊息總線上。我們可以將其中一些訊息組合在一起,以減少服務總線上的I/O操作總數。例如,大型會議的可用座位(SeatsAvailability)聚合例項釋出大量事件,訂單(Order)聚合以突發方式釋出事件(當建立訂單(Order)聚合時,它同時釋出OrderPlaced事件和OrderTotalsCalculated事件)。這還將有助於減少系統中的延遲,因為目前,在那些順序很重要的場景中,我們必須在傳送下一個事件之前等待一個事件已被髮送的確認。將事件序列分組到一條訊息中意味著我們不需要在釋出單個事件之間等待確認。

增強可伸縮性的進一步更改

Contoso會議管理系統允許您部署web和工作者角色的多個例項,從而擴充套件應用程式以處理更大的負載。然而,該設計並不是完全可伸縮的,因為系統的其他一些元素,例如訊息匯流排和資料儲存對最大可實現的吞吐量有限制。本節概述了我們可以對系統進行的一些更改,以刪除其中的一些約束,並顯著提高系統的可伸縮性。這次旅程的可用時間有限,所以沒能在V3版本中進行這些更改。

  • 資料分割槽:系統在不同的分割槽中儲存不同型別的資料。在啟動程式碼中,您可以看到不同的限界上下文如何使用不同的連線字串連線到SQL資料庫例項。但是,每個限界上下文目前使用一個SQL資料庫例項,我們可以將其更改為使用多個不同的例項,每個例項都包含系統使用的特定資料集。例如,訂單和註冊限界上下文可以為不同的讀取模型使用不同的SQL資料庫例項。我們還可以考慮使用federations特性來使用分片擴充套件一些SQL資料庫例項。

    “資料永續性是大多數可伸縮SaaS企業面臨的最困難的技術問題。”
    -Evan Cooke, CTO, Twilio,Scaling High-Availability Infrastructure in the Cloud

    Jana(軟體架構師)發言:

    在系統將資料儲存在Azure表儲存中的地方,我們選擇用鍵對資料進行分割槽以實現可伸縮性。作為使用SQL資料庫federations對資料進行切分的替代方法,我們可以將SQL資料庫例項中當前的一些讀模型資料移動到Azure表儲存或blob儲存中。

  • 進一步劃分服務匯流排:通過為不同的事件釋出者使用不同的Topic,我們已經對服務匯流排進行了劃分,以避免在系統傳送的訊息量接近服務匯流排能夠處理的最大吞吐量時進行節流。我們可以使用多個相似的Topic來進一步劃分主題,並通過迴圈監聽它們來分擔負載。有關此方法的詳細描述,請參見Abbott和Fisher在Scalability Rules: 50 Principles for Scaling Web Sites, (Addison-Wesley, 2011)中的第11章"Asynchronous Communication and Message Buses"
  • 儲存和轉發:我們在前面關於效能改進的小節中介紹了儲存和轉發設計。通過批處理多個操作,您不僅減少了到資料儲存的往返次數,並減少了系統中的延遲,還增強了系統的可伸縮性,因為發出更少的請求可以減少對資料儲存的壓力。
  • 監聽節流指示器並對其作出反應:目前,系統使用Transient Fault Handling Application Block來檢測瞬時錯誤條件,比如從Azure服務匯流排、SQL資料庫例項和Azure表儲存中檢測節流指示器。系統使用Block在這些場景中實現重試,通常使用指數回退策略。目前,我們在單個訂閱級別使用動態節流,但是,我們希望修改它來對特定主題的所有訂閱執行動態節流。類似地,我們希望在SQL資料庫例項級和Azure儲存帳戶級實現動態節流。

    Jana(軟體架構師)發言:

    在應用程式裡實現動態節流的一個例子是從服務阻止節流,看EventStoreBusPublisher SubscriptionReceiver, SessionSubscriptionReceiver類是怎樣使用DynamicThrottling類來管理他們所使用的並行程度來發送或接收訊息的。

    Poe(IT運維人員)發言:

    每一個服務(Azure服務匯流排, SQL資料庫,Azure storage)都有自己獨特的方式來實現節流行為,並在負載過重時通知您。例如,請參見SQL Azure Throttling。重要的是要了解應用程式使用的不同服務可能會對您的應用程式造成的所有節流。

    Poe(IT運維人員)發言:

    團隊還考慮使用Azure SQL資料庫商業版來取代Azure SQL資料庫Web版,但經過調查,我們確定目前版本之間的唯一區別是最大資料庫大小。不同版本沒有進行調優以支援不同型別的工作負載,而且兩個版本實現了相同的節流行為。

有關可伸縮性的其他資訊,請參閱:

  • Microsoft Azure Storage Abstractions and their Scalability Targets
  • Best Practices for Performance Improvements Using Service Bus Brokered Messaging

在談到可伸縮性和高可用性時,重要的是不要抱有錯誤的樂觀態度。儘管使用許多建議的實踐,應用程式往往可以更有效地伸縮,並且對失敗更有彈性,但它們仍然容易出現高需求瓶頸。確保為效能測試和實現效能目標分配足夠的時間。

不停機遷移

“我常說,任何冒險工作的三分之二都是做準備” Amelia Earhart

團隊計劃在Azure中進行從V2到V3版本的無停機遷移。為了實現這一點,遷移過程使用一個執行在Azure工作者角色中的特殊處理器來執行一些遷移步驟。

遷移過程仍然需要您完成一個配置步驟來關閉V2處理器並開啟V3處理器。回想起來,我們應該使用一種不同的機制來簡化從V2到V3處理器的轉換,該轉換基於處理程式本身的反饋,以指示它們何時完成了處理。

有關這些步驟的詳細資訊,請參見附錄1“釋出說明”。

Poe(IT運維人員)發言:

在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。

重建讀模型

在從V2遷移到V3期間,我們必須執行的步驟之一是通過重播事件日誌中的事件來重新構建DraftOrder和PricedOrder檢視模型,以填充新的V3讀模型表。我們可以非同步執行此操作。然而,在某個時候,我們需要開始將事件從活動的應用程式傳送到這些讀模型。此外,我們需要保持這些讀模型的V2和V3版本都是最新的,直到遷移過程完成,因為V2前端web角色需要V2的讀取模型資料可用,直到切換到V3前端web角色。在切換到V3前端時,我們必須確保V3讀取的模型完全是最新的。

為了使這些讀取模型保持最新,我們建立了一個作為Azure工作者角色的臨時處理器,它在遷移過程中執行。有關更多細節,請參閱會Conference解決方案中的MigrationToV3專案。該處理器執行的步驟是:

  • 建立一組新的Topic訂閱,這些訂閱將接收活動事件,這些活動事件將用於填充新的V3讀模型。這些訂閱將開始累積V3應用程式部署時將處理的事件。
  • 重播事件日誌中的事件,用歷史資料填充新的V3讀取模型。
  • 處理活動事件並使V2的讀模型保持最新,直到V3前端是活動的,此時我們不再需要V2的讀模型。

遷移過程首先從事件儲存中重播事件,以填充新的V3讀模型。當這一切完成時,我們停止包含事件處理程式的V2處理器,並在V3處理器中啟動新的處理程式。當它們執行並跟蹤新Topic訂閱中積累的事件時,ad-hoc處理器還使V2的讀模型保持最新,因為此時我們仍然擁有V2前端。當V3工作者角色準備好時,我們可以執行一個VIP切換來使用新的V3前端。在V3前端執行之後,我們不再需要V2讀模型。

使用這種方法要解決的問題之一是,如何確定新的V3處理器應該在什麼時候從處理事件日誌中的存檔事件切換到處理實時的事件流。在將事件寫入事件日誌的過程中存在一些延遲,因此瞬時切換可能導致一些事件的丟失。團隊決定允許V3處理器暫時可以同時處理存檔事件和實時事件,這意味著可能會有重複的事件,相同的事件存在於事件儲存區和由新訂閱累積的事件列表中。但是,我們可以檢測這些副本並相應地處理它們。

Markus(軟體開發人員)發言:

通常,我們依賴於基礎設施來檢測重複的訊息。在這個重複事件可能來自不同來源的特定場景中,我們不能依賴於基礎設施,必須顯式地將重複檢測邏輯新增到程式碼中。

我們考慮的另一種方法是在V3處理器中同時包含V2和V3處理。使用這種方法,在遷移期間不需要一個特別的工作人員角色來處理V2事件。但是,我們決定將特定於遷移的程式碼儲存在一個單獨的專案中,以避免V3發行版由於包含只在遷移期間需要的功能而膨脹。

Jana(軟體架構師)發言:

如果我們在V3處理器中同時包含V2和V3處理,遷移過程會稍微容易一些。但我們認為,這種方法的好處被不必在V3處理器中維護重複功能的好處所抵消。

遷移的每個步驟之間的間隔需要一些時間來完成,因此遷移不會導致停機,但是使用者確實會遇到延遲。我們可以從處理切換開關的一些更快的機制中獲益,比如停止V2處理器並啟動V3處理器。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份程式碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上檢視儲存庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V3版本的程式碼。

備註:不要期望程式碼示例與參考實現中的程式碼完全匹配。本章描述了CQRS過程中的一個步驟,隨著我們瞭解更多並重構程式碼,實現可能會發生變化。

增強RegistrationProcessManager類

本節描述了團隊如何通過檢查SeatsReserved和OrderPlaced訊息的重複例項來強化RegistrationProcessManager流程管理器。

檢測無序的SeatsReserved事件

通常,RegistrationProcessManager類向SeatAvailability聚合傳送一個MakeSeatReservation命令,SeatAvailability聚合在進行預訂時釋出一個SeatsReserved事件,RegistrationProcessManager接收此通知。RegistrationProcessManager在建立訂單和更新訂單時都發送一條MakeSeatReservation命令。SeatsReserve事件到達的時候可能不是按順序的,但是,系統應該尊重與最後傳送的命令相關的事件。本節描述的解決方案使RegistrationProcessManager能夠識別最新的SeatsReserved訊息,然後忽略任何較早的訊息,而不是重新處理它們。

在RegistrationProcessManager類傳送MakeSeatReservation命令之前,它將該命令的Id儲存在SeatReservationCommandId變數中,如下面的程式碼示例所示:

public void Handle(OrderPlaced message)
{
    if (this.State == ProcessState.NotStarted)
    {
        this.ConferenceId = message.ConferenceId;
        this.OrderId = message.SourceId;
        // Use the order id as an opaque reservation id for the seat reservation. 
        // It could be anything else, as long as it is deterministic from the    
        // OrderPlaced event.
        this.ReservationId = message.SourceId;
        this.ReservationAutoExpiration = message.ReservationAutoExpiration;
        var expirationWindow = 
            message.ReservationAutoExpiration.Subtract(DateTime.UtcNow);

        if (expirationWindow > TimeSpan.Zero)
        {
            this.State = ProcessState.AwaitingReservationConfirmation;
            var seatReservationCommand =
                new MakeSeatReservation
                {
                    ConferenceId = this.ConferenceId,
                    ReservationId = this.ReservationId,
                    Seats = message.Seats.ToList()
                };
            this.SeatReservationCommandId = seatReservationCommand.Id;

            this.AddCommand(new Envelope<ICommand>(seatReservationCommand)
            {
                TimeToLive = expirationWindow.Add(TimeSpan.FromMinutes(1)),
            });

            ...
}

然後,當它處理SeatsReserved事件時,它檢查該事件的CorrelationId屬性是否匹配SeatReservationCommandId變數的最新值,如下面的程式碼示例所示:

public void Handle(Envelope<SeatsReserved> envelope)
{
    if (this.State == ProcessState.AwaitingReservationConfirmation)
    {
        if (envelope.CorrelationId != null)
        {
            if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) != 0)
            {
                // Skip this event.
                Trace.TraceWarning("Seat reservation response for reservation id {0} does not match the expected correlation id.", envelope.Body.ReservationId);
                return;
            }
        }

        ...
}

注意這個Handle方法如何處理Envelope例項而不是SeatsReserved例項。作為V3版本的一部分,事件被封裝在一個包含CorrelationId屬性的Envelope例項中。EventDispatcher中的DoDispatchMessage方法分配關聯Id的值。

Markus(軟體開發人員)發言:

作為新增此功能的副作用,EventProcessor類在將事件轉發給處理程式時,不能再使用dynamic關鍵字。現在在V3中,它使用了新的EventDispatcher類,該類使用反射來標識給定訊息型別的正確處理程式。

在效能測試期間,團隊發現了這個特定的SeatsReserved事件的另一個問題。由於系統在載入時其他地方出現了延遲,因此第二份SeatsReserved事件被髮布了。然後,這個Handle方法丟擲一個異常,導致系統在將訊息傳送到dead-letter佇列之前多次重試處理該訊息。為了解決這個特定的問題,團隊修改了這個方法,添加了else if子句,如下面的程式碼示例所示:

public void Handle(Envelope<SeatsReserved> envelope)
{
    if (this.State == ProcessState.AwaitingReservationConfirmation)
    {
        ...
    }
    else if (string.CompareOrdinal(this.SeatReservationCommandId.ToString(), envelope.CorrelationId) == 0)
    {
        Trace.TraceInformation("Seat reservation response for request {1} for reservation id {0} was already handled. Skipping event.", envelope.Body.ReservationId, envelope.CorrelationId);
    }
    else
    {
        throw new InvalidOperationException("Cannot handle seat reservation at this stage.");
    }
}

Markus(軟體開發人員)發言:

此優化僅應用於此特定訊息。注意,它使用了之前儲存在例項中的SeatReservationCommandId屬性的值。如果希望對其他訊息執行這種檢查,則需要在流程管理器中儲存更多資訊。

檢測重複的OrderPlaced事件

為了檢測重複的OrderPlaced事件,RegistrationProcessManagerRouter類現在執行一個檢查,以檢視事件是否已經被處理。V3版本的新程式碼如下面的程式碼示例所示:

public void Handle(OrderPlaced @event)
{
    using (var context = this.contextFactory.Invoke())
    {
        var pm = context.Find(x => x.OrderId == @event.SourceId);
        if (pm == null)
        {
            pm = new RegistrationProcessManager();
        }

        pm.Handle(@event);
        context.Save(pm);
    }
}

當RegistrationProcessManager類儲存狀態併發送命令時建立偽事務

Azure中不可能有包含將RegistrationProcessManager持久化到儲存裡併發送命令的事務。因此,團隊決定儲存流程管理器生成的所有命令,以便在流程崩潰時不會丟失這些命令,它們可以稍後傳送。我們使用另一個程序來可靠地處理髮送命令。

Markus(軟體開發人員)發言:

已經遷移到V3版本的遷移實用程式更新了資料庫模式,以適應新的儲存需求。

下面來自SqlProcessDataContext類的程式碼示例顯示了系統如何持久化所有命令以及程序管理器的狀態:

public void Save(T process)
{
    var entry = this.context.Entry(process);

    if (entry.State == System.Data.EntityState.Detached)
        this.context.Set<T>().Add(process);

    var commands = process.Commands.ToList();
    UndispatchedMessages undispatched = null;
    if (commands.Count > 0)
    {
        // If there are pending commands to send, we store them as undispatched.
        undispatched = new UndispatchedMessages(process.Id)
                            {
                                Commands = this.serializer.Serialize(commands)
                            };
        this.context.Set<UndispatchedMessages>().Add(undispatched);
    }

    try
    {
        this.context.SaveChanges();
    }
    catch (DbUpdateConcurrencyException e)
    {
        throw new ConcurrencyException(e.Message, e);
    }

    this.DispatchMessages(undispatched, commands);
}

下面來自SqlProcessDataContext類的程式碼示例展示了系統如何傳送命令訊息:

private void DispatchMessages(UndispatchedMessages undispatched, List<Envelope<ICommand>> deserializedCommands = null)
{
if (undispatched != null)
{
if (deserializedCommands == null)
{
deserializedCommands = this.serializer.Deserialize<IEnumerable<Envelope<ICommand>>>(undispatched.Commands).ToList();
}

var originalCommandsCount = deserializedCommands.Count;
try
{
while (deserializedCommands.Count > 0)
{
this.commandBus.Send(deserializedCommands.First());
deserializedCommands.RemoveAt(0);
}
}
catch (Exception)
{
// We catch a generic exception as we don't know what implementation of ICommandBus we might be using.
if (originalCommandsCount != deserializedCommands.Count)
{
// If we were able to send some commands, then update the undispatched messages.
undispatched.Commands = this.serializer.Serialize(deserializedCommands);
try
{
this.context.SaveChanges();
}
catch (DbUpdateConcurrencyException)
{
// If another thread already dispatched the messages, ignore and surface original exception instead.
}
}

throw;
}

// We remove all the undispatched messages for this process manager.
this.context.Set<UndispatchedMessages>().Remove(undispatched);
this.retryPolicy.ExecuteAction(() => this.context.SaveChanges());
}
}

DispatchMessages方法還從SqlProcessDataContext類中的Find方法呼叫,以便當系統重新還原(rehydrates)RegistrationProcessManager例項時,它會嘗試傳送任何未傳送的訊息。

優化UI流程

第一個優化是允許UI直接導航到註冊者頁面,前提是會議還有很多座位可用。RegistrationController類的StartRegistration方法介紹了這個變化,它現在會在建立預定併發送RegisterToConference命令之前執行一個額外的檢查,確認有足夠的剩餘座位,如下面的程式碼示例所示:

[HttpPost]
public ActionResult StartRegistration(RegisterToConference command, int orderVersion)
{
    var existingOrder = orderVersion != 0 ? this.orderDao.FindDraftOrder(command.OrderId) : null;
    var viewModel = existingOrder == null ? this.CreateViewModel() : this.CreateViewModel(existingOrder);
    viewModel.OrderId = command.OrderId;

    if (!ModelState.IsValid)
    {
        return View(viewModel);
    }

    // Checks that there are still enough available seats, and the seat type IDs submitted are valid.
    ModelState.Clear();
    bool needsExtraValidation = false;
    foreach (var seat in command.Seats)
    {
        var modelItem = viewModel.Items.FirstOrDefault(x => x.SeatType.Id == seat.SeatType);
        if (modelItem != null)
        {
            if (seat.Quantity > modelItem.MaxSelectionQuantity)
            {
                modelItem.PartiallyFulfilled = needsExtraValidation = true;
                modelItem.OrderItem.ReservedSeats = modelItem.MaxSelectionQuantity;
            }
        }
        else
        {
            // Seat type no longer exists for conference.
            needsExtraValidation = true;
        }
    }

    if (needsExtraValidation)
    {
        return View(viewModel);
    }

    command.ConferenceId = this.ConferenceAlias.Id;
    this.commandBus.Send(command);

    return RedirectToAction(
        "SpecifyRegistrantAndPaymentDetails",
        new { conferenceCode = this.ConferenceCode, orderId = command.OrderId, orderVersion = orderVersion });
}

如果沒有足夠的可用座位,控制器將重新顯示當前螢幕,顯示當前可用的座位數量,以便註冊者修改其訂單。

更改的其餘部分在RegistrationController類中的SpecifyRegistrantAndPaymentDetails方法中。下面來自V2版本的程式碼示例顯示,在優化之前,控制器在繼續跳轉到註冊頁面之前呼叫WaitUntilSeatsAreConfirmed方法:

[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
    var order = this.WaitUntilSeatsAreConfirmed(orderId, orderVersion);
    if (order == null)
    {
        return View("ReservationUnknown");
    }

    if (order.State == DraftOrder.States.PartiallyReserved)
    {
        return this.RedirectToAction("StartRegistration", new { conferenceCode = this.ConferenceCode, orderId, orderVersion = order.OrderVersion });
    }

    if (order.State == DraftOrder.States.Confirmed)
    {
        return View("ShowCompletedOrder");
    }

    if (order.ReservationExpirationDate.HasValue && order.ReservationExpirationDate < DateTime.UtcNow)
    {
        return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
    }

    var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
    if (pricedOrder == null)
    {
        return View("ReservationUnknown");
    }

    this.ViewBag.ExpirationDateUTC = order.ReservationExpirationDate;

    return View(
        new RegistrationViewModel
        {
            RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
            Order = pricedOrder
        });
}

下面的程式碼示例顯示了這個方法的V3版本,它不再等待預訂被確認:

[HttpGet]
[OutputCache(Duration = 0, NoStore = true)]
public ActionResult SpecifyRegistrantAndPaymentDetails(Guid orderId, int orderVersion)
{
    var pricedOrder = this.WaitUntilOrderIsPriced(orderId, orderVersion);
    if (pricedOrder == null)
    {
        return View("PricedOrderUnknown");
    }

    if (!pricedOrder.ReservationExpirationDate.HasValue)
    {
        return View("ShowCompletedOrder");
    }

    if (pricedOrder.ReservationExpirationDate < DateTime.UtcNow)
    {
        return RedirectToAction("ShowExpiredOrder", new { conferenceCode = this.ConferenceAlias.Code, orderId = orderId });
    }

    return View(
        new RegistrationViewModel
        {
            RegistrantDetails = new AssignRegistrantDetails { OrderId = orderId },
            Order = pricedOrder
        });
}
備註:我們將在稍後的旅程中使這個方法非同步。

UI流程的第二個優化是在流程的前面執行訂單總數的計算。在上面的程式碼示例中,SpecifyRegistrantAndPaymentDetails方法仍然呼叫WaitUntilOrderIsPriced方法,這將暫停介面流直到系統計算出訂單的總數並使其可用於控制器(在讀端儲存在priced-order檢視模型中)。

實現此功能的關鍵變更是在訂單(Order)聚合裡。Order類中的建構函式現在呼叫CalculateTotal方法並引發OrderTotalsCalculated事件,如下面的程式碼示例所示:

public Order(Guid id, Guid conferenceId, IEnumerable<OrderItem> items, IPricingService pricingService)
    : this(id)
{
    var all = ConvertItems(items);
    var totals = pricingService.CalculateTotal(conferenceId, all.AsReadOnly());

    this.Update(new OrderPlaced
    {
        ConferenceId = conferenceId,
        Seats = all,
        ReservationAutoExpiration = DateTime.UtcNow.Add(ReservationAutoExpiration),
        AccessCode = HandleGenerator.Generate(6)
    });
    this.Update(new OrderTotalsCalculated { Total = totals.Total, Lines = totals.Lines != null ? totals.Lines.ToArray() : null, IsFreeOfCharge = totals.Total == 0m });
}

之前,在V2版本中,訂單(Order)聚合一直等到收到MarkAsReserved命令才呼叫CalculateTotal方法。

非同步接收、完成和傳送訊息

本節概述了系統現在如何非同步地在Azure服務總線上執行所有I/O。

非同步接收訊息

SubscriptionReceiver和SessionSubscriptionReceiver類現在非同步接收訊息,而不是在ReceiveMessages方法的迴圈中同步接收訊息。

有關詳細資訊,請參閱SubscriptionReceiver類中的ReceiveMessages方法或SessionSubscriptionReceiver類中的ReceiveMessagesAndCloseSession方法。

Markus(軟體開發人員)發言:

此程式碼示例還展示瞭如何使用Transient Fault Handling Application Block來可靠地非同步接收來自服務匯流排Topic的訊息。非同步迴圈使程式碼更難以讀取,但效率更高。這是推薦的最佳實踐。這段程式碼將受益於c# 4中新的async關鍵字。

非同步完成訊息

系統使用peek/lock機制從服務匯流排Topic訂閱中檢索訊息。要了解系統如何非同步執行這些操作,請參閱SubscriptionReceiver和SessionSubscriptionReceiver類中的ReceiveMessages方法。這提供了一個系統如何使用非同步api的例子。

非同步傳送訊息

應用程式現在非同步傳送服務總線上的所有訊息。有關詳細資訊,請參見TopicSender類。

在程序中同步處理命令

在V2版本中,系統使用Azure服務匯流排將所有命令傳遞給它們的接收者。這意味著系統非同步地交付命令。在V3版本中,MVC控制器現在同步地在程序中傳送命令,以便通過繞過命令匯流排並將命令直接傳遞給處理程式來改進UI中的響應時間。此外,在ConferenceProcessor工作者角色中,傳送到訂單(Order)聚合的命令使用相同的機制在程序中同步傳送。

Markus(軟體開發人員)發言:

我們仍然非同步地向可用座位(SeatsAvailability)聚合傳送命令,因為隨著RegistrationProcessManager的多個例項並行執行,將會出現爭用,因為多個執行緒都試圖訪問可用座位(SeatsAvailability)聚合的同一個例項。

團隊實現這種行為通過新增SynchronousCommandBusDecorator和CommandDispatcher類到基礎設施並且在web角色啟動的時候註冊它們,如下面的程式碼展示了Global.asax.Azure.cs檔案裡的OnCreateContainer方法**:

var commandBus = new CommandBus(new TopicSender(settings.ServiceBus, "conference/commands"), metadata, serializer);
var synchronousCommandBus = new SynchronousCommandBusDecorator(commandBus);

container.RegisterInstance<ICommandBus>(synchronousCommandBus);
container.RegisterInstance<ICommandHandlerRegistry>(synchronousCommandBus);


container.RegisterType<ICommandHandler, OrderCommandHandler>("OrderCommandHandler");
container.RegisterType<ICommandHandler, ThirdPartyProcessorPaymentCommandHandler>("ThirdPartyProcessorPaymentCommandHandler");
container.RegisterType<ICommandHandler, SeatAssignmentsHandler>("SeatAssignmentsHandler");
備註:在Conference.Azure.cs檔案中也有類似的程式碼,用於配置工作角色,以便在程序中傳送一些命令。

下面的程式碼示例展示了SynchronousCommandBusDecorator類如何實現命令訊息的傳送:

public class SynchronousCommandBusDecorator : ICommandBus, ICommandHandlerRegistry
{
    private readonly ICommandBus commandBus;
    private readonly CommandDispatcher commandDispatcher;

    public SynchronousCommandBusDecorator(ICommandBus commandBus)
    {
        this.commandBus = commandBus;
        this.commandDispatcher = new CommandDispatcher();
    }

    ...

    public void Send(Envelope<ICommand> command)
    {
        if (!this.DoSend(command))
        {
            Trace.TraceInformation("Command with id {0} was not handled locally. Sending it through the bus.", command.Body.Id);
            this.commandBus.Send(command);
        }
    }

    ...

    private bool DoSend(Envelope<ICommand> command)
    {
        bool handled = false;

        try
        {
            var traceIdentifier = string.Format(CultureInfo.CurrentCulture, " (local handling of command with id {0})", command.Body.Id);
            handled = this.commandDispatcher.ProcessMessage(traceIdentifier, command.Body, command.MessageId, command.CorrelationId);

        }
        catch (Exception e)
        {
            Trace.TraceWarning("Exception handling command with id {0} synchronously: {1}", command.Body.Id, e.Message);
        }

        return handled;
    }
}

注意這個類是如何嘗試在不使用服務匯流排的情況下同步傳送命令,但是如果它找不到該命令的處理程式,它將返回到使用服務匯流排。下面的程式碼示例展示了CommandDispatcher類如何試圖定位處理程式並傳遞命令訊息:

public bool ProcessMessage(string traceIdentifier, ICommand payload, string messageId, string correlationId)
{
    var commandType = payload.GetType();
    ICommandHandler handler = null;

    if (this.handlers.TryGetValue(commandType, out handler))
    {
        Trace.WriteLine("-- Handled by " + handler.GetType().FullName + traceIdentifier);
        ((dynamic)handler).Handle((dynamic)payload);
        return true;
    }
    else
    {
        return false;
    }
}

使用memento模式實現快照

在Contoso會議管理系統中,唯一有事件源的聚合就是可用座位