[Abp vNext 原始碼分析] - 13. 本地事件匯流排與分散式事件匯流排 (Rabbit MQ)
一、簡要介紹
ABP vNext 封裝了兩種事件匯流排結構,第一種是 ABP vNext 自己實現的本地事件匯流排,這種事件匯流排無法跨專案釋出和訂閱。第二種則是分散式事件匯流排,ABP vNext 自己封裝了一個抽象層進行定義,並使用 RabbitMQ 編寫了一個基本實現。
在使用方式上,兩種事件匯流排的作用基本相同。
事件匯流排分佈在兩個模組,在 Volo.Abp.EventBus 模組內部,定義了事件匯流排的抽象介面,以及本地事件匯流排 (ILocalEventBus
) 的實現。分散式事件匯流排的具體實現,是在 Volo.Abp.EventBus.RabbitMQ 模組內部進行定義,從專案名稱可以看出來,這個模組是基於 RabbitMQ 訊息佇列實現的。
但是該專案並不是直接引用 RabbitMQ.Client 包,而是在 Volo.Abp.RabbitMQ 專案內部引用。這是因為除了分散式事件匯流排以外,ABP 還基於 RabbitMQ 實現了一個後臺作業管理器。
ABP vNext 框架便將一些物件抽象出來,放在 Volo.Abp.RabbitMQ 專案內部進行定義和實現。
二、原始碼分析
2.1 事件處理器的註冊
分析原始碼,首先從一個專案的模組開始,Volo.Abp.EventBus 庫的模組 AbpEventBusModule
只幹了一件事情。在元件註冊的時候,根據元件的實現介面 (ILocalEventHandler
IDistributedEventHandler
) 不同,將其賦值給 AbpLocalEventBusOptions
與 AbpDistributedEventBusOptions
的 Handlers
屬性。
也就是說,開發人員定義的事件處理程式 (Handler
) 都會在依賴注入的時候,都會將其型別 (Type
) 新增到事件匯流排的配置類當中,方便後續進行使用。
2.2 事件匯流排的介面
通過事件匯流排模組的單元測試我們可以知道,事件的釋出與訂閱都是通過 IEventBus
的兩個子介面 (ILocalEventBus
/IDistributedEventBus
) 進行的。在 IEventBus
對於 ILocalEventBus
介面和 IDistributedEventBus
介面來說,它們都提供了一個,針對本地事件處理器和分散式處理器的特殊訂閱方法。
ILocalEventBus
:
/// <summary>
/// Defines interface of the event bus.
/// </summary>
public interface ILocalEventBus : IEventBus
{
/// <summary>
/// Registers to an event.
/// Same (given) instance of the handler is used for all event occurrences.
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="handler">Object to handle the event</param>
IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler)
where TEvent : class;
}
IDistributedEventBus
:
public interface IDistributedEventBus : IEventBus
{
/// <summary>
/// Registers to an event.
/// Same (given) instance of the handler is used for all event occurrences.
/// </summary>
/// <typeparam name="TEvent">Event type</typeparam>
/// <param name="handler">Object to handle the event</param>
IDisposable Subscribe<TEvent>(IDistributedEventHandler<TEvent> handler)
where TEvent : class;
}
2.3 事件匯流排基本流程和實現
同其他模組一樣,因為有分散式事件匯流排和本地事件匯流排,ABP vNext 同樣抽象了一個 EventBusBase
型別,作為它們的基類實現。
一般的流程,我們是先定義某個事件,然後訂閱該事件並指定事件處理器,最後在某個時刻釋出事件。例如下面的程式碼:
首先定義了一個事件處理器,專門用於處理 EntityChangedEventData<MyEntity>
事件。
public class MyEventHandler : ILocalEventHandler<EntityChangedEventData<MyEntity>>
{
public int EntityChangedEventCount { get; set; }
public Task HandleEventAsync(EntityChangedEventData<MyEntity> eventData)
{
EntityChangedEventCount++;
return Task.CompletedTask;
}
}
var handler = new MyEventHandler();
LocalEventBus.Subscribe<EntityChangedEventData<MyEntity>>(handler);
await LocalEventBus.PublishAsync(new EntityCreatedEventData<MyEntity>(new MyEntity()));
2.3.1 事件的訂閱
可以看到,這裡使用的是 ILocalEventBus
定義的訂閱方法,跳轉到內部實現,它還是呼叫的 EventBus
的方法。
public virtual IDisposable Subscribe<TEvent>(ILocalEventHandler<TEvent> handler) where TEvent : class
{
// 呼叫基類的 Subscribe 方法,並傳遞 TEvent 的型別,和事件處理器。
return Subscribe(typeof(TEvent), handler);
}
public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}
可以看到,這裡傳遞了一個 SingleInstanceHandlerFactory
物件,這玩意兒是幹嘛用的呢?從名字可以看出來,這是一個工廠,是用來建立 Handler (事件處理器) 的工廠,並且是一個單例項的事件處理器工廠。
下面就是 IEventHandlerFactory
介面的定義,以及 SingleInstanceHandlerFactory
實現。
public interface IEventHandlerFactory
{
// 獲得一個事件處理器包裝物件,即事件處理器執行完畢之後,可以呼叫
// IEventHandlerDisposeWrapper.Dispose() 進行釋放。
IEventHandlerDisposeWrapper GetHandler();
// 判斷在已有的事件處理器工廠集合中,是否已經存在了相同的事件處理器。
bool IsInFactories(List<IEventHandlerFactory> handlerFactories);
}
public class SingleInstanceHandlerFactory : IEventHandlerFactory
{
// 構造工廠時,傳遞的事件處理器例項。
public IEventHandler HandlerInstance { get; }
public SingleInstanceHandlerFactory(IEventHandler handler)
{
HandlerInstance = handler;
}
// 通過 EventHandlerDisposeWrapper 包裝事件處理器例項。
public IEventHandlerDisposeWrapper GetHandler()
{
return new EventHandlerDisposeWrapper(HandlerInstance);
}
// 判斷針對 HandlerInstance 的事件處理器是否已經存在。
public bool IsInFactories(List<IEventHandlerFactory> handlerFactories)
{
return handlerFactories
.OfType<SingleInstanceHandlerFactory>()
.Any(f => f.HandlerInstance == HandlerInstance);
}
}
針對 IEventHandlerFactory
工廠,還擁有 3 個不同的實現,下表分別說明它們的應用場景。
實現型別 | 作用 |
---|---|
IocEventHandlerFactory |
每個工廠對應一個事件處理器的的型別,並通過 ScopeFactory 解析具體的事件處理器。生命週期由 scope 控制,當 scope 釋放時,對應的事件處理器例項也會被銷燬。 |
SingleInstanceHandlerFactory |
每個工廠對應單獨的一個事件處理器例項,事件處理器例項是由建立者控制的。 |
TransientEventHandlerFactory |
每個工廠對應一個事件處理器的型別,區別是它不由 IoC 解析例項,而是使用的 Activator.CreateInstance() 方法構造例項,是一個瞬時物件,呼叫包裝器的 Dispose 即會進行釋放。 |
TransientEventHandlerFactory<THandler> |
每個工廠對應指定的 THandler 事件處理器,生命週期同上面的工廠一樣。 |
這幾種工廠都是在訂閱操作時,不同的訂閱過載使用不同的工廠,或者是自己指定事件處理器的工廠均可。
public virtual IDisposable Subscribe<TEvent, THandler>()
where TEvent : class
where THandler : IEventHandler, new()
{
return Subscribe(typeof(TEvent), new TransientEventHandlerFactory<THandler>());
}
public virtual IDisposable Subscribe(Type eventType, IEventHandler handler)
{
return Subscribe(eventType, new SingleInstanceHandlerFactory(handler));
}
不過有一種特殊的行為,開發人員可以 不用顯式訂閱。在 EventBus
型別中,定義了一個 SubscribeHandlers(ITypeList<IEventHandler> handlers)
方法。該方法接收一個型別集合,通過遍歷集合,從事件處理器的定義當中,取得事件處理器監聽的事件型別 TEvent
。
在取得了事件型別,並知曉了事件處理器型別以後,事件匯流排就可以訂閱 TEvent
型別的事件,並使用 IocEventHandlerFactory
工廠來構造事件處理器。
protected virtual void SubscribeHandlers(ITypeList<IEventHandler> handlers)
{
// 遍歷事件處理器的型別,其實這裡的就是模組啟動時,傳遞給 XXXOptions 的集合。
foreach (var handler in handlers)
{
// 獲得事件處理器的所有介面定義,並遍歷介面進行檢查。
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
// 如果介面沒有實現 IEventHandler 型別,則忽略。
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
// 從泛型引數當中,獲得定義的事件型別。
var genericArgs = @interface.GetGenericArguments();
// 泛型引數完全匹配 1 時,才進行訂閱操作。
if (genericArgs.Length == 1)
{
Subscribe(genericArgs[0], new IocEventHandlerFactory(ServiceScopeFactory, handler));
}
}
}
}
這個訂閱方法在 EventBus
當中是一個抽象方法,分別在本地事件匯流排和分散式事件匯流排有實現,這裡我們首先講解本地事件的邏輯。
public class LocalEventBus : EventBusBase, ILocalEventBus, ISingletonDependency
{
protected ConcurrentDictionary<Type, List<IEventHandlerFactory>> HandlerFactories { get; }
public LocalEventBus(
IOptions<AbpLocalEventBusOptions> options,
IServiceScopeFactory serviceScopeFactory)
: base(serviceScopeFactory)
{
Options = options.Value;
Logger = NullLogger<LocalEventBus>.Instance;
HandlerFactories = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
// 呼叫父類的方法,將模組初始化時掃描到的事件處理器,都嘗試進行訂閱。
SubscribeHandlers(Options.Handlers);
}
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
GetOrCreateHandlerFactories(eventType)
// 鎖住集合,以確保執行緒安全。
.Locking(factories =>
{
// 如果在集合內部,已經有了對應的工廠,則不進行新增。
if (!factory.IsInFactories(factories))
{
factories.Add(factory);
}
}
);
// 返回一個事件處理器工廠登出器,當呼叫 Dispose() 方法時,會取消之前訂閱的事件。
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
// 根據事件的型別,從字典中獲得該型別的所有事件處理器工廠。
return HandlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
}
}
上述流程結合 EventBus
和 LocalEventBus
講解了事件的訂閱流程,事件的訂閱操作都是對 HandlerFactories
的操作,往裡面新增指定事件的事件處理器工廠,而每個工廠都是跟具體的事件處理器例項/型別進行關聯的。
2.3.2 事件的釋出
當開發人員需要釋出事件的時候,一般都是通過對應的 EventBus
,呼叫響應的 PublishAsync
方法,傳遞要觸發的事件型別與事件資料。介面和基類當中,定義了兩種釋出方法的簽名與實現:
public virtual Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
{
return PublishAsync(typeof(TEvent), eventData);
}
public abstract Task PublishAsync(Type eventType, object eventData);
第二種方法一共也分為本地事件匯流排的實現,和分散式事件匯流排的實現,本地事件比較簡單,我們先分析本地事件匯流排的實現。
public override async Task PublishAsync(Type eventType, object eventData)
{
// 定義了一個異常集合,用於接收多個事件處理器執行時,產生的所有異常。
var exceptions = new List<Exception>();
// 觸發事件處理器。
await TriggerHandlersAsync(eventType, eventData, exceptions);
// 如果有任何異常產生,則丟擲到之前的呼叫棧。
if (exceptions.Any())
{
if (exceptions.Count == 1)
{
exceptions[0].ReThrow();
}
throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
}
}
可以看到真正的觸發行為是在 TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions)
內部進行實現的。
protected virtual async Task TriggerHandlersAsync(Type eventType, object eventData, List<Exception> exceptions)
{
// 針對於這個的作用,等同於 ConfigureAwait(false) 。
// 具體可以參考 https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/。
await new SynchronizationContextRemover();
// 根據事件的型別,得到它的所有事件處理器工廠。
foreach (var handlerFactories in GetHandlerFactories(eventType))
{
// 遍歷所有的事件處理器工廠,通過 Factory 獲得事件處理器,呼叫 Handler 的 HandleEventAsync 方法。
foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
{
await TriggerHandlerAsync(handlerFactory, handlerFactories.EventType, eventData, exceptions);
}
}
// 如果型別繼承了 IEventDataWithInheritableGenericArgument 介面,那麼會檢測泛型引數是否有父類。
// 如果有父類,則會使用當前的事件資料,為其父類釋出一個事件。
if (eventType.GetTypeInfo().IsGenericType &&
eventType.GetGenericArguments().Length == 1 &&
typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
{
var genericArg = eventType.GetGenericArguments()[0];
var baseArg = genericArg.GetTypeInfo().BaseType;
if (baseArg != null)
{
// 構造基類的事件型別,使用當前一樣的泛型定義,只是泛型引數使用基類。
var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
// 構建型別的構造引數。
var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
// 通過事件型別和構造引數,構造一個新的事件資料例項。
var baseEventData = Activator.CreateInstance(baseEventType, constructorArgs);
// 釋出父類的同類事件。
await PublishAsync(baseEventType, baseEventData);
}
}
}
在上述程式碼內部,都還沒有真正執行事件處理器,真正的事件處理器執行程式是在下面的方法進行執行的。ABP vNext 通過引入 IEventDataWithInheritableGenericArgument
介面,實現了 型別繼承事件 的觸發,該介面提供了一個 GetConstructorArgs()
方法定義,方便後面生成構造引數。
例如有一個基礎事件叫做 EntityEventData<Student>
,如果 Student
繼承自 Person
,那麼在觸發該事件的時候,也會發佈一個 EntityEventData<Person>
事件。
2.3.3 事件處理器的執行
真正事件處理器的執行,是通過下面的方法實現的,大概思路就是通過事件匯流排工廠,構建了事件處理器的例項。通過反射,呼叫事件處理器的 HandleEventAsync()
方法。如果在處理過程當中,出現了異常,則將異常資料放置在 List<Exception>
集合當中。
protected virtual async Task TriggerHandlerAsync(IEventHandlerFactory asyncHandlerFactory, Type eventType, object eventData, List<Exception> exceptions)
{
using (var eventHandlerWrapper = asyncHandlerFactory.GetHandler())
{
try
{
// 獲得事件處理器的型別。
var handlerType = eventHandlerWrapper.EventHandler.GetType();
// 判斷事件處理器是本地事件還是分散式事件。
if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(ILocalEventHandler<>)))
{
// 獲得方法定義。
var method = typeof(ILocalEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(ILocalEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
// 使用工廠建立的例項呼叫方法。
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else if (ReflectionHelper.IsAssignableToGenericType(handlerType, typeof(IDistributedEventHandler<>)))
{
var method = typeof(IDistributedEventHandler<>)
.MakeGenericType(eventType)
.GetMethod(
nameof(IDistributedEventHandler<object>.HandleEventAsync),
new[] { eventType }
);
await (Task)method.Invoke(eventHandlerWrapper.EventHandler, new[] { eventData });
}
else
{
// 如果都不是,則說明型別不正確,丟擲異常。
throw new AbpException("The object instance is not an event handler. Object type: " + handlerType.AssemblyQualifiedName);
}
}
// 捕獲到異常都統一新增到異常集合當中。
catch (TargetInvocationException ex)
{
exceptions.Add(ex.InnerException);
}
catch (Exception ex)
{
exceptions.Add(ex);
}
}
}
2.4 分散式事件匯流排
分散式事件匯流排的實現都存放在 Volo.Abp.EventBus.RabbitMQ,該專案的程式碼比較少,由三個檔案構成。
在 RabbitMQ 模組的內部,只幹了兩件事情。首先從 JSON 配置檔案當中,獲取 AbpRabbitMqEventBusOptions
配置的三個引數,然後解析 RabbitMqDistributedEventBus
例項,並呼叫初始化方法 (Initialize()
)。
[DependsOn(
typeof(AbpEventBusModule),
typeof(AbpRabbitMqModule))]
public class AbpEventBusRabbitMqModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
// 從配置檔案讀取配置。
Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
// 呼叫初始化方法。
context
.ServiceProvider
.GetRequiredService<RabbitMqDistributedEventBus>()
.Initialize();
}
}
2.4.1 分散式事件匯流排的初始化
public void Initialize()
{
// 建立一個消費者,並配置交換器和佇列。
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
AbpRabbitMqEventBusOptions.ExchangeName,
type: "direct",
durable: true
),
new QueueDeclareConfiguration(
AbpRabbitMqEventBusOptions.ClientName,
durable: true,
exclusive: false,
autoDelete: false
),
AbpRabbitMqEventBusOptions.ConnectionName
);
// 消費者在消費訊息的時候,具體的執行邏輯。
Consumer.OnMessageReceived(ProcessEventAsync);
// 呼叫基類的方法,自動訂閱對應的事件。
SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
}
2.4.2 分散式事件的訂閱
在定義分散式事件的時候,我們必須使用 EventNameAttribute
為事件宣告路由鍵。
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
{
var handlerFactories = GetOrCreateHandlerFactories(eventType);
if (factory.IsInFactories(handlerFactories))
{
return NullDisposable.Instance;
}
handlerFactories.Add(factory);
if (handlerFactories.Count == 1) //TODO: Multi-threading!
{
// 為消費者繫結一個路由鍵,在收到對應的事件時,就會觸發之前繫結的方法。
Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
}
return new EventHandlerFactoryUnregistrar(this, eventType, factory);
}
訂閱的時候,除了 Consumer.BindAsync()
以外,基本流程和本地事件匯流排基本一致。
2.4.3 分散式事件的釋出
分散式事件匯流排一樣重寫了釋出方法,內部首先使用 IRabbitMqSerializer
序列化器 (基於 JSON.NET) 將事件資料進行序列化,然後將訊息投遞出去。
public override Task PublishAsync(Type eventType, object eventData)
{
var eventName = EventNameAttribute.GetNameOrDefault(eventType);
// 序列化事件資料。
var body = Serializer.Serialize(eventData);
// 建立一個通道用於通訊。
using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
channel.ExchangeDeclare(
AbpRabbitMqEventBusOptions.ExchangeName,
"direct",
durable: true
);
// 更改投遞模式為持久化模式。
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
// 釋出一個新的事件。
channel.BasicPublish(
exchange: AbpRabbitMqEventBusOptions.ExchangeName,
routingKey: eventName,
mandatory: true,
basicProperties: properties,
body: body
);
}
return Task.CompletedTask;
}
2.4.4 分散式事件的執行
執行邏輯都存放在 ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
方法內部,基本就是監聽到指定的訊息,首先反序列化訊息,呼叫父類的 TriggerHandlersAsync
去執行具體的事件處理器。
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
{
var eventName = ea.RoutingKey;
var eventType = EventTypes.GetOrDefault(eventName);
if (eventType == null)
{
return;
}
var eventData = Serializer.Deserialize(ea.Body, eventType);
await TriggerHandlersAsync(eventType, eventData);
}
三、總結
ABP vNext 為我們實現了比較完善的本地事件匯流排,和基於 RabbitMQ 的分散式事件匯流排。在平時開發過程中,我們本地事件匯流排的使用頻率應該還是比較高,而分散式事件匯流排目前仍處於一個半成品,很多高階特性還沒實現,例如重試策略等。所以分散式事件匯流排要使用的話,建議使用較為成熟的 CAP 庫替代 ABP vNext 的分散式事件匯流排。
四、其他
360 大病救助 : 在這裡向大家求助一下,病人是我親戚,情況屬實。對於他們家庭來說,經濟壓力很大,希望大家能幫助或轉發一下,謝謝大家。