北京网站建设联系电话,微商代理0元0投入,公司的网址格式,网站建设的目标和需求1. 引言事件总线这个概念对你来说可能很陌生#xff0c;但提到观察者#xff08;发布-订阅#xff09;模式#xff0c;你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制#xff0c;允许不同的组件之间进行彼此通信而又不需要相互依赖但提到观察者发布-订阅模式你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制允许不同的组件之间进行彼此通信而又不需要相互依赖达到一种解耦的目的。从上图可知核心就4个角色事件事件源事件处理事件发布者事件订阅者事件总线实现事件总线的关键是事件总线维护一个事件源与事件处理的映射字典通过单例模式确保事件总线的唯一入口利用反射完成事件源与事件处理的初始化绑定提供统一的事件注册、取消注册和触发接口。以上源于我在事件总线知多少1中对于EventBus的分析和简单总结。基于以上的简单认知我们来梳理下eShopOnContainers中EventBus的实现机制。2. 高屋建瓴--看类图我们直接以上帝视角来看下其实现机制上类图。我们知道事件的本质是事件源事件处理。 针对事件源其定义了 Handle方法用于响应事件。不同之处在于方法参数的类型 第一个接受的是一个强类型的 dynamic。 为什么要单独提供一个事件源为 dynamic可以简化事件源的构建更趋于灵活。有了事件源和事件处理接下来就是事件的注册和订阅了。为了方便进行订阅管理系统提供了额外的一层抽象 InMemoryEventBusSubscriptionsManager就是使用内存进行存储事件源和事件处理的映射字典。 从类图中看 SubscriptionInfo其主要用于表示事件订阅方的订阅类型和事件处理的类型。我们来近距离看下 //InMemoryEventBusSubscriptionsManager.cs//定义的事件名称和事件订阅的字典映射1:Nprivate readonly Dictionarystring, ListSubscriptionInfo _handlers;//保存所有的事件处理类型private readonly ListType _eventTypes;//定义事件移除后事件public event EventHandlerstring OnEventRemoved;//构造函数初始化public InMemoryEventBusSubscriptionsManager(){ _handlers new Dictionarystring, ListSubscriptionInfo(); _eventTypes new ListType();}//添加动态类型事件订阅需要手动指定事件名称public void AddDynamicSubscriptionTH(string eventName) where TH : IDynamicIntegrationEventHandler{ DoAddSubscription(typeof(TH), eventName, isDynamic: true);}//添加强类型事件订阅事件名称为事件源类型public void AddSubscriptionT, TH() where T : IntegrationEvent where TH : IIntegrationEventHandlerT{ var eventName GetEventKeyT(); DoAddSubscription(typeof(TH), eventName, isDynamic: false); if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); }}//移除动态类型事件订阅public void RemoveDynamicSubscriptionTH(string eventName) where TH : IDynamicIntegrationEventHandler{ var handlerToRemove FindDynamicSubscriptionToRemoveTH(eventName); DoRemoveHandler(eventName, handlerToRemove);}//移除强类型事件订阅public void RemoveSubscriptionT, TH() where TH : IIntegrationEventHandlerT where T : IntegrationEvent{ var handlerToRemove FindSubscriptionToRemoveT, TH(); var eventName GetEventKeyT(); DoRemoveHandler(eventName, handlerToRemove);}添加了这么一层抽象即符合了单一职责原则又完成了代码重用。 IEventBusSubscriptionsManager的依赖即可完成订阅管理。 你这里可能会好奇为什么要暴露一个 EventBusRabbitMQ源码亲密接触。3.3.1. 构造函数定义IRabbitMQPersistentConnection以便连接到对应的Broke。使用空对象模式注入 OnEventRemoved事件取消队列的绑定。这也就回答了上面遗留的问题3.3.2. 事件订阅的逻辑public void Publish(IntegrationEvent event){ if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy RetryPolicy.HandleBrokerUnreachableException() .OrSocketException() .WaitAndRetry(_retryCount, retryAttempt TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) { _logger.LogWarning(ex.ToString()); }); using (var channel _persistentConnection.CreateModel()) { var eventName event.GetType() .Name; channel.ExchangeDeclare(exchange: BROKER_NAME, type: direct); var message JsonConvert.SerializeObject(event); var body Encoding.UTF8.GetBytes(message); policy.Execute(() { var properties channel.CreateBasicProperties(); properties.DeliveryMode 2; // persistent channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body); }); }}这里面有以下几个知识点使用Polly以2的阶乘的时间间隔进行重试。第一次2s后第二次4s后第三次8s后...重试使用direct全匹配、单播形式的路由机制进行消息分发消息主体是格式化的json字符串指定 mandatory:true告知服务器当根据指定的routingKey和消息找不到对应的队列时直接返回消息给生产者。3.3.4. 然后看看事件消息的监听Received事件委托处理消息接收事件调用 以上代码主要包括以下知识点4. EventBus的集成和使用以上介绍了EventBus的实现要点那各个微服务是如何集成呢1. 注册 2. 注册单例模式的 services.AddSingletonIEventBusSubscriptionsManager,InMemoryEventBusSubscriptionsManager();3. 注册单例模式的 完成了以上集成就可以在代码中使用事件总线进行事件的发布和订阅。4. 发布事件若要发布事件需要根据是否需要事件源参数传递来决定是否需要申明相应的集成事件需要则继承自 IEventBus的实例的 IIntegrationEventHandler或 IEventBus的实例调用 TestEvent事件B服务订阅该事件同样需要在B服务复制定义一个 code classprettyprint code-in-text prettyprinted stylebox-sizing: border-box;background: rgb(243, 241, 241);color: rgb(88, 88, 88);line-height: 18px;font-family: consolas, menlo, courier, monospace, initial microsoft !important; 0pxTestEvent。 这也是微服务的一个通病重复代码。5. 最后通过一步一步的源码梳理我们发现eShopOnContainers中事件总线的总体实现思路与引言部分的介绍十分契合。所以对于事件总线不要觉得高深明确参与的几个角色以及基本的实现步骤那么不管是基于RabbitMQ实现也好还是基于Azure Service Bus也好万变不离其宗//定义事件处理public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandlerProductPriceChangedIntegrationEvent{ public async Task Handle(ProductPriceChangedIntegrationEvent event) { //do something }}//事件源的声明public class ProductPriceChangedIntegrationEvent : IntegrationEvent{ public int ProductId { get; private set; } public decimal NewPrice { get; private set; } public decimal OldPrice { get; private set; } public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice) { ProductId productId; NewPrice newPrice; OldPrice oldPrice; }}services.AddSingletonIEventBus, EventBusRabbitMQ(sp { var rabbitMQPersistentConnection sp.GetRequiredServiceIRabbitMQPersistentConnection(); var iLifetimeScope sp.GetRequiredServiceILifetimeScope(); var logger sp.GetRequiredServiceILoggerEventBusRabbitMQ(); var eventBusSubcriptionsManager sp.GetRequiredServiceIEventBusSubscriptionsManager(); var retryCount 5; if (!string.IsNullOrEmpty(Configuration[EventBusRetryCount])) { retryCount int.Parse(Configuration[EventBusRetryCount]); } return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);});services.AddSingletonIRabbitMQPersistentConnection(sp { var logger sp.GetRequiredServiceILoggerDefaultRabbitMQPersistentConnection(); //... return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);});Json字符串的反序列化利用依赖注入容器解析集成事件Integration Event和事件处理Event Handler类型反射调用具体的事件处理方法private async Task ProcessEvent(string eventName, string message){ if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData JObject.Parse(message); await handler.Handle(eventData); } else { var eventType _subsManager.GetEventTypeByName(eventName); var integrationEvent JsonConvert.DeserializeObject(message, eventType); var handler scope.ResolveOptional(subscription.HandlerType); var concreteType typeof(IIntegrationEventHandler).MakeGenericType(eventType); await (Task)concreteType.GetMethod(Handle).Invoke(handler, new object[] { integrationEvent }); } } } }}