1. 程式人生 > >eShopOnContainers 知多少[6]:持久化事件日誌

eShopOnContainers 知多少[6]:持久化事件日誌

1. 引言

事件匯流排解決了微服務間如何基於整合事件進行非同步通訊的問題。然而只有事件匯流排正常執行,微服務之間基於事件的通訊才得以運轉。
而現實情況是,總有這樣或那樣的問題,導致事件匯流排不穩定或不可用,比如:網路中斷,系統斷電等等,這都可能導致微服務間的不一致性問題。
那如何解決事件匯流排故障導致的不一致問題呢?

  1. 事件溯源
  2. 事件日誌挖掘
  3. 發件箱模式

    2. 問題

    既然上面提到了一致性問題,那具體的問題是什麼呢,在什麼情況才會發生呢?我想我有必要簡單舉例。上程式碼:
var oldPrice = item.Price;
item.Price = product.Price;
_context.CatalogItems.Update(item);
var @event = new ProductPriceChangedIntegrationEvent(item.Id, item.Price, oldPrice);
// Commit changes in original transaction
await _context.SaveChangesAsync();

// Publish integration event to the event bus
// (RabbitMQ or a service bus underneath)
_eventBus.Publish(@event);

當產品價格更改後,程式碼將資料提交給資料庫,然後釋出ProductPriceChangedIntegrationEvent 事件。
如果服務在資料庫更新後崩潰(奔潰發生在_context.SaveChangesAsync()程式碼執行之後,但又發生在整合事件成功釋出前),就會導致本地微服務價格已成功更新,但整合事件未釋出的問題。就會導致目錄微服務中定義的價格和顧客購物車中快取的價格不一致。

3. 分析問題

以上問題的關鍵在於是如何確保兩個獨立的操作的原子性。如果單從單體應用的角度來處理的話,我們完全是可以將他們放到同一個事務中去保證。然而在微服務中,就違背了其高可用的基本要求。因為一旦事件匯流排處於癱瘓狀態,那麼整個目錄微服務就不可用了。這種強制通過事務保證的一致性,就引入了太多的問題依賴。

如果從微服務的角度來看,每個微服務負責各自的業務邏輯,對於目錄微服務來說,它的關注點是產品的更新是否成功。至於藉助事件匯流排通過非同步事件實現微服務間的通訊,並不是其關注點。這也就是關注點分離。換句話說,產品的更新不應該依賴外部狀態。在這裡,外部狀態就是事件匯流排的可用性。

你可能會說了,既然不允許通過強事務保證一致性,那麼如何解決一致性問題呢(好像繞了半天又回到了原點)?

這裡就要引入強一致性和最終一致性的概念了。
強一致性:也就是事務一致性,將多個操作放到單一事務處理。要麼全部成功,要麼全部失敗。
事務一致性
最終一致性:通過將某些操作的執行延遲到稍後的時間來執行。若前面的操作執行成功,後續操作將延後執行。若前面的操作失敗,後續的操作就不會執行。


最終一致性

到這裡,我們實際要解決的問題就明確了:如何確保事件匯流排能夠正確進行事件轉發?

換句話說:事件匯流排掛了,但是事件訊息不能丟失。只要事件訊息不丟,後面我們還有機會挽救(重新發布訊息)。

如何保證事件訊息不丟失呢?當然是持久化了。

4. 持久化事件源

eShopOnContainers已經考慮了這一點,集成了事件日誌用於持久化。我們直接來看類圖:
事件日誌
從類圖中看其實現邏輯也很簡單,主要是定義了一個IntegrationEventLogEntry實體、EventStateEnum事件狀態列舉和IntegrationEventLogContextEF上下文用於事件日誌持久化。暴露IIntegrationEventLogService用於事件狀態的更新。

其他微服務通過在啟動類中註冊IntegrationEventLogContext即可完成事件日誌的整合。

services.AddDbContext<IntegrationEventLogContext>(options =>
{
    options.UseSqlServer(configuration["ConnectionString"],
        sqlServerOptionsAction: sqlOptions =>
        {
            sqlOptions.MigrationsAssembly(typeof(Startup)
                .GetTypeInfo().Assembly.GetName().Name);
            sqlOptions.EnableRetryOnFailure(maxRetryCount: 10,
                maxRetryDelay: TimeSpan.FromSeconds(30), 
                errorNumbersToAdd: null);
        });
});

使用EF進行資料庫遷移後,就會生成IntergrationEventLog表。如下圖所示:

5. 如何藉助事件日誌確保高可用

主要分兩步走:

  1. 應用程式開始本地資料庫事務,然後更新領域實體狀態,並將整合事件插入整合事件日誌表中,最後提交事務來確保領域實體更新和儲存事件日誌所需的原子性。
  2. 釋出事件

第一步毋庸置疑,第二步釋出事件,我們又有多種實現方式:

  1. 在提交事務後立即釋出整合事件,並將其標記為已釋出。當微服務發生故障時,可以通過遍歷儲存的整合事件(未釋出)執行補救措施。
  2. 將事件日誌表用作一種佇列。使用單獨的執行緒或程序查詢事件日誌表,將事件釋出到事件總
    線,然後將事件標記為已釋出。

通過單獨的程序,將事件日誌表作為佇列進行事件釋出

這裡很顯然第二種方式更為穩妥。而eShopOnContainers出於簡單考慮,採用了第一種方案,具體程式碼如下:

using (var transaction = _catalogContext.Database.BeginTransaction())
{
 _catalogContext.CatalogItems.Update(catalogItem);
 await _catalogContext.SaveChangesAsync();
 // Save to EventLog only if product price changed
 if(raiseProductPriceChangedEvent)
 await
_integrationEventLogService.SaveEventAsync(priceChangedEvent);
 transaction.Commit();
}
// Publish the intergation event through the event bus
_eventBus.Publish(priceChangedEvent);
integrationEventLogService.MarkEventAsPublishedAsync( priceChangedEvent); 

至此,eShopOnContainers確保事件匯流排能夠正確轉發訊息的解決方案闡述完畢。你可能會問,這對應的是引言中的哪一種方案?都不是,你可以看作其是基於事件日誌的簡化版的事件溯源。

6. 僅此而已?

通過持久化事件日誌來避免事件釋出失敗導致的一致性問題,是一種有效措施。然而訊息從傳送到接收再到正常消費的過程中,每一個環節都可能故障,所以僅僅在訊息傳送端使用事件日誌只是確保最終一致性的一小步。還有很多問題有待完善:

  1. 訊息傳送成功了,但未被成功接收
  2. 訊息傳送且成功接收,但未被正確消費
  3. 訊息重複傳送,導致多次消費問題
  4. 訊息被多個微服務訂閱,如何確保每個微服務都成功接收並消費
  5. 等等

而這些問題就留給大家思考吧。