重庆企业网站开发,数字营销证书,网络营销是什么时候出现的,项目管理系统软件开发0、引言 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口#xff08;5672#xff09;上运行。如果您使用了不同的主机、端口或凭证#xff0c;则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题#xff0c;可以通过邮件列表或者 RabbitMQ 社区…0、引言 先决条件 本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口5672上运行。如果您使用了不同的主机、端口或凭证则要求调整连接设置。 获取帮助 如果您在阅读本教程时遇到问题可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。 在上一篇教程中我们构建了一个简单的日志系统得以向许多接收者receiver广播日志消息。
在本教程中我们将会为该系统添加一个特性 —— 我们将使“仅订阅消息的一个子集”成为可能。例如我们将能够只将关键错误消息定向到日志文件以节省磁盘空间与此同时仍然能够在控制台上打印所有的日志消息。 原文链接https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html 1、绑定
在上一篇例程中我们已经建立过绑定了。您也许能够回想起代码类似于
channel.QueueBind(queue: queueName,exchange: logs,routingKey: string.Empty);绑定 即表征 交换机 与 队列 之间的关系。这可以简单地理解为队列对来自此交换机的消息感兴趣。
绑定可以接收一个额外的 routingKey 路由键参数。为了避免与 BasicPublish 方法的一个参数混淆我们现在将其称之为 binding key 绑定键。下面演示了我们如何创建一个带有键的绑定
channel.QueueBind(queue: queueName,exchange:direct_logs,routingKey: black);binding key 的含义取决于交换机的类型。比如我们之前使用的 fanout 交换机会简单地忽略它的值。
2、直连交换机Direct exchange
在上一篇教程中我们的日志系统向所有的消费者广播所有消息。我们希望对其进行拓展以允许根据消息的严重程度过滤消息。比如我们也许想要向磁盘写入日志消息的脚本文件只接收关键错误而不是在警告或者信息日志消息上浪费磁盘空间。
但我们在上一篇教程使用的 fanout 扇出交换机并没有给我们如此大的灵活性 —— 它只能够进行无意识的广播。
相反我们将会使用一个 direct 直连交换机。其背后的路由算法很简单 —— 将消息路由到其 routing key 与队列的 binding key 完全匹配的队列。
为了说明这一点请考虑如下配置 在这个配置中我们可以看到绑定了两个队列的 direct 直连交换机 X。第一个队列使用 orange 绑定键绑定而第二个队列拥有两个绑定一个绑定键为 black另一个绑定键为 green。
在这样的配置下被发布到交换机中带有 orange 路由键的消息将会被路由到 Q1 队列。而带有 black 或者 green 路由键的消息将会去往 Q2 队列。其他所有消息将会被丢弃。
3、多个绑定 使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中我们可以使用 black 绑定键在 X 与 Q1 之间添加绑定。在那样的情况下direct 直连交换机表现得像是 fanout 扇出交换机向所有匹配的队列广播消息。带有 black 路由键的消息将同时传递给 Q1 和 Q2 双方。
4、发送日志
我们将在日志系统中使用这个模型我们将消息发送至 direct 直连交换机而不是 fanout 扇出交换机。我们提供日志严重程序作为 routing key 路由键。这样接收脚本就能够选择其想要接收的严重程度。让我们首先关注发送日志
如常首先我们需要创建一个交换机
channel.ExchangeDeclare(exchange: direct_logs, type: ExchangeType.Direct);我们已经准备好发送一条消息
var body Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: direct_logs,routingKey: severity,basicProperties: null,body: body);为了简化问题我们假定 ‘严重程度’ 可以是 info、warning、error 中的一个。
5、订阅
接收消息的方式工作与上一篇教程类似但有一点例外 —— 我们将会为我们感兴趣的每个 ‘严重程度’ 创建新的绑定。
var queueName channel.QueueDeclare().QueueName;foreach(var severity in args)
{channel.QueueBind(queue: queueName,exchange: direct_logs,routingKey: severity);
}6、将所有的东西放到一起 EmitLogDirect.cs 类的代码
using System.Text;
using RabbitMQ.Client;var factory new ConnectionFactory { HostName localhost };
using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.ExchangeDeclare(exchange: direct_logs, type: ExchangeType.Direct);var severity (args.Length 0) ? args[0] : info;
var message (args.Length 1)? string.Join( , args.Skip(1).ToArray()): Hello World!;
var body Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: direct_logs,routingKey: severity,basicProperties: null,body: body);
Console.WriteLine($ [x] Sent {severity}:{message});Console.WriteLine( Press [enter] to exit.);
Console.ReadLine();ReceiveLogsDirect.cs 的代码
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory new ConnectionFactory { HostName localhost };using var connection factory.CreateConnection();
using var channel connection.CreateModel();channel.ExchangeDeclare(exchange: direct_logs, type: ExchangeType.Direct);
// declare a server-named queue
var queueName channel.QueueDeclare().QueueName;if (args.Length 1)
{Console.Error.WriteLine(Usage: {0} [info] [warning] [error],Environment.GetCommandLineArgs()[0]);Console.WriteLine( Press [enter] to exit.);Console.ReadLine();Environment.ExitCode 1;return;
}foreach (var severity in args)
{channel.QueueBind(queue: queueName,exchange: direct_logs,routingKey: severity);
}Console.WriteLine( [*] Waiting for messages.);var consumer new EventingBasicConsumer(channel);
consumer.Received (model, ea)
{var body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);var routingKey ea.RoutingKey;Console.WriteLine($ [x] Received {routingKey}:{message});
};
channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine( Press [enter] to exit.);
Console.ReadLine();像往常一样创建项目建议参考教程一
如果您只想要保存 warning 和 error没有 info日志消息到文件中只需要打开一个控制台并输入
cd ReceiveLogsDirect
dotnet run warning error logs_from_rabbit.log如果您想要在屏幕上看见所有的日志消息打开一个新的终端并尝试
cd ReceiveLogsDirect
dotnet run info warning error
# [*] Waiting for logs. To exit press CTRLC例如要发送一个 error 日志消息只需要输入
cd EmitLogDirect
dotnet run error Run. Run. Or it will explode.
# [x] Sent error:Run. Run. Or it will explode.
EmitLogDirect.cs 和 ReceiveLogsDirect.cs 的完整源码
运行效果
要了解如何基于特定模式侦听消息请移步至教程五。
5、生产[非]适用性免责声明
请记住本教程和其他教程都是教程。他们一次展示一个新概念可能会有意地过度简化一些东西而忽略其他东西。例如为了简洁起见连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。
在发布您的应用之前请先查看其他文档。我们特别推荐以下指南发布者确认和消费者确认生产清单和监控。