特产网站建设的目的,哪个网站做视频钱多,国外素材网站推荐,让互联网之光点亮生活RabbitMQ简介AMQP#xff0c;即Advanced Message Queuing Protocol#xff0c;高级消息队列协议#xff0c;是应用层协议的一个开放标准#xff0c;为面向消息的中间件设计。消息中间件主要用于组件之间的解耦#xff0c;消息的发送者无需知道消息使用者的存在#xff0c… RabbitMQ简介AMQP即Advanced Message Queuing Protocol高级消息队列协议是应用层协议的一个开放标准为面向消息的中间件设计。消息中间件主要用于组件之间的解耦消息的发送者无需知道消息使用者的存在反之亦然。AMQP的主要特征是面向消息、队列、路由包括点对点和发布/订阅、可靠性、安全。RabbitMQ是一个开源的AMQP实现服务器端用Erlang语言编写支持多种客户端如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等支持AJAX。用于在分布式系统中存储转发消息在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。RabbitMQ安装RabbitMQ安装网上已经有许多教程了这里简单介绍一下在CentOS下安装RabbitMQ。使用的版本为3.6.12最新版。1.首先安装erlangrpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm2.然后安装socatyun install socat3.最后安装RabbitMQrpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpmRabbitMQ常用命令启用Web控制台rabbitmq-plugins enable rabbitmq_management开启服务systemctl start rabbitmq-server.service停止服务systemctl stop rabbitmq-server.service查看服务状态systemctl status rabbitmq-server.service查看RabbitMQ状态rabbitmqctl status添加用户赋予管理员权限rabbitmqctl add_user username password
rabbitmqctl set_user_tags username administrator查看用户列表rabbitmqctl list_users删除用户rabbitmqctl delete_user username修改用户密码rabbitmqctl oldPassword Username newPassword访问Web控制台http://服务器ip:15672/ 注意配置防火墙默认用户名密码都是guest若新建用户一定要记得配置权限。.NET Core 使用RabbitMQ定义生产者//创建连接工厂ConnectionFactory factory new ConnectionFactory
{UserName admin,//用户名Password admin,//密码HostName 192.168.157.130//rabbitmq ip};//创建连接var connection factory.CreateConnection();//创建通道var channel connection.CreateModel();//声明一个队列channel.QueueDeclare(hello, false, false, false, null);Console.WriteLine(\nRabbitMQ连接成功请输入消息输入exit退出);string input;do{input Console.ReadLine(); var sendBytes Encoding.UTF8.GetBytes(input); //发布消息channel.BasicPublish(, hello, null, sendBytes);} while (input.Trim().ToLower()!exit);
channel.Close();
connection.Close();定义消费者 //创建连接工厂ConnectionFactory factory new ConnectionFactory{UserName admin,//用户名Password admin,//密码HostName 192.168.157.130//rabbitmq ip}; //创建连接var connection factory.CreateConnection(); //创建通道var channel connection.CreateModel(); //事件基本消费者EventingBasicConsumer consumer new EventingBasicConsumer(channel); //接收到消息事件consumer.Received (ch, ea) { var message Encoding.UTF8.GetString(ea.Body);Console.WriteLine($收到消息 {message}); //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);}; //启动消费者 设置为手动应答消息channel.BasicConsume(hello, false, consumer);Console.WriteLine(消费者已启动);Console.ReadKey();channel.Dispose();connection.Close();运行启动了一个生产者两个消费者可以看见两个消费者都能收到消息消息投递到哪个消费者是由RabbitMQ决定的。RabbitMQ消费失败的处理RabbitMQ采用消息应答机制即消费者收到一个消息之后需要发送一个应答然后RabbitMQ才会将这个消息从队列中删除如果消费者在消费过程中出现异常断开连接切没有发送应答那么RabbitMQ会将这个消息重新投递。修改一下消费者的代码//接收到消息事件consumer.Received (ch, ea)
{ var message Encoding.UTF8.GetString(ea.Body);Console.WriteLine($收到消息 {message});Console.WriteLine($收到该消息[{ea.DeliveryTag}] 延迟10s发送回执);Thread.Sleep(10000); //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);Console.WriteLine($已发送回执[{ea.DeliveryTag}]);
};演示从图中可以看出设置了消息应答延迟10s如果在这10s中该消费者断开了连接那么消息会被RabbitMQ重新投递。使用RabbitMQ的Exchange前面我们可以看到生产者将消息投递到Queue中实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是生产者将消息发送到Exchange交换器由Exchange将消息路由到一个或多个Queue中或者丢弃AMQP协议中的核心思想就是生产者和消费者隔离生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中只是将消息发送到一个交换机。先由Exchange来接收然后Exchange按照特定的策略转发到Queue进行存储。同理消费者也是如此。Exchange 就类似于一个交换机转发各个消息分发到相应的队列中。RabbitMQ提供了四种Exchange模式direct,fanout,topic,header 。但是 header模式在实际使用中较少所以这里只介绍前三种模式。Exchange不是消费者关心的所以消费者的代码完全不用变用上面的消费者就行了。由于避免文章过长影响阅读所以只贴了部分代码但是demo里面是完整可运行的详细代码请查看demo。Direct Exchange所有发送到Direct Exchange的消息被转发到具有指定RouteKey的Queue。Direct模式,可以使用rabbitMQ自带的Exchangedefault Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时RouteKey必须完全匹配才会被队列接收否则该消息会被抛弃。//创建连接var connection factory.CreateConnection();//创建通道var channel connection.CreateModel();//定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null);//定义一个队列channel.QueueDeclare(queueName, false, false, false, null);//将队列绑定到交换机channel.QueueBind(queueName, exchangeName, routeKey, null);运行Fanout Exchange所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播每台子网内的主机都获得了一份复制的消息。所以Fanout Exchange 转发消息是最快的。为了演示效果定义了两个队列分别为hello1hello2每个队列都拥有一个消费者。static void Main(string[] args){ string exchangeName TestFanoutChange; string queueName1 hello1; string queueName2 hello2; string routeKey ; //创建连接工厂ConnectionFactory factory new ConnectionFactory{UserName admin,//用户名Password admin,//密码HostName 192.168.157.130//rabbitmq ip}; //创建连接var connection factory.CreateConnection(); //创建通道var channel connection.CreateModel(); //定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定义队列1channel.QueueDeclare(queueName1, false, false, false, null); //定义队列2channel.QueueDeclare(queueName2, false, false, false, null); //将队列绑定到交换机channel.QueueBind(queueName1, exchangeName, routeKey, null);channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成两个队列的消费者ConsumerGenerator(queueName1);ConsumerGenerator(queueName2);Console.WriteLine($\nRabbitMQ连接成功\n\n请输入消息输入exit退出); string input; do{input Console.ReadLine(); var sendBytes Encoding.UTF8.GetBytes(input); //发布消息channel.BasicPublish(exchangeName, routeKey, null, sendBytes);} while (input.Trim().ToLower() ! exit);channel.Close();connection.Close();
}/// summary/// 根据队列名称生成消费者/// /summary/// param namequeueName/paramstatic void ConsumerGenerator(string queueName){ //创建连接工厂ConnectionFactory factory new ConnectionFactory{UserName admin,//用户名Password admin,//密码HostName 192.168.157.130//rabbitmq ip}; //创建连接var connection factory.CreateConnection(); //创建通道var channel connection.CreateModel(); //事件基本消费者EventingBasicConsumer consumer new EventingBasicConsumer(channel); //接收到消息事件consumer.Received (ch, ea) { var message Encoding.UTF8.GetString(ea.Body);Console.WriteLine($Queue:{queueName}收到消息 {message}); //确认该消息已被消费channel.BasicAck(ea.DeliveryTag, false);}; //启动消费者 设置为手动应答消息channel.BasicConsume(queueName, false, consumer);Console.WriteLine($Queue:{queueName}消费者已启动);
}运行Topic Exchange所有发送到Topic Exchange的消息被转发到能和Topic匹配的Queue上Exchange 将路由进行模糊匹配。可以使用通配符进行模糊匹配符号“#”匹配一个或多个词符号“”匹配不多不少一个词。因此“XiaoChen.#”能够匹配到“XiaoChen.pets.cat”但是“XiaoChen.” 只会匹配到“XiaoChen.money”。所以Topic Exchange 使用非常灵活。string exchangeName TestTopicChange;string queueName hello;string routeKey TestRouteKey.*;//创建连接工厂ConnectionFactory factory new ConnectionFactory
{UserName admin,//用户名Password admin,//密码HostName 192.168.157.130//rabbitmq ip};//创建连接var connection factory.CreateConnection();//创建通道var channel connection.CreateModel();//定义一个Direct类型交换机channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null);//定义队列1channel.QueueDeclare(queueName, false, false, false, null);//将队列绑定到交换机channel.QueueBind(queueName, exchangeName, routeKey, null);
Console.WriteLine($\nRabbitMQ连接成功\n\n请输入消息输入exit退出);string input;do{input Console.ReadLine(); var sendBytes Encoding.UTF8.GetBytes(input); //发布消息channel.BasicPublish(exchangeName, TestRouteKey.one, null, sendBytes);} while (input.Trim().ToLower() ! exit);
channel.Close();
connection.Close();运行Demo下载DotNetCore.RabbitMQ相关文章 .net core 使用Redis的发布订阅RabbitMQ知多少RabbitMQ系列教程之四路由RoutingRabbitMQ系列教程之三发布/订阅Publish/SubscribeRabbitMQ系列教程之二工作队列Work Queues如何优雅的使用RabbitMQ.NET 使用 RabbitMQ 图文简介RabbitMQ 高可用集群搭建及电商平台使用经验总结原文地址http://www.cnblogs.com/stulzq/p/7551819.html.NET社区新闻深度好文微信中搜索dotNET跨平台或扫描二维码关注