動手造輪子:基於 Redis 實現 EventBus
動手造輪子:基於 Redis 實現 EventBus
Intro
上次我們造了一個簡單的基於記憶體的 EventBus
,但是如果要跨系統的話就不合適了,所以有了這篇基於 Redis
的 EventBus
探索。
本文的實現是基於 StackExchange.Redis
來實現的。
RedisEventStore
實現
既然要實現跨系統的 EventBus
再使用基於記憶體的 EventStore 自然不行,因此這裡基於 Redis 設計了一個 EventStoreInRedis
,基於 redis 的 Hash 來實現,以 Event 的 EventKey 作為 fieldName,以 Event 對應的 EventHandler 作為 value。
EventStoreInRedis
實現:
public class EventStoreInRedis : IEventStore { protected readonly string EventsCacheKey; protected readonly ILogger Logger; private readonly IRedisWrapper Wrapper; public EventStoreInRedis(ILogger<EventStoreInRedis> logger) { Logger = logger; Wrapper = new RedisWrapper(RedisConstants.EventStorePrefix); EventsCacheKey = RedisManager.RedisConfiguration.EventStoreCacheKey; } public bool AddSubscription<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { var eventKey = GetEventKey<TEvent>(); var handlerType = typeof(TEventHandler); if (Wrapper.Database.HashExists(EventsCacheKey, eventKey)) { var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey)); if (handlers.Contains(handlerType)) { return false; } handlers.Add(handlerType); Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers)); return true; } else { return Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(new HashSet<Type> { handlerType }), StackExchange.Redis.When.NotExists); } } public bool Clear() { return Wrapper.Database.KeyDelete(EventsCacheKey); } public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase { var eventKey = GetEventKey<TEvent>(); return Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey)); } public string GetEventKey<TEvent>() { return typeof(TEvent).FullName; } public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase { var eventKey = GetEventKey<TEvent>(); return Wrapper.Database.HashExists(EventsCacheKey, eventKey); } public bool RemoveSubscription<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { var eventKey = GetEventKey<TEvent>(); var handlerType = typeof(TEventHandler); if (!Wrapper.Database.HashExists(EventsCacheKey, eventKey)) { return false; } var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey)); if (!handlers.Contains(handlerType)) { return false; } handlers.Remove(handlerType); Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers)); return true; } }
RedisWrapper
及更具體的程式碼可以參考我的 Redis 的擴充套件的實現 https://github.com/WeihanLi/WeihanLi.Redis
RedisEventBus
實現
RedisEventBus 是基於 Redis 的 PUB/SUB 實現的,實現的感覺還有一些小問題,我想確保每個客戶端註冊的時候每個 EventHandler
即使多次註冊也只註冊一次,但是還沒找到一個好的實現,如果你有什麼想法歡迎指出,和我一起交流。具體的實現細節如下:
public class RedisEventBus : IEventBus { private readonly IEventStore _eventStore; private readonly ISubscriber _subscriber; private readonly IServiceProvider _serviceProvider; public RedisEventBus(IEventStore eventStore, IConnectionMultiplexer connectionMultiplexer, IServiceProvider serviceProvider) { _eventStore = eventStore; _serviceProvider = serviceProvider; _subscriber = connectionMultiplexer.GetSubscriber(); } private string GetChannelPrefix<TEvent>() where TEvent : IEventBase { var eventKey = _eventStore.GetEventKey<TEvent>(); var channelPrefix = $"{RedisManager.RedisConfiguration.EventBusChannelPrefix}{RedisManager.RedisConfiguration.KeySeparator}{eventKey}{RedisManager.RedisConfiguration.KeySeparator}"; return channelPrefix; } private string GetChannelName<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> => GetChannelName<TEvent>(typeof(TEventHandler)); private string GetChannelName<TEvent>(Type eventHandlerType) where TEvent : IEventBase { var channelPrefix = GetChannelPrefix<TEvent>(); var channelName = $"{channelPrefix}{eventHandlerType.FullName}"; return channelName; } public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase { if (!_eventStore.HasSubscriptionsForEvent<TEvent>()) { return false; } var eventData = @event.ToJson(); var handlerTypes = _eventStore.GetEventHandlerTypes<TEvent>(); foreach (var handlerType in handlerTypes) { var handlerChannelName = GetChannelName<TEvent>(handlerType); _subscriber.Publish(handlerChannelName, eventData); } return true; } public bool Subscribe<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { _eventStore.AddSubscription<TEvent, TEventHandler>(); var channelName = GetChannelName<TEvent, TEventHandler>(); //// TODO: if current client subscribed the channel //if (true) //{ _subscriber.Subscribe(channelName, async (channel, eventMessage) => { var eventData = eventMessage.ToString().JsonToType<TEvent>(); var handler = _serviceProvider.GetServiceOrCreateInstance<TEventHandler>(); if (null != handler) { await handler.Handle(eventData).ConfigureAwait(false); } }); return true; //} //return false; } public bool Unsubscribe<TEvent, TEventHandler>() where TEvent : IEventBase where TEventHandler : IEventHandler<TEvent> { _eventStore.RemoveSubscription<TEvent, TEventHandler>(); var channelName = GetChannelName<TEvent, TEventHandler>(); //// TODO: if current client subscribed the channel //if (true) //{ _subscriber.Unsubscribe(channelName); return true; //} //return false; } }
使用示例:
使用起來大體上和上一篇使用一致,只是在初始化注入服務的時候,我們需要把 IEventBus
和 IEventStore
替換為對應 Redis 的實現即可。
註冊服務
services.AddSingleton<IEventBus, RedisEventBus>(); services.AddSingleton<IEventStore, EventStoreInRedis>();
註冊
EventHandler
services.AddSingleton<NoticeViewEventHandler>();
訂閱事件
eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();
釋出事件
[HttpGet("{path}")] public async Task<IActionResult> GetByPath(string path, CancellationToken cancellationToken, [FromServices]IEventBus eventBus) { var notice = await _repository.FetchAsync(n => n.NoticeCustomPath == path, cancellationToken); if (notice == null) { return NotFound(); } eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId }); return Ok(notice); }
Memo
如果要實現基於訊息佇列的事件處理,需要注意,訊息可能會重複,可能會需要在事件處理中注意一下業務的冪等性或者對訊息對一個去重處理。
我在使用 Redis 的事件處理中使用了一個基於 Redis 原子遞增的特性設計的一個防火牆,從而實現一段時間內某一個訊息id只會被處理一次,實現原始碼:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Events/NoticeViewEvent.cs
public class NoticeViewEvent : EventBase
{
public Guid NoticeId { get; set; }
// UserId
// IP
// ...
}
public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
{
public async Task Handle(NoticeViewEvent @event)
{
var firewallClient = RedisManager.GetFirewallClient($"{nameof(NoticeViewEventHandler)}_{@event.EventId}", TimeSpan.FromMinutes(5));
if (await firewallClient.HitAsync())
{
await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
{
//var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
//notice.NoticeVisitCount += 1;
//await dbContext.SaveChangesAsync();
var conn = dbContext.Database.GetDbConnection();
await conn.ExecuteAsync($@"UPDATE tabNotice SET NoticeVisitCount = NoticeVisitCount +1 WHERE NoticeId = @NoticeId", new { @event.NoticeId });
});
}
}
}
Reference
- https://github.com/WeihanLi/ActivityReservation
- https://github.com/WeihanLi/WeihanLi.Redis
- https://redis.io/topics/pubsub