云南网站开发培训机构排行,软件app开发制作多少钱,湖南专业的关键词优化,好看的网站 你知道的2021MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通… MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。官网地址http://masstransit-project.com/发布订阅模式这种场景十分常见发送一个消息(或事件)到消息队列中有一个或是多个订阅方对预期的消息接收处理。基于需要搭建了两个WebApi程序用于模拟发送方和订阅方其中的RabbitMQ已预先搭建好了只在程序中引用包配置下即可。PackageReference IncludeMassTransit Version7.2.0 /
PackageReference IncludeMassTransit.AspNetCore Version7.2.0 /
PackageReference IncludeMassTransit.RabbitMQ Version7.2.0 /
发布端配置在Startup中增加MassTransit需要的服务及初始化配置。对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。对发送方需要发送的消息初始化一个请求客户端配置请求信息及推送到MQ的地址。services.AddMassTransit(x
{x.UsingRabbitMq((context, cfg) {cfg.Host(Configuration[RabbitmqConfig:HostIP], ushort.Parse(Configuration[RabbitmqConfig:HostPort]), Configuration[RabbitmqConfig:VirtualHost], h {h.Username(Configuration[RabbitmqConfig:Username]);h.Password(Configuration[RabbitmqConfig:Password]);});});x.AddRequestClientValueEntered(new Uri(GetServiceAddress(events-valueentered)));
});
services.AddMassTransitHostedService();
为了快速了解使用Controller在Action中发起对MQ的消息推送[ApiController]
[Route([controller])]
public class ValueController : ControllerBase
{readonly IPublishEndpoint _publishEndpoint;public ValueController(IPublishEndpoint publishEndpoint){_publishEndpoint publishEndpoint;}[HttpPost]public async TaskActionResult Post(string value){await _publishEndpoint.PublishValueEntered(new{Value value});return Ok();}
}
订阅端配置订阅端也创建一个WebApi应用在Startup中增加MassTransit的服务使用到的Nuget包和发布端一样。对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。为订阅端增加一个订阅处理的Handler即如下的ValueEnteredEventConsumer增加一个接受点指定队列名称即发送端发送的队列名称设置该队列消费处理的Consumer即ValueEnteredEventConsumerservices.AddMassTransit(x
{x.AddConsumerValueEnteredEventConsumer();x.UsingRabbitMq((context, cfg) {cfg.Host(Configuration[RabbitmqConfig:HostIP], ushort.Parse(Configuration[RabbitmqConfig:HostPort]), Configuration[RabbitmqConfig:VirtualHost], h {h.Username(Configuration[RabbitmqConfig:Username]);h.Password(Configuration[RabbitmqConfig:Password]);});cfg.ReceiveEndpoint(events-valueentered, e {e.ConfigureConsumerValueEnteredEventConsumer(context);});});
});
services.AddMassTransitHostedService();
如此一来通过Postman发送一个请求经发布端发布一个消息到RabbitMQ订阅端侦听消息处理消息一切都很熟悉。请求响应模式在发布订阅的基础上改变以往的习惯当发布一个消息后等待订阅方的处理并将消息推送回RabbitMQ发送方接受到处理后的消息继续执行。请求端在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。x.AddRequestClientCheckOrderStatus(new Uri(GetServiceAddress(events-checkorderstatus)));
增加一个Controller及Action来请求及获取处理结果(OrderStatusResult)。[ApiController]
[Route([controller])]
public class OrderController : ControllerBase
{private readonly IRequestClientCheckOrderStatus _client;public OrderController(IRequestClientCheckOrderStatus client){_client client;}public async TaskOrderStatusResult Get(string id){var response await _client.GetResponseOrderStatusResult(new { OrderId id });return response.Message;}
}
响应端同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。x.AddConsumerCheckOrderStatusConsumer ();
x.UsingRabbitMq((context, cfg)
{// ...cfg.ReceiveEndpoint(events-checkorderstatus, e {e.ConfigureConsumerCheckOrderStatusConsumer (context);});
});
Consumer中获取请求参数执行请求返回执行结果。public class CheckOrderStatusConsumer : IConsumerCheckOrderStatus
{public async Task Consume(ConsumeContextCheckOrderStatus context){if (context.Message.OrderId 9527){throw new InvalidOperationException(Order not found);}Console.WriteLine($OrderId:{context.Message.OrderId});await context.RespondAsyncOrderStatusResult(new{OrderId context.Message.OrderId,Timestamp Guid.NewGuid().ToString(),StatusCode 1,StatusText Close});}
}
这样一来当请求端发起一个消息(事件)到RabbitMQ响应端侦听并处理完毕返回处理结果到RabbitMQ请求端依照响应结果继续执行后续请求。HTTP方式差异与以往的Http请求方式有所不同通过httpClient.PostAsync发送请求接收端处理并返回结果而走requestClient发送请求到RabbitMQ再由RabbitMQ推送到侦听节点消费并返回结果如下第一二部分结构。