免费下载ppt模板网站推荐,知名网站建设怎么样,wordpress动态默认参数,建设电影网站的教程EventBus/EventQueue 再思考Intro之前写过两篇文章#xff0c;造轮子系列的 EventBus/ EventQueue#xff0c;回想起来觉得当前的想法有点问题#xff0c;当时对 EvenStore 可能有点误解#xff0c;有兴趣可以参考 动手造轮子#xff1a;实现一个简单的 EventBus动手造轮子… EventBus/EventQueue 再思考Intro之前写过两篇文章造轮子系列的 EventBus/ EventQueue回想起来觉得当前的想法有点问题当时对 EvenStore 可能有点误解有兴趣可以参考 动手造轮子实现一个简单的 EventBus动手造轮子实现简单的 EventQueue最近把 Event 相关的逻辑做了一个重构修改 EventStore重新设计了 Event 相关的组件重构后的 EventEvent: 事件的抽象定义EventHandler事件处理器抽象定义EventHandlerFactory事件处理器工厂用来根据事件类型获取事件处理器新增EventPublisher事件发布器用于事件发布EventSubscriber事件订阅器用于管理事件的订阅EventSubscriptionManager事件订阅管理器在 EventSubscriber 的基础上增加了一个根据事件类型获取事件订阅器类型的方法EventBus事件总线由 EventPubliser 和 EventSubscriber 组合而成用来比较方便的做事件发布和订阅EventQueue事件队列希望某些消息顺序处理的时候可以考虑用 EventQueue 的模式EventStore事件存储事件的持久化存储在之前的版本里EventStore 实际作用是一个 EventSubscriptionManager在最近的版本更新中已修改)以上 EventSubscriber 和 EventSubscriptionManager 一般不直接用一般用 EventBus 来处理即可EventHandlerFactory这次引入了 EventHandlerFactory 用来抽象获取 EventHandler 的逻辑原来的设计里是在处理 Event 的时候获取 EventHandler 的类型然后从依赖注入框架中获取或创建新的 event handler 实例之后再调用 EventHandler 的 Handle 方法处理事件有一些冗余使用 EventHandlerFactory 之后就可以直接获取一个 EventHandler 实例集合具体是实例化还是从依赖注入中获取就由 EventHandlerFactory 来决定了这样就可以对依赖注入很友好对于基于内存的简单 EventBus 来说在服务注册之后就不需要再调用 Subscribe 去显式订阅了因为再注册服务的时候就已经隐式实现了订阅的逻辑这样实际就不需要 EventSubscriptionManager 来管理订阅了订阅信息都在依赖注入框架内部比如说 CounterEvent要获取它的订阅信息我只需要从依赖注入框架中获取 IEventHandlerCounterEvent 的实例即可实际就代替了原先 “EventStoreInMemory”现在的 EventSubscriptionManagerInMemory基于依赖注入的 EventHandlerFactory 定义public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory
{private readonly IServiceProvider _serviceProvider;public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider null){_serviceProvider serviceProvider ?? DependencyResolver.Current;}public ICollectionIEventHandler GetHandlers(Type eventType){var eventHandlerType typeof(IEventHandler).MakeGenericType(eventType);return _serviceProvider.GetServices(eventHandlerType).CastIEventHandler().ToArray();}
}
如果不使用依赖注入也可以根据 IEventSubscriptionManager 订阅信息来实现public sealed class DefaultEventHandlerFactory : IEventHandlerFactory
{private readonly IEventSubscriptionManager _subscriptionManager;private readonly ConcurrentDictionaryType, ICollectionIEventHandler _eventHandlers new ConcurrentDictionaryType, ICollectionIEventHandler();private readonly IServiceProvider _serviceProvider;public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider null){_subscriptionManager subscriptionManager;_serviceProvider serviceProvider ?? DependencyResolver.Current;}public ICollectionIEventHandler GetHandlers(Type eventType){var eventHandlers _eventHandlers.GetOrAdd(eventType, type {var handlerTypes _subscriptionManager.GetEventHandlerTypes(type);var handlers handlerTypes.Select(t (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t)).ToArray();return handlers;});return eventHandlers;}
}
EventQueue Demo来看一下 EventQueue 的示例示例基于 asp.net core 的定义了一个 HostedService 来实现一个 EventConsumer 来消费 EventQueue 中的事件信息EventConsumer 定义如下public class EventConsumer : BackgroundService
{private readonly IEventQueue _eventQueue;private readonly IEventHandlerFactory _eventHandlerFactory;public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory){_eventQueue eventQueue;_eventHandlerFactory eventHandlerFactory;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var queues await _eventQueue.GetQueuesAsync();if (queues.Count 0){await queues.Select(async q {var event await _eventQueue.DequeueAsync(q);if (null ! event){var handlers _eventHandlerFactory.GetHandlers(event.GetType());if (handlers.Count 0){await handlers.Select(h h.Handle(event)).WhenAll();}}}).WhenAll();}await Task.Delay(1000, stoppingToken);}}
}
定义 PageViewEvent 和 PageViewEventHandler用来记录和处理请求访问记录public class PageViewEvent : EventBase
{
}
public class PageViewEventHandler : EventHandlerBasePageViewEvent
{public static int Count;public override Task Handle(PageViewEvent event){Interlocked.Increment(ref Count);return Task.CompletedTask;}
}
事件很简单事件处理也只是增加了 PageViewEventHandler 内定义的 Count。服务注册// 注册事件核心组件
// 会注册 EventBus、EventHandlerFactory、EventQueue 等
services.AddEvents()// 注册 EventHanlder.AddEventHandlerPageViewEvent, PageViewEventHandler();
// 注册 EventQueuePubliser默认注册的 IEventPublisher 是 EventBus
services.AddSingletonIEventPublisher, EventQueuePublisher();
// 注册 EventConsumer
services.AddHostedServiceEventConsumer();
事件发布定义了一个中间件来发布 PageViewEvent定义如下// pageView middleware
app.Use((context, next) {var eventPublisher context.RequestServices.GetRequiredServiceIEventPublisher();eventPublisher.Publish(new PageViewEvent());return next();});
然后定义一个接口来获取上面定义的 PageViewEventHandler 中的 Count[Route(api/[controller])]
public class EventsController : ControllerBase
{[HttpGet(pageViewCount)]public IActionResult Count(){return Ok(new { Count PageViewEventHandler.Count });}
}
运行起来之后访问几次接口看上面的接口返回 Count 是否会增加正常的话每访问一次接口就会增加 1并发访问问题也不大因为每个事件都是顺序处理的即使并发访问也没有关系事件发布之后在队列里都是顺序处理的这也就是引入事件队列的目的好像上面的原子递增没什么用了...) 如果没看到了增加稍等一会儿再访问试试事件处理会迟到但总会处理毕竟是异步处理的有些延迟很正常而且上面我们还有一个 1s 的延迟More作者水平有限如果上述有哪些不对的地方还望指出万分感谢Referencehttps://github.com/WeihanLi/WeihanLi.Common/tree/dev/src/WeihanLi.Common/Eventhttps://github.com/WeihanLi/WeihanLi.Common/blob/dev/samples/AspNetCoreSample/Startup.cshttps://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.htmlhttps://www.cnblogs.com/weihanli/p/implement-event-queue.html