阜阳哪里做网站,南昌专业网站建设信息,一个公司只能备案一个网站吗,搜索引擎推广一般包括哪些简介
#x1f340;RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中#xff0c;生产者将任务发送到RabbitMQ交换器#xff0c;然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后#xff0c;消费者可以向…简介
RabbitMQ中的工作队列模式是指将任务分配给多个消费者并行处理。在工作队列模式中生产者将任务发送到RabbitMQ交换器然后交换器将任务路由到一个或多个队列。消费者从队列中获取任务并进行处理。处理完成后消费者可以向RabbitMQ发送一个确认消息表示任务已完成。
优点
工作队列模式的主要优点是能够实现负载均衡和并行处理。通过将任务分配给多个消费者可以提高系统的处理能力和吞吐量。此外工作队列模式还具有很好的扩展性可以根据需要动态添加或删除消费者。 任务流程
生产者Producer将任务发送到RabbitMQ交换器Exchange。交换器根据路由键Routing Key将任务路由到一个或多个队列Queue。消费者Consumer从队列中获取任务并进行处理。处理完成后消费者向RabbitMQ发送一个确认消息表示任务已完成。
生产者代码
在这个代码中我们声明消息队列时第二个参数设置为true表示这个队列是持久化的。接着使用while做一个循环不断读取用户输入的消息内容然后将其转换为字节数组后发布到hello队列中。
class MyClass
{public static void Main(string[] args){var factory new ConnectionFactory();factory.HostName localhost; //RabbitMQ服务在本地运行factory.UserName guest; //用户名factory.Password guest; //密码//创建连接using (var connection factory.CreateConnection()){//创建通道using (var channel connection.CreateModel()){//声明一个名称为hello的消息队列channel.QueueDeclare(hello, true, false, false, null);string msg null;int i 1;Console.WriteLine(请输入要发送的消息内容);while (!string.IsNullOrEmpty(msg Console.ReadLine())){string message $Hello {msg} ! i; //传递的消息内容var body Encoding.UTF8.GetBytes(message);//此处的参数hello 就对应的就是上面声明的消息队列的路由键channel.BasicPublish(, hello, null, body); //开始传递Console.WriteLine(已发送 {0}, message);}}}}
}
消费者代码
这里最关键的一行代码就是channel.BasicQos(0, 1, false);BasicQos方法用于设置消费者的预取计数prefetch count。消费者从队列中获取消息的方式是通过预取计数来控制的。预取计数决定了消费者在没有发送确认信号的情况下可以同时处理多少条未确认的消息。
在Channel.BasicQos()方法中三个参数作用如下
prefetchSize这个参数表示每次从队列中获取的消息的最大大小单位是字节。设置为0表示没有限制。prefetchCount这个参数表示每个消费者同时可以处理的最大未确认消息的数量。设置为1表示每个消费者只能处理一个未确认消息。global这个布尔值表示是否将这两个参数应用于所有的消费者。如果设置为true则这两个参数将应用于所有的消费者如果设置为false则这两个参数仅适用于当前的消费者。
channel.BasicQos(0, 1, false);这行代码设置了消费者的预取计数为1。这意味着消费者在没有发送确认信号的情况下最多只会处理一条未确认的消息。
这样可以提高消费者处理消息的效率因为消费者不需要等待其他消费者发送确认信号后再处理消息。这样可以在一定程度上提高系统的吞吐量。
class MyClass
{static void Main(string[] args){//创建连接工厂var factory new ConnectionFactory();factory.HostName localhost;factory.UserName guest;factory.Password guest;//创建连接using (var connection factory.CreateConnection()){//创建通道using (var channel connection.CreateModel()){//声明队列channel.QueueDeclare(hello, true, false, false, null);channel.BasicQos(0, 1, false);//事件的基本消费者var consumer new EventingBasicConsumer(channel);consumer.Received (model, ea) {var body ea.Body.ToArray();var message Encoding.UTF8.GetString(body);//这里加上睡眠时间模拟耗时任务Thread.Sleep(1000);Console.WriteLine(已接收 {0}, message);//发送消息确认信号手动确认 channel.BasicAck(ea.DeliveryTag,false);};//当 autoAck设置为true时也就是自动确认模式一旦消息队列将消息发送给消息消费者后就会从内存中将这个消息删除。//当autoAck设置为false时也就是手动模式如果此时的有一个消费者宕机消息队列就会将这条消息继续发送给其他的消费者这样数据在消息消费者集群的环境下就不会不丢失了。channel.BasicConsume(hello, false, consumer);Console.ReadKey();}}}
}
代码演示
首先我们将消费者代码发布到本地文件夹中 发布完成后我们找到打包好的程序集双击两次.exe文件运行两个消费者 接着我们运行生产者代码在控制台随意发送6条消息。 再回到我们刚刚运行的两个消费者程序可以看到 消息被分发给两个消费者了