为网站优势,即墨做网站的,免费虚拟机安卓版,台州建设质量监督网站文章目录 前言一、WorkQueues模型消息发送消息接收能者多劳 二、交换机类型1.Fanout交换机消息发送消息接收 2.Direct交换机消息接收消息发送 3.Topic交换机消息发送消息接收 三、编程式声明队列和交换机fanout示例direct示例基于注解 四、消息转换器总结 前言
WorkQueues模型… 文章目录 前言一、WorkQueues模型消息发送消息接收能者多劳 二、交换机类型1.Fanout交换机消息发送消息接收 2.Direct交换机消息接收消息发送 3.Topic交换机消息发送消息接收 三、编程式声明队列和交换机fanout示例direct示例基于注解 四、消息转换器总结 前言
WorkQueues模型、Fanout交换机、Direct交换机、Topic交换机、基于SpringBoot注解声明队列和交换机、消息转换器。 一、WorkQueues模型 Work queues任务模型。简单来说就是让多个消费者绑定到一个队列共同消费队列中的消息。当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。此时就可以使用work 模型多个消费者共同处理消息处理消息处理的速度就能大大提高了。 在控制台创建一个work.queue的队列
消息发送 循环发送模拟大量消息堆积现象。 /*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 50; i) {// 发送消息每20毫秒发送一次相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}消息接收 模拟多个消费者绑定同一个队列我们添加2个方法并且设置不同睡眠时间模拟不同性能读取 RabbitListener(queues work.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues work.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}消费者1很快完成了自己的25条消息消费者2却在缓慢的处理自己的25条消息。 也就是说消息是平均分配给每个消费者并没有考虑到消费者的处理能力。导致1个消费者空闲另一个消费者忙的不可开交。没有充分利用每一个消费者的能力最终消息处理的耗时远远超过了1秒。这样显然是有问题的。 能者多劳
在spring中有一个简单的配置可以解决这个问题。我们修改consumer服务的application.yml文件添加配置
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息再次测试发现结果如下 可以发现由于消费者1处理速度较快所以处理了更多的消息消费者2处理速度较慢只处理了6条消息。而最终总的执行耗时也在1秒左右大大提升。 正所谓能者多劳这样充分利用了每一个消费者的处理能力可以有效避免消息积压问题。 Work模型的使用
多个消费者绑定到一个队列同一条消息只会被一个消费者处理通过设置prefetch来控制消费者预取的消息数量
二、交换机类型
1.Fanout交换机
发送流程
可以有多个队列每个队列都要绑定到Exchange交换机生产者发送的消息只能发送到交换机交换机把消息发送给绑定过的所有队列订阅队列的消费者都能拿到消息
在控制台创建fanout.queue1、fanoutqueue2队列和dragon.fanout交换机并将队列绑定到交换机 消息发送
Test
public void testFanoutExchange() {// 交换机名称String exchangeName dragon.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}消息接收
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}交换机的作用是什么
接收publisher发送的消息将消息按照规则路由到与之绑定的队列不能缓存消息路由失败消息丢失FanoutExchange的会将消息路由到每个绑定的队列
2.Direct交换机 在Fanout模式中一条消息会被所有订阅的队列都消费。但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。 在Direct模型下
队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息
在控制台声明两个队列direct.queue1和direct.queue2然后声明一个direct类型的交换机绑定队列和交换机使用red和blue作为key绑定direct.queue1到dragon.direct使用red和yellow作为key绑定direct.queue2到dragon.direct 消息接收
RabbitListener(queues direct.queue1)
public void listenDirectQueue1(String msg) {System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(queues direct.queue2)
public void listenDirectQueue2(String msg) {System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}消息发送
Test
public void testSendDirectExchange() {// 交换机名称String exchangeName dragon.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息rabbitTemplate.convertAndSend(exchangeName, red, message);
}绑定红色的都会接收到信息
描述下Direct交换机与Fanout交换机的差异
Fanout交换机将消息路由给每一个与之绑定的队列Direct交换机根据RoutingKey判断路由给哪个队列如果多个队列具有相同的RoutingKey则与Fanout功能类似
3.Topic交换机 Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符BindingKey 一般都是有一个或多个单词组成多个单词之间以.分割例如 item.insert通配符规则 #匹配一个或多个词*匹配不多不少恰好1个词举例 dragon.#能够匹配dragon.stu.insert 或者 dragon.stu dragon.*只能匹配dragon.stu 假如此时publisher发送的消息使用的RoutingKey共有四种
china.news 代表有中国的新闻消息china.weather 代表中国的天气消息japan.news 则代表日本新闻japan.weather 代表日本的天气消息
解释
topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括 china.newschina.weather topic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括: china.newsjapan.news 消息发送
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName dragon.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);
}消息接收
RabbitListener(queues topic.queue1)
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(queues topic.queue2)
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}描述下Direct交换机与Topic交换机的差异
Topic交换机接收的消息RoutingKey必须是多个单词以 . 分割Topic交换机与队列绑定时的bindingKey可以指定通配符#代表0个或多个词*代表1个词
三、编程式声明队列和交换机 SpringAMQP提供了一个Queue类用来创建队列 fanout示例
Configuration
public class FanoutConfiguration {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){
// 还可以是 return ExchangeBuilder.fanoutExchange(dragon.fanout).build();return new FanoutExchange(dragon.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){
// 还可以是 return QueueBuilder.durable(fanout.queue1).build();return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
//绑定队列和交换机的另一方法
// Bean
// public Binding bindingQueue2(){
// return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
// }
}direct示例
Configuration
public class DirectConfiguration {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(dragon.direct).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}基于注解 上面的代码还是很多的基于注解的方式也能够代替上面的繁杂配置下面演示direct和topic交换机队列的代码 RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name dragon.direct, type ExchangeTypes.DIRECT),key {red, blue}
))
public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name dragon.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}四、消息转换器 随便创建一个队列然后发送Map对象你会发现消息格式很不友好 Test
public void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, 柳岩);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(object.queue, msg);
}控制台查看 Spring的消息发送代码接收的消息体是一个Object,而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题 数据体积过大 有安全漏洞 可读性差 根据上面测试显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。 j解决 引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。其转换器配置。 添加配置
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}总结
以上就是所有讲解。