旅游区网站建设,php网站开发技术期末题库,广州推广策划公司,建立简单网站一、初始MQ
首先了解一下微服务间通讯有同步和异步两种方式#xff1a;- 同步通讯#xff1a;是指两个或多个系统在进行信息交换时#xff0c;必须在同一时刻进行操作
- 异步通讯#xff1a;是指两个或多个系统之间的通讯方式#xff0c;其中发送方和接收方不是在同一时刻…一、初始MQ
首先了解一下微服务间通讯有同步和异步两种方式- 同步通讯是指两个或多个系统在进行信息交换时必须在同一时刻进行操作
- 异步通讯是指两个或多个系统之间的通讯方式其中发送方和接收方不是在同一时刻进行操作。同步调用的优点- 时效性较强可以立即得到结果同步调用的缺点- 多个系统间耦合扩展及后续维护繁琐
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败风险异步通讯
优势一服务解耦
优势二性能提升吞吐量提高
优势三服务没有强依赖不担心级联失败问题
优势四流量削峰缺点- 架构复杂了业务没有明显的流程线不好管理对程序员的技术要求高了
- 需要依赖于Broker的可靠、安全、性能搭建集群1. 技术对比
MQ中文是消息队列Message Queue字面来看就是存放消息的队列。
比较常见的MQ实现也被称为消息中间件:- ActiveMQ
- **RabbitMQ**
- **RocketMQ**
- Kafka几种常见MQ的对比对比RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPP SMTPSTOMPOpenWire,STOMP REST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
选择原则- 追求可用性Kafka、 RocketMQ 、RabbitMQ
- 追求可靠性RabbitMQ、RocketMQ
- 追求吞吐能力RocketMQ、Kafka
- 追求消息低延迟RabbitMQ、Kafka2. 原生JavaAPI实现MQ
在这之前先认识RabbitMQ中的一些角色- publisher生产者使用Java代码发送消息
- consumer消费者使用Java代码接收消息
- exchange交换机负责消息路由
- queue队列存储消息
- virtualHost虚拟主机隔离不同租户的exchange、queue、消息2.1 MQ的消息模型
- 简单队列
- 工作队列模式
- 发布订阅模式
- Fanout广播
- Direct定向模式
- Topic主题
- 消息转换器下面使用原生API只展示简单队列模式2.2 原生JavaAPI实现简单队列
简单队列模式的模型图#mermaid-svg-YAGfkK1OHNSrUdxa {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .error-icon{fill:#552222;}#mermaid-svg-YAGfkK1OHNSrUdxa .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-YAGfkK1OHNSrUdxa .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-YAGfkK1OHNSrUdxa .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-YAGfkK1OHNSrUdxa .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-YAGfkK1OHNSrUdxa .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-YAGfkK1OHNSrUdxa .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-YAGfkK1OHNSrUdxa .marker{fill:#333333;stroke:#333333;}#mermaid-svg-YAGfkK1OHNSrUdxa .marker.cross{stroke:#333333;}#mermaid-svg-YAGfkK1OHNSrUdxa svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-YAGfkK1OHNSrUdxa .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .cluster-label text{fill:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .cluster-label span{color:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .label text,#mermaid-svg-YAGfkK1OHNSrUdxa span{fill:#333;color:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .node rect,#mermaid-svg-YAGfkK1OHNSrUdxa .node circle,#mermaid-svg-YAGfkK1OHNSrUdxa .node ellipse,#mermaid-svg-YAGfkK1OHNSrUdxa .node polygon,#mermaid-svg-YAGfkK1OHNSrUdxa .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-YAGfkK1OHNSrUdxa .node .label{text-align:center;}#mermaid-svg-YAGfkK1OHNSrUdxa .node.clickable{cursor:pointer;}#mermaid-svg-YAGfkK1OHNSrUdxa .arrowheadPath{fill:#333333;}#mermaid-svg-YAGfkK1OHNSrUdxa .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-YAGfkK1OHNSrUdxa .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-YAGfkK1OHNSrUdxa .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-YAGfkK1OHNSrUdxa .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-YAGfkK1OHNSrUdxa .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-YAGfkK1OHNSrUdxa .cluster text{fill:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa .cluster span{color:#333;}#mermaid-svg-YAGfkK1OHNSrUdxa div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-YAGfkK1OHNSrUdxa :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher Queue Consumer - publisher消息发布者将消息发送到队列queue
- queue消息队列负责接收并缓存消息
- consumer订阅队列处理队列中的消息下面使用的是官方提供的原生JavaAPI完成的不用自己手敲代码练习下面有利用Spring简化开发的方案//生产端publisher实现
public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.200.130);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(用户名);//设置自己的用户名和密码factory.setPassword(*****);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();}
}
/***********************************************************************************************/
//消费端consumer实现public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(192.168.200.130);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(用户); //用户密码和上面的生产端保持一致factory.setPassword(*****);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message new String(body);System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);}
}小结
基本消息队列的消息发送流程1. 建立connection2. 创建channel3. 利用channel声明队列4. 利用channel向队列发送消息基本消息队列的消息接收流程1. 建立connection2. 创建channel3. 利用channel声明队列4. 定义consumer的消费行为handleDelivery()5. 利用channel将消费者与队列绑定
3.基于SpringAMQP实现MQ
SpringAMQP是基于RabbitMQ封装的一套模板并且利用SpringBoot对其实现了自动装配使用起来非常方便。SpringAmqp的官方地址https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能- 自动声明队列、交换机及其绑定关系代码注解
- 封装了RabbitTemplate工具用于发送消息 rabbitTemplate.convertAndSend()
- 基于注解的监听器模式异步接收消息RabbitListener#mermaid-svg-MzkSIFbIZQNVduTB {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .error-icon{fill:#552222;}#mermaid-svg-MzkSIFbIZQNVduTB .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-MzkSIFbIZQNVduTB .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-MzkSIFbIZQNVduTB .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-MzkSIFbIZQNVduTB .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-MzkSIFbIZQNVduTB .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-MzkSIFbIZQNVduTB .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-MzkSIFbIZQNVduTB .marker{fill:#333333;stroke:#333333;}#mermaid-svg-MzkSIFbIZQNVduTB .marker.cross{stroke:#333333;}#mermaid-svg-MzkSIFbIZQNVduTB svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-MzkSIFbIZQNVduTB .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .cluster-label text{fill:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .cluster-label span{color:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .label text,#mermaid-svg-MzkSIFbIZQNVduTB span{fill:#333;color:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .node rect,#mermaid-svg-MzkSIFbIZQNVduTB .node circle,#mermaid-svg-MzkSIFbIZQNVduTB .node ellipse,#mermaid-svg-MzkSIFbIZQNVduTB .node polygon,#mermaid-svg-MzkSIFbIZQNVduTB .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-MzkSIFbIZQNVduTB .node .label{text-align:center;}#mermaid-svg-MzkSIFbIZQNVduTB .node.clickable{cursor:pointer;}#mermaid-svg-MzkSIFbIZQNVduTB .arrowheadPath{fill:#333333;}#mermaid-svg-MzkSIFbIZQNVduTB .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-MzkSIFbIZQNVduTB .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-MzkSIFbIZQNVduTB .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-MzkSIFbIZQNVduTB .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-MzkSIFbIZQNVduTB .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-MzkSIFbIZQNVduTB .cluster text{fill:#333;}#mermaid-svg-MzkSIFbIZQNVduTB .cluster span{color:#333;}#mermaid-svg-MzkSIFbIZQNVduTB div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-MzkSIFbIZQNVduTB :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher Queue Consumer 在父工程中引入依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency1.简单队列
消息发送 首先配置MQ地址在publisher服务的application.yml中添加配置
spring:rabbitmq:host: 192.168.200.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: 用户名 # 自己的用户名不能为中文和密码password: *****在publisher服务中编写测试类SpringAmqpTest并利用RabbitTemplate实现消息发送。代码实现如下SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTempslate;Test //不要导错包用比较长的import org.junit.jupiter.api.Test;public void testSimpleQueue() {// 队列名称String queueName simple.queue;// 消息String message hello, spring amqp!;// 发送消息此处并不会自动创建队列rabbitTemplate.convertAndSend(queueName, message);}
}消息接收:首先配置MQ地址在consumer服务的application.yml中添加配置spring:rabbitmq:host: 192.168.200.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码 在consumer服务的中新建一个类SpringRabbitListenerComponent
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) {System.out.println(spring 消费者接收到消息【 msg 】);}
}2.工作队列(Work queues) 当消息处理比较耗时的时候可能生产消息的速度会远远大于消息的消费速度。长此以往消息就会堆积越来越多无法及时处理。如何解决呢- 那我们可以让多个消费者绑定到一个队列共同消费队列中的消息。这个就称为Work queues也被称为Task queues任务模型。可以使用work 模型多个消费者共同处理消息处理速度就能大大提高了。#mermaid-svg-BhHwtiW7S8WcL10n {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .error-icon{fill:#552222;}#mermaid-svg-BhHwtiW7S8WcL10n .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-BhHwtiW7S8WcL10n .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-BhHwtiW7S8WcL10n .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-BhHwtiW7S8WcL10n .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-BhHwtiW7S8WcL10n .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-BhHwtiW7S8WcL10n .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-BhHwtiW7S8WcL10n .marker{fill:#333333;stroke:#333333;}#mermaid-svg-BhHwtiW7S8WcL10n .marker.cross{stroke:#333333;}#mermaid-svg-BhHwtiW7S8WcL10n svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-BhHwtiW7S8WcL10n .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .cluster-label text{fill:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .cluster-label span{color:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .label text,#mermaid-svg-BhHwtiW7S8WcL10n span{fill:#333;color:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .node rect,#mermaid-svg-BhHwtiW7S8WcL10n .node circle,#mermaid-svg-BhHwtiW7S8WcL10n .node ellipse,#mermaid-svg-BhHwtiW7S8WcL10n .node polygon,#mermaid-svg-BhHwtiW7S8WcL10n .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-BhHwtiW7S8WcL10n .node .label{text-align:center;}#mermaid-svg-BhHwtiW7S8WcL10n .node.clickable{cursor:pointer;}#mermaid-svg-BhHwtiW7S8WcL10n .arrowheadPath{fill:#333333;}#mermaid-svg-BhHwtiW7S8WcL10n .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-BhHwtiW7S8WcL10n .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-BhHwtiW7S8WcL10n .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-BhHwtiW7S8WcL10n .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-BhHwtiW7S8WcL10n .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-BhHwtiW7S8WcL10n .cluster text{fill:#333;}#mermaid-svg-BhHwtiW7S8WcL10n .cluster span{color:#333;}#mermaid-svg-BhHwtiW7S8WcL10n div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-BhHwtiW7S8WcL10n :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher Queue Consumer 1 Consumer 2 消息发送:在publisher服务中的SpringAmqpTest类中添加一个测试方法/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 1; i 50; i) {// 发送消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}消息接收:要模拟多个消费者绑定同一个队列我们在consumer中添加2个新的方法//RabbitListener(queues simple.queue)
//public void listenSimpleQueueMessage(String msg) {
// System.out.println(msg);
//}RabbitListener(queues simple.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(LocalTime.now() 消费者1 msg);Thread.sleep(20);
}RabbitListener(queues simple.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(LocalTime.now() 消费者2 msg);Thread.sleep(200);
}运算之后得到结果消息是平均分配给每个消费者并没有考虑到消费者的处理能力。这样显然是有问题的。
怎样解决这个问题呢我们可以修改consumer服务的application.yml文件添加配置spring:rabbitmq:host: 192.168.200.130 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: 用户名 # 自己的用户名和密码password: **** listener: #监听simple: #简单消息模型prefetch: 1 #每次只能获取一条消息处理完成才能获取下一个消息Work模型的使用- 多个消费者绑定到一个队列同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量3.发布/订阅
发布订阅的模型如图#mermaid-svg-mQYIwQn3tiTRsoYG {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .error-icon{fill:#552222;}#mermaid-svg-mQYIwQn3tiTRsoYG .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-mQYIwQn3tiTRsoYG .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-mQYIwQn3tiTRsoYG .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-mQYIwQn3tiTRsoYG .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-mQYIwQn3tiTRsoYG .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-mQYIwQn3tiTRsoYG .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-mQYIwQn3tiTRsoYG .marker{fill:#333333;stroke:#333333;}#mermaid-svg-mQYIwQn3tiTRsoYG .marker.cross{stroke:#333333;}#mermaid-svg-mQYIwQn3tiTRsoYG svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-mQYIwQn3tiTRsoYG .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .cluster-label text{fill:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .cluster-label span{color:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .label text,#mermaid-svg-mQYIwQn3tiTRsoYG span{fill:#333;color:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .node rect,#mermaid-svg-mQYIwQn3tiTRsoYG .node circle,#mermaid-svg-mQYIwQn3tiTRsoYG .node ellipse,#mermaid-svg-mQYIwQn3tiTRsoYG .node polygon,#mermaid-svg-mQYIwQn3tiTRsoYG .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-mQYIwQn3tiTRsoYG .node .label{text-align:center;}#mermaid-svg-mQYIwQn3tiTRsoYG .node.clickable{cursor:pointer;}#mermaid-svg-mQYIwQn3tiTRsoYG .arrowheadPath{fill:#333333;}#mermaid-svg-mQYIwQn3tiTRsoYG .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-mQYIwQn3tiTRsoYG .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-mQYIwQn3tiTRsoYG .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-mQYIwQn3tiTRsoYG .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-mQYIwQn3tiTRsoYG .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-mQYIwQn3tiTRsoYG .cluster text{fill:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG .cluster span{color:#333;}#mermaid-svg-mQYIwQn3tiTRsoYG div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-mQYIwQn3tiTRsoYG :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher exchange Queue1 Consumer 1 Consumer 2 Queue2 Consumer 3 在订阅模型中多了一个exchange角色而且过程略有变化- Publisher生产者也就是要发送消息的程序但是不再发送到队列中而是发给exchage交换机- Consumer消费者与以前一样订阅队列没有变化- Queue消息队列也与以前一样接收消息、缓存消息。- Exchange交换机消息路由。一方面接收生产者发送的消息。另一方面知道如何处理消息
- 例如递交给某个特别队列、递交给所有队列、或将消息丢弃。到底如何操作取决于Exchange的类型。Exchange有以下3种类型- Fanout扇出广播将消息交给所有绑定到交换机的队列- Direct定向把消息交给符合指定routing key 的队列- Topic通配符把消息交给符合routing pattern路由模式 的队列Exchange交换机只负责转发消息不具备存储消息的能力因此如果没有任何队列与Exchange绑定
或者没有符合路由规则的队列那么消息会丢失4.Fanout广播
Fanout英文翻译是扇出在MQ中理解成广播更合适。#mermaid-svg-pcgPRCxjOYK0o7bB {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .error-icon{fill:#552222;}#mermaid-svg-pcgPRCxjOYK0o7bB .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-pcgPRCxjOYK0o7bB .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-pcgPRCxjOYK0o7bB .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-pcgPRCxjOYK0o7bB .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-pcgPRCxjOYK0o7bB .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-pcgPRCxjOYK0o7bB .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-pcgPRCxjOYK0o7bB .marker{fill:#333333;stroke:#333333;}#mermaid-svg-pcgPRCxjOYK0o7bB .marker.cross{stroke:#333333;}#mermaid-svg-pcgPRCxjOYK0o7bB svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-pcgPRCxjOYK0o7bB .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .cluster-label text{fill:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .cluster-label span{color:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .label text,#mermaid-svg-pcgPRCxjOYK0o7bB span{fill:#333;color:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .node rect,#mermaid-svg-pcgPRCxjOYK0o7bB .node circle,#mermaid-svg-pcgPRCxjOYK0o7bB .node ellipse,#mermaid-svg-pcgPRCxjOYK0o7bB .node polygon,#mermaid-svg-pcgPRCxjOYK0o7bB .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-pcgPRCxjOYK0o7bB .node .label{text-align:center;}#mermaid-svg-pcgPRCxjOYK0o7bB .node.clickable{cursor:pointer;}#mermaid-svg-pcgPRCxjOYK0o7bB .arrowheadPath{fill:#333333;}#mermaid-svg-pcgPRCxjOYK0o7bB .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-pcgPRCxjOYK0o7bB .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-pcgPRCxjOYK0o7bB .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-pcgPRCxjOYK0o7bB .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-pcgPRCxjOYK0o7bB .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-pcgPRCxjOYK0o7bB .cluster text{fill:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB .cluster span{color:#333;}#mermaid-svg-pcgPRCxjOYK0o7bB div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-pcgPRCxjOYK0o7bB :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher exchange Queue1 Consumer 1 Queue2 Consumer 2 在广播模式下消息发送流程是这样的- 1 可以有多个队列
- 2 每个队列都要绑定到Exchange交换机
- 3 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定
- 4 交换机把消息发送给绑定过的所有队列
- 5 订阅队列的消费者都能拿到消息声明队列和交换机
Spring提供了一个接口Exchange来表示所有不同类型的交换机UML类图在consumer服务中创建一个类声明队列和交换机Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(itcast.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Bean // 方法中的参数从IoC容器中获取public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
} 消息发送:
在publisher服务的SpringAmqpTest类中添加测试方法Test
public void testFanoutExchange() {// 交换机名称String exchangeName itcast.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}消息接收
在consumer服务的SpringRabbitListener中添加两个方法作为消费者RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}交换机的作用是什么- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息路由失败消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列声明队列、交换机、绑定关系的Bean是什么- Queue
- FanoutExchange
- Binding5.Direct定向
在Fanout模式中一条消息会被所有订阅的队列都消费。
但是在某些场景下我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。#mermaid-svg-ViPCWm5zYK0wsIQF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .error-icon{fill:#552222;}#mermaid-svg-ViPCWm5zYK0wsIQF .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-ViPCWm5zYK0wsIQF .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-ViPCWm5zYK0wsIQF .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-ViPCWm5zYK0wsIQF .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-ViPCWm5zYK0wsIQF .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-ViPCWm5zYK0wsIQF .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-ViPCWm5zYK0wsIQF .marker{fill:#333333;stroke:#333333;}#mermaid-svg-ViPCWm5zYK0wsIQF .marker.cross{stroke:#333333;}#mermaid-svg-ViPCWm5zYK0wsIQF svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-ViPCWm5zYK0wsIQF .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .cluster-label text{fill:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .cluster-label span{color:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .label text,#mermaid-svg-ViPCWm5zYK0wsIQF span{fill:#333;color:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .node rect,#mermaid-svg-ViPCWm5zYK0wsIQF .node circle,#mermaid-svg-ViPCWm5zYK0wsIQF .node ellipse,#mermaid-svg-ViPCWm5zYK0wsIQF .node polygon,#mermaid-svg-ViPCWm5zYK0wsIQF .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-ViPCWm5zYK0wsIQF .node .label{text-align:center;}#mermaid-svg-ViPCWm5zYK0wsIQF .node.clickable{cursor:pointer;}#mermaid-svg-ViPCWm5zYK0wsIQF .arrowheadPath{fill:#333333;}#mermaid-svg-ViPCWm5zYK0wsIQF .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-ViPCWm5zYK0wsIQF .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-ViPCWm5zYK0wsIQF .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-ViPCWm5zYK0wsIQF .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-ViPCWm5zYK0wsIQF .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-ViPCWm5zYK0wsIQF .cluster text{fill:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF .cluster span{color:#333;}#mermaid-svg-ViPCWm5zYK0wsIQF div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-ViPCWm5zYK0wsIQF :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} key:blue key:red Publisher exchange Queue Consumer 1 Queue2 Consumer 2 在Direct模型下- 队列与交换机的绑定不能是任意绑定了而是要指定一个RoutingKey路由key。
- 消息的发送方在 向 Exchange发送消息时也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列而是根据消息的Routing Key进行判断
只有队列的Routingkey与消息的 Routing key完全一致才会接收到消息。案例需求如下:1. 利用RabbitListener声明Exchange、Queue、RoutingKey2. 在consumer服务中编写两个消费者方法分别监听direct.queue1和direct.queue23. 在publisher中编写测试方法向itcast. direct发送消息声明队列和交换机基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。在consumer的SpringRabbitListener中添加两个消费者同时基于注解来声明队列和交换机RabbitListener(bindings QueueBinding(value Queue(name direct.queue1), //创建队列exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),//创建交换机key {red, blue} //绑定接受消息的key
))
public void listenDirectQueue1(String msg){System.out.println(消费者接收到direct.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name itcast.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue2(String msg){System.out.println(消费者接收到direct.queue2的消息【 msg 】);
}消息发送在publisher服务的SpringAmqpTest类中添加测试方法Test
public void testSendDirectExchange() {// 交换机名称String exchangeName itcast.direct;// 消息String message 红色警报日本乱排核废水导致海洋生物变异惊现哥斯拉;// 发送消息keyred两个消费者都能收到消息rabbitTemplate.convertAndSend(exchangeName, red, message);// 发送消息keyblue消费者1 能收到消息rabbitTemplate.convertAndSend(exchangeName, blue, message);// 发送消息keyyellow消费者2 能收到消息rabbitTemplate.convertAndSend(exchangeName, yellow, message);
}总结
Direct交换机与Fanout交换机的差异- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey则与Fanout功能类似基于RabbitListener注解声明队列和交换机的常见注解- 开始声明bindings ?
- 指定一个绑定关系 QueueBinding
- 声明队列value Queue
- 声明交换机exchange Exchange
- 指定路由keykey {一个或多个}6.Topic主题
Topic类型的Exchange与Direct相比- 相同点都可以根据RoutingKey把消息路由到不同的队列
- 不同点Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符 Routingkey 一般都是有一个或多个单词组成多个单词之间以”.”分割
例如 item.insert, item.del 通配符规则#匹配零个一个或多个词任意多个【常用】*匹配不多不少必须是1个词#mermaid-svg-DHpL1gFQBtSYHkif {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .error-icon{fill:#552222;}#mermaid-svg-DHpL1gFQBtSYHkif .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-DHpL1gFQBtSYHkif .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-DHpL1gFQBtSYHkif .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-DHpL1gFQBtSYHkif .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-DHpL1gFQBtSYHkif .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-DHpL1gFQBtSYHkif .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-DHpL1gFQBtSYHkif .marker{fill:#333333;stroke:#333333;}#mermaid-svg-DHpL1gFQBtSYHkif .marker.cross{stroke:#333333;}#mermaid-svg-DHpL1gFQBtSYHkif svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-DHpL1gFQBtSYHkif .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .cluster-label text{fill:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .cluster-label span{color:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .label text,#mermaid-svg-DHpL1gFQBtSYHkif span{fill:#333;color:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .node rect,#mermaid-svg-DHpL1gFQBtSYHkif .node circle,#mermaid-svg-DHpL1gFQBtSYHkif .node ellipse,#mermaid-svg-DHpL1gFQBtSYHkif .node polygon,#mermaid-svg-DHpL1gFQBtSYHkif .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-DHpL1gFQBtSYHkif .node .label{text-align:center;}#mermaid-svg-DHpL1gFQBtSYHkif .node.clickable{cursor:pointer;}#mermaid-svg-DHpL1gFQBtSYHkif .arrowheadPath{fill:#333333;}#mermaid-svg-DHpL1gFQBtSYHkif .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-DHpL1gFQBtSYHkif .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-DHpL1gFQBtSYHkif .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-DHpL1gFQBtSYHkif .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-DHpL1gFQBtSYHkif .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-DHpL1gFQBtSYHkif .cluster text{fill:#333;}#mermaid-svg-DHpL1gFQBtSYHkif .cluster span{color:#333;}#mermaid-svg-DHpL1gFQBtSYHkif div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-DHpL1gFQBtSYHkif :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} topic bindingKey1 bindingKey2 bindingKey3 bindingKey4 Publisher exchange Queue1 Consumer 1 Queue2 Consumer 2 Queue3 Consumer 3 Queue4 Consumer 4 举例
demo.#能够匹配demo, demo.spu, demo.spu.insert
demo.*只能匹配demo.spu实现思路如下1. 并利用RabbitListener声明Exchange、Queue、RoutingKey2. 在consumer服务中编写两个消费者方法分别监听topic.queue1和topic.queue23. 在publisher中编写测试方法向itcast. topic发送消息- Queue1假设绑定的是china.# 因此凡是以 china.开头的routing key 都会被匹配到。
- 包括china.news和china.weather
- Queue2假设绑定的是#.news 因此凡是以 .news结尾的 routing key 都会被匹配。
- 包括china.news和japan.news消息接收在consumer服务的SpringRabbitListener中添加方法RabbitListener(bindings QueueBinding(value Queue(name demo.queue1),exchange Exchange(name demo.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name demo.queue2),exchange Exchange(name demo.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者接收到demo.queue2的消息【 msg 】);
}消息发送在publisher服务的SpringAmqpTest类中添加测试方法:
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName demo.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.weather, 明天天气晴20-36度);
}# 总结描述下Direct交换机与Topic交换机的差异- Topic交换机接收的消息RoutingKey必须是多个单词以 **.** 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符- #代表0个1个或多个词- *代表1个词7.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为下面的message对象。void convertAndSend(String exchange, String routingKey,Object message) throw AmqpException;默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题- 数据体积过大
- 可读性差测试默认转换器:Test
public void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, Jack);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(simple.queue, msg);
}1、执行前先停止consumer服务防止消息被消费掉无法在RabbitMQ控制台看到
2、MQ服务上没有simple.queue临时通过管理端快速创建一个
发送消息后查看控制台# 配置JSON转换器显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。1、在publisher和consumer两个服务中都引入依赖因此咱们选择在父工程添加!-- mq-demo的pom.xmljacksonSpringBoot用的 --
dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId
/dependency2、配置消息转换器#在PublisherApplication和ConsumerApplication两个启动类中都添加一个Bean
PublisherApplication作用Java对象 》JSON字符串
import org.springframework.amqp.support.converter.MessageConverter;SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {SpringApplication.run(PublisherApplication.class);}Bean //注意导包org.springframework.amqp.support.converter.MessageConverterpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}3、重新发送消息通过管理平台查询效果4、接受消息SpringRabbitListenerRabbitListener(queues simple.queue)
public void listenObjectQueue(MapString,Object msg){System.out.println(接收到object.queue的消息 msg);
}二、MQ高级
1.消息可靠性
消息从发送到消费者接收会经历多个过程#mermaid-svg-vGe4oEFPVCijCxx3 {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .error-icon{fill:#552222;}#mermaid-svg-vGe4oEFPVCijCxx3 .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-vGe4oEFPVCijCxx3 .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-vGe4oEFPVCijCxx3 .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-vGe4oEFPVCijCxx3 .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-vGe4oEFPVCijCxx3 .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-vGe4oEFPVCijCxx3 .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-vGe4oEFPVCijCxx3 .marker{fill:#333333;stroke:#333333;}#mermaid-svg-vGe4oEFPVCijCxx3 .marker.cross{stroke:#333333;}#mermaid-svg-vGe4oEFPVCijCxx3 svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-vGe4oEFPVCijCxx3 .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .cluster-label text{fill:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .cluster-label span{color:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .label text,#mermaid-svg-vGe4oEFPVCijCxx3 span{fill:#333;color:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .node rect,#mermaid-svg-vGe4oEFPVCijCxx3 .node circle,#mermaid-svg-vGe4oEFPVCijCxx3 .node ellipse,#mermaid-svg-vGe4oEFPVCijCxx3 .node polygon,#mermaid-svg-vGe4oEFPVCijCxx3 .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-vGe4oEFPVCijCxx3 .node .label{text-align:center;}#mermaid-svg-vGe4oEFPVCijCxx3 .node.clickable{cursor:pointer;}#mermaid-svg-vGe4oEFPVCijCxx3 .arrowheadPath{fill:#333333;}#mermaid-svg-vGe4oEFPVCijCxx3 .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-vGe4oEFPVCijCxx3 .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-vGe4oEFPVCijCxx3 .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-vGe4oEFPVCijCxx3 .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-vGe4oEFPVCijCxx3 .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-vGe4oEFPVCijCxx3 .cluster text{fill:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 .cluster span{color:#333;}#mermaid-svg-vGe4oEFPVCijCxx3 div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-vGe4oEFPVCijCxx3 :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} Publisher exchange Queue1 Consumer 1 Queue2 Consumer 2 其中的每一步都可能导致消息丢失常见的丢失原因包括- 发送时丢失- 生产者发送的消息未送达exchange- 消息到达exchange后未到达queue
- MQ宕机queue将消息丢失
- consumer接收到消息后未消费就宕机针对这些问题RabbitMQ分别给出了解决方案- 生产者确认机制发送时丢失
- 消息持久化MQ宕机
- 消费者确认机制消费者宕机
- 失败重试机制消费失败1.1.生产者消息确认 RabbitMQ提供了生产者确认机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。返回结果有两种方式- publisher-confirm发送者确认- 消息成功投递到交换机返回ack- 消息未投递到交换机返回nack- publisher-return发送者回执- 消息投递到交换机了但是没有路由到队列。返回通知及路由失败原因。- 正常到达队列没有任何回复没有回复就是成功确认机制发送消息时需要给每个消息设置一个全局唯一Id以区分不同消息避免ack冲突。举个栗子修改publisher服务中的application.yml文件添加下面的内容spring:rabbitmq:host: 192.168.200.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /publisher-confirm-type: correlated #判断是否到达交换机异步通知publisher-returns: true #判断是否到达队列template:mandatory: true #定义消息路由失败时的策略解释说明一下- publish-confirm-type开启publisher-confirm这里支持两种类型- simple同步等待confirm结果直到超时【一般不使用影响性能】- correlated异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback- publish-returns- true开启publish-return功能同样是基于callback机制不过是定义ReturnCallback- false关闭publish-return功能
- template.mandatory定义消息路由失败时的策略。- true则调用ReturnCallback- false则直接丢弃消息修改consumer服务中的application.yml改为自己的虚拟机IPspring:rabbitmq:host: 192.168.200.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /定义Return回调:每个RabbitTemplate只能配置一个ReturnCallback因此需要在项目启动过程中配置
作用 如果消息没有到达队列会执行回调方法修改publisher服务添加一个ReturnCallbackpackage cn.itcast.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;Slf4j
Configuration
//ApplicationContextAware: 在Spring容器Bean工厂创建好的时候通知咱们
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback先用匿名内部类rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - {// 投递失败没有到达队列记录日志log.error(消息队列接收失败应答码{}原因{}交换机{}路由键{},消息{},replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要可以重发消息//rabbitTemplate.convertAndSend(exchange, routingKey, message);});}
}定义Confirm回调:ConfirmCallback可以在发送消息时指定因为每个业务处理confirm成功或失败的逻辑不一定相同。在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中定义一个单元测试方法Test
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message hello, spring amqp!;// 2.全局唯一的消息ID需要封装到CorrelationData中//uuid, 雪花算法CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result - {if(result.isAck()){// 3.1.ack消息成功log.debug(消息发送到交换机成功, ID:{}, correlationData.getId());}else{// 3.2.nack消息失败log.error(消息发送到交换机失败, ID:{}, 原因{},correlationData.getId(), result.getReason());}},ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()));// 4.发送消息其中simple.test是路由keyrabbitTemplate.convertAndSend(amq.topic, simple.test, message, correlationData);// 休眠一会儿等待ack回执//如果不休眠程序就直接结束了RabbitMQ服务器就无法回调咱们写的代码Thread.sleep(2000);
}登录到MQ的管理端# 测试1、发送到一个不存在的交换机camq.topicrabbitTemplate.convertAndSend(camq.topic, simple.test, message, correlationData);//查看日志会有一个没有到达交换机的信息2、发送到一个已经存在的交换机amq.topic系统自带的但没有绑定指定的路由rabbitTemplate.convertAndSend(amq.topic, simple.test, message, correlationData);//查看日志没有路由到队列3、通过管理端指定amq.topic交换机的路由key到simple.queuerabbitTemplate.convertAndSend(amq.topic, simple.test, message, correlationData);//成功发送需要到管理端查看一下队列中是否有消息1.2.消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中但是消息发送到RabbitMQ以后如果突然宕机
也可能导致消息丢失。要想确保消息在RabbitMQ中安全保存必须开启消息持久化机制。- 交换机持久化
- 队列持久化
- 消息持久化# 交换机持久化RabbitMQ中交换机默认是非持久化的mq重启后就丢失。SpringAMQP中可以通过代码指定交换机持久化Bean
public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除//durable: 持久化return new DirectExchange(simple.direct, true, false);//默认创建就是持久化的交换机//return new DirectExchange(simple.direct);
}提示由SpringAMQP声明的交换机都是持久化的可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示# 队列持久化RabbitMQ中队列默认是非持久化的mq重启后就丢失。SpringAMQP中可以通过代码指定交换机持久化Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build();//return new Queue(simple.queue);
}提示由SpringAMQP声明的交换机都是持久化的# 消息持久化利用SpringAMQP发送消息时可以设置消息的属性MessageProperties指定delivery-mode- 非持久化MessageDeliveryMode.NON_PERSISTENT
- 持久化MessageDeliveryMode.PERSISTENT用java代码指定Test
public void testDurableMessage() {// 1.准备消息Message message MessageBuilder.withBody(hello, spring.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT) //设置消息的属性持久化.build();// 2.发送消息rabbitTemplate.convertAndSend(simple.queue, message);
}提示由SpringAMQP声明的交换机都是持久化的1.3.消费者消息确认
RabbitMQ确认消息被消费者消费后会立刻删除。而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的消费者获取消息后应该向RabbitMQ发送ACK回
执表明自己已经处理消息。设想这样的场景- 1RabbitMQ投递消息给消费者
- 2消费者获取消息后返回ACK给RabbitMQ
- 3RabbitMQ删除消息
- 4消费者宕机消息尚未处理这样消息就丢失了。因此消费者返回ACK的时机非常重要。/********************************************************************************而SpringAMQP则允许配置三种确认模式- manual手动ack需要在处理完消息后调用api发送ack【麻烦一般不使用】。
- auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nack
- none关闭ackMQ假定消费者获取消息后肯定会成功处理因此消息投递后立即被删除由此可知- manual自己根据业务情况判断什么时候该ack太麻烦不使用
- auto模式类似事务机制出现异常时返回nack消息回滚到mq没有异常返回ack
- none模式下消息投递是不可靠的可能丢失不适合用在项目中因此我们都是使用默认的auto即可。# none模式修改consumer服务的application.yml文件添加下面内容spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack修改consumer服务的SpringRabbitListener类中的方法模拟一个消息处理异常RabbitListener(queues simple.queue)
public void listenSimpleQueue(String msg) {log.info(消费者接收到simple.queue的消息【{}】, msg);// 模拟异常 -给MQ返回nackSystem.out.println(1 / 0);log.debug(消息成功处理完成);
}// 测试可以发现当消息处理抛异常时消息依然被RabbitMQ删除了# auto模式# 再次把确认机制修改为auto:spring:rabbitmq:listener:simple:#消费成功返回ack#消费失败返回nackacknowledge-mode: auto # 根据异常自动ack在异常位置打断点再次发送消息程序卡在断点时可以发现此时消息状态为unack未确定状态抛出异常后因为Spring会自动返回nack所以消息恢复至Ready状态并且没有被RabbitMQ删除1.4.消费失败重试机制
当消费者出现异常后消息会不断requeue重入队到队列再重新发送给消费者然后再次异常再次requeue
无限循环导致mq的消息处理飙升带来不必要的压力怎么办呢# 本地重试我们可以利用Spring的retry机制在消费者出现异常时利用本地重试而不是无限制的requeue到mq队列。修改consumer服务的application.yml文件添加内容spring:rabbitmq:listener:simple:retry: #本地重试enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 2 # 失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 # 最大重试次数包含服务器推送的第一次stateless: true # true无状态false有状态。如果业务中包含事务这里改为false重启consumer服务重复之前的测试。可以发现- 在重试3次后SpringAMQP会抛出异常AmqpRejectAndDontRequeueException说明本地重试触发了- 查看RabbitMQ控制台发现消息被删除了RejectAndDontRequeue说明最后SpringAMQP返回的是ack
mq删除消息了reject: 拒绝
dont re queue: 不要重新放到队列# 结论- 开启本地重试时消息处理过程中抛出异常不会requeue到队列而是在消费者本地重试
- 重试达到最大次数后Spring会返回ack给MQ服务器(reject not re queue)消息会被丢弃失败策略:在之前的测试中达到最大重试次数后消息会被丢弃这是由Spring内部机制决定的。在开启重试模式后重试次数耗尽如果消息依然失败会有MessageRecoverer接口来处理
它包含三种不同的实现
- RejectAndDontRequeueRecoverer重试耗尽后直接reject丢弃消息。默认就是这种方式- ImmediateRequeueMessageRecoverer本地重试耗尽后返回nack消息重新入队重新推送消息- RepublishMessageRecoverer【最优方法】重试耗尽后将失败消息投递到指定的交换机后续人工介入来处理处理方案是RepublishMessageRecoverer失败后将消息投递到一个指定的专门存放异常消息的队列
后续由人工集中处理。1在consumer服务中定义处理失败消息的交换机和队列package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}//TODO 指定失败处理策略
}
/**************************************************************************************************/
2定义一个RepublishMessageRecoverer关联队列和交换机Bean //非常特殊方法上有Bean方法中所有的参数自动就有一个Autowired
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){//最终效果将重试失败的消息重新发送到指定的交换机路由keyreturn new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);
}/*********************************************************************************************************/
完整代码 package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;Configuration
public class ErrorMessageConfig {Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange(error.direct);}Beanpublic Queue errorQueue(){return new Queue(error.queue, true);}Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(error);}Bean //修改本地重试耗尽之后消息处理策略把消息发到指定的交换keypublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, error.direct, error);}
}1.5.总结
如何确保RabbitMQ消息的可靠性- 开启生产者确认机制确保生产者的消息能到达交换机和队列
- 开启持久化功能确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto由spring确认消息处理成功后完成ack
- 开启消费者失败本地重试机制并设置MessageRecoverer多次重试失败后将消息投递到异常交换机
- 交由人工处理2.死信交换机
2.1.认识死信
当一个队列中的消息满足下列情况之一时可以成为死信dead letter- 消费者使用basic.reject或 basic.nack声明消费失败并且消息的requeue参数设置为false
- 消息是一个过期消息超时无人消费【利用此机制实现延迟消息】
- 要投递的队列消息满了无法投递如果一个消息被消费者拒绝了变成了死信如果这个包含死信的队列配置了dead-letter-exchange属性指定了一个交换机那么队列中的死信就会投递到
这个交换机中而这个交换机称为死信交换机Dead Letter Exchange简称DLX。如果这个死信交换机也绑定了一个队列则消息最终会进入这个只存放死信的队列因为simple.queue绑定了死信交换机 dl.direct并且设置了路由key因此死信最终会经过死信交换机路由给死信队列。- 指定死信交换机名称dl.direct
- 指定死信交换机与死信队列绑定的RoutingKeydl这样才能确保投递的消息能到达死信交换机并且正确的路由到死信队列。 下边代码只是为了演示对应图片中的配置不用添加到项目中Bean
public Queue simpleQueue(){//return new Queue(simple.queue);return QueueBuilder.durable(simple.queue) // 指定队列名称并持久化.deadLetterExchange(dl.direct) // 指定死信交换机.deadLetterRoutingKey(dl) //指定路由key.build();
}# 总结:什么样的消息会成为死信- 消息被消费者reject或者返回nack并且设置了requeuefalse
- 消息超时未消费
- 队列满了死信交换机的使用场景是什么- 如果队列绑定了死信交换机死信会投递到死信交换机
- 可以利用死信交换机收集所有消费者处理失败的消息死信交由人工处理进一步提高消息队列的可靠性。2.2.TTL(过期时间)
TTL也就是Time-To-Live过期时间。如果一个队列中的消息TTL结束仍未消费则会变为死信。TTL超时分为两种情况- 消息本身设置了超时时间
- 消息所在的队列设置了超时时间 思考为什么要给消息或者队列设置过期时间呢 目的实现延迟任务的功能 比如要实现如下功能- 延迟10分钟发送短信给用户ttl 10分钟- 用户下单如果用户在15 分钟内未支付则自动取消- 预约工作会议20分钟后自动通知所有参会人员# 创建死信交换机在consumer服务的SpringRabbitListener中定义一个新的消费者并且声明死信交换机 dl.direct、
死信队列 dl.queueRabbitListener(bindings QueueBinding(value Queue(name dl.queue, durable true),exchange Exchange(name dl.direct),key dl
))
public void listenDlQueue(String msg){log.info(接收到 dl.queue的延迟消息{}, msg);
}# 声明队列指定超时时间在consumer服务中新建TTLMessageConfig创建ttl队列- 设置超时时间ttl(10000)
- 指定死信交换机deadLetterExchange(dl.direct)
- 指定死信的路由keydeadLetterRoutingKey(dl)package cn.itcast.mq.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class TTLMessageConfig {Beanpublic Queue ttlQueue(){return QueueBuilder.durable(ttl.queue) // 指定队列名称并持久化.ttl(10000) // 设置队列的超时时间10秒.deadLetterExchange(dl.direct) // 指定死信交换机.deadLetterRoutingKey(dl).build();}/*** 声明交换机将ttl队列与交换机绑定*/Beanpublic DirectExchange ttlExchange(){return new DirectExchange(ttl.direct);}Beanpublic Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange){return BindingBuilder.bind(ttlQueue).to(ttlExchange).with(ttl);}
}在publisher服务中发送消息Test
public void testTTLQueue() {// 创建消息String message hello, ttl queue;// 消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend(ttl.direct, ttl, message, correlationData);// 记录日志log.debug(发送消息成功);
}注意先启动消费者再发送消息执行完之后观察时间戳可以看到消息发送与接收之间的时差大概是10秒。# 发送消息时设定TTL在发送消息时也可以指定TTLTest
public void testTTLMsg() {// 创建消息Message message MessageBuilder.withBody(hello, ttl message.getBytes(StandardCharsets.UTF_8))//setex : set expire.setExpiration(5000) //设置过期时间.build();// 消息ID需要封装到CorrelationData中CorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend(ttl.direct, ttl, message, correlationData);log.debug(发送消息成功);
}查看发送消息日志接收消息日志这次发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时任意一个到期就会成为死信。# 总结消息超时的两种方式是- 给队列设置ttl属性进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性队列接收到消息超过ttl时间后变为死信如何实现发送一个消息20秒后消费者才收到消息- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒2.3.延迟交换机插件
利用TTL结合死信交换机我们实现了消息发出后消费者延迟收到消息的效果。这种消息模式就称为延迟队列Delay Queue模式。延迟队列的使用场景包括- 延迟发送短信
- 用户下单如果用户在15 分钟内未支付则自动取消
- 预约工作会议20分钟后自动通知所有参会人员因为延迟队列的需求非常多所以RabbitMQ的官方也推出了DelayExchange插件原生支持延迟队列效果。参考RabbitMQ的插件列表页面https://www.rabbitmq.com/community-plugins.html使用方式可以参考官网地址https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq# 使用DelayExchange插件的使用也非常简单- 声明一个交换机交换机的类型可以是任意类型
- 设定delayed属性为true
- 声明队列与其绑定# 1声明DelayExchange交换机基于注解方式【常用】注意如果MQ容器没有安装DelayExchange插件直接指定delayedtrue启动项目时会报错RabbitListener(bindings QueueBinding(value Queue(name delay.queue,durable true),exchange Exchange(name delay.direct,delayed true),keydelay
))
public void listenDelayedQueue(String msg){log.info(接受到 delay.queue的延迟消息 {}msg);
}在consumer服务的SpringRabbitListener中添加 优势代码简单RabbitListener(bindings QueueBinding(value Queue(name delay.queue, durable true),exchange Exchange(name delay.direct, delayed true),key delay
))
public void listenDelayExchange(String msg) {log.info(消费者接收到了delay.queue的延迟消息{}, msg);
}第二种方式也可以基于Bean的方式 优势清晰明了# 2发送消息发送消息时一定要携带x-delay属性指定延迟的时间Test
public void testDelayedMsg(){Message message MessageBuilder.withBody(hello,delayed message,getBytes(StandardCharsets.UTF_8)).setHeader(x-delay,10000).build();CorrelationData correlationData new CorrelationData(UUID.random.UUID().toString());rabbitTemplate.convertAndSend(delay.direct,delay,message,correlationData);log.debug(发送消息成功);
}Test
public void testSendDelayMessage() throws InterruptedException {// 1.准备消息Message message MessageBuilder.withBody(hello, delayed messsage.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader(x-delay, 10000) //时间必须是数字不能是字符串.build();// 2.准备CorrelationDataCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend(delay.direct, delay, message, correlationData);log.info(发送消息成功);
}此时idea控制台会有一个报错信息原因很简单在之前课程中我们添加了定义发送者Return回调如果消息发送之后没有到达队列就会报错。当使用插件发送消息时设置了x-delay10000那消息只要没有到过期时间就不会路由到队列中
而是存在一个叫Mnesia的分布式数据库管理系统中。因此需要在publisher服务的CommonConfig中判断是否为延迟消息// 判断是否是延迟消息
Integer receivedDelay message.getMessageProperties().getReceivedDelay();
if (receivedDelay ! null receivedDelay 0) {// 是一个延迟消息忽略这个错误提示return;
}# 总结延迟队列插件的使用步骤包括哪些- 声明一个交换机添加delayed属性为true
- 发送消息时添加x-delay头值为超时时间必须是int值3.惰性队列
# 消息堆积问题当生产者发送消息的速度超过了消费者处理消息的速度就会导致队列中的消息堆积直到队列存储消息达到上限。之后发送的消息就会成为死信可能会被丢弃这就是消息堆积问题。#mermaid-svg-ydtCTX49urtUkPyP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-ydtCTX49urtUkPyP .error-icon{fill:#552222;}#mermaid-svg-ydtCTX49urtUkPyP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-ydtCTX49urtUkPyP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-ydtCTX49urtUkPyP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-ydtCTX49urtUkPyP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-ydtCTX49urtUkPyP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-ydtCTX49urtUkPyP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-ydtCTX49urtUkPyP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-ydtCTX49urtUkPyP .marker.cross{stroke:#333333;}#mermaid-svg-ydtCTX49urtUkPyP svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-ydtCTX49urtUkPyP .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-ydtCTX49urtUkPyP .cluster-label text{fill:#333;}#mermaid-svg-ydtCTX49urtUkPyP .cluster-label span{color:#333;}#mermaid-svg-ydtCTX49urtUkPyP .label text,#mermaid-svg-ydtCTX49urtUkPyP span{fill:#333;color:#333;}#mermaid-svg-ydtCTX49urtUkPyP .node rect,#mermaid-svg-ydtCTX49urtUkPyP .node circle,#mermaid-svg-ydtCTX49urtUkPyP .node ellipse,#mermaid-svg-ydtCTX49urtUkPyP .node polygon,#mermaid-svg-ydtCTX49urtUkPyP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-ydtCTX49urtUkPyP .node .label{text-align:center;}#mermaid-svg-ydtCTX49urtUkPyP .node.clickable{cursor:pointer;}#mermaid-svg-ydtCTX49urtUkPyP .arrowheadPath{fill:#333333;}#mermaid-svg-ydtCTX49urtUkPyP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-ydtCTX49urtUkPyP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-ydtCTX49urtUkPyP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-ydtCTX49urtUkPyP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-ydtCTX49urtUkPyP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-ydtCTX49urtUkPyP .cluster text{fill:#333;}#mermaid-svg-ydtCTX49urtUkPyP .cluster span{color:#333;}#mermaid-svg-ydtCTX49urtUkPyP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-ydtCTX49urtUkPyP :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 队列满丢弃 Publisher Queue Consumer 死信 解决消息堆积有三种思路- 增加更多消费者提高消费速度
- 在消费者内开启线程池多线程处理加快消息处理速度
- 惰性队列扩大队列容积提高堆积上限从RabbitMQ的3.6.0版本开始就增加了Lazy Queues的概念也就是惰性队列。惰性队列的特征如下- 接收到消息后直接存入磁盘而非内存缺点速度会变慢
- 消费者要消费消息时才会从磁盘中读取并加载到内存最终推送给消费者
- 支持数百万条的消息存储3.1.基于命令行设置lazy-queue
注(本操作是Linux操作系统进行的)设置一个队列为惰性队列只需要在声明队列时指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列#进入MQ容器
docker exec -it mq bashrabbitmqctl set_policy Lazy ^lazy-queue$ {queue-mode:lazy} --apply-to queues 命令解读- rabbitmqctl RabbitMQ的命令行工具
- set_policy 添加一个策略
- Lazy 策略名称可以自定义
- ^lazy-queue$ 用正则表达式匹配队列的名字
- {queue-mode:lazy} 设置队列模式为lazy模式
- --apply-to queues 策略的作用对象是所有的队列3.2.Bean声明lazy-queue Bean
public Queue lazyQueue(){return QueueBuilder.durable(lazy.queue).lazy()//开启x-queue-mode为lazy.build();
}package cn.itcast.mq.config;import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class LazyConfig {Beanpublic Queue lazyQueue() {return QueueBuilder.durable(lazy.queue).lazy() //指定是惰性队列.build();}Beanpublic Queue normalQueue() {return QueueBuilder.durable(normal.queue).build();}
}重启cousumer服务确认已经创建了以上两个队列3.3.注解声明LazyQueue
此处没有给队列绑定交换机因此使用的是queuesToDeclare ?而不是bindings RabbitListener(queuesToDeclare Queue(name lazy.queue,durable true,arguments Argument(name x-queue-mode, value lazy)
))
public void listLazyQueue(String msg) {log.info(接收到 lazy.queue 的消息{}, msg);
}3.4.测试
先把cousumer服务停掉不然发送的消息都被消费掉了无法观察效果1、在publisher服务的SpringAmqpTest中发送消息到惰性队列Test
public void testLazyQueue() throws InterruptedException {long b System.currentTimeMillis();for (int i 0; i 1000; i) {// 1.准备消息Message message MessageBuilder.withBody(hello, Spring.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2.发送消息rabbitTemplate.convertAndSend(lazy.queue, message);}long e System.currentTimeMillis();System.out.println(e - b);
}发现消息都在磁盘中2、也可以发送到普通队列做为对比Test
public void testNormalQueue() throws InterruptedException {long b System.currentTimeMillis();for (int i 0; i 1000; i) {// 1.准备消息Message message MessageBuilder.withBody(hello, Spring.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();// 2.发送消息rabbitTemplate.convertAndSend(normal.queue, message);}long e System.currentTimeMillis();System.out.println(e - b);
}发现消息都在内存中# 总结消息堆积问题的解决方案- 队列上绑定多个消费者提高消费速度
- 在消费者内开启线程池多线程处理加快消息处理速度
- 使用惰性队列可以再mq中保存更多消息惰性队列的优点有哪些- 基于磁盘存储消息上限高
- 没有间歇性的page-out性能比较稳定惰性队列的缺点有哪些- 基于磁盘存储消息时效性会降低
- 性能受限于磁盘的IO4.MQ集群
4.1.集群分类
RabbitMQ的是基于Erlang语言编写而Erlang又是一个面向并发的语言天然支持集群模式。RabbitMQ的集群有两种模式- 普通集群是一种分布式集群将队列分散到集群的各个节点从而提高整个集群的并发能力。
- 镜像集群是一种主从集群在普通集群的基础上添加了主从备份功能提高集群的数据可用性。镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后推出了新的功能仲裁队列来代替镜像集群底层采用Raft协议确保
主从的数据一致性。4.2.普通集群
# 集群结构和特征普通集群或者叫标准集群classic cluster具备下列特征- 会在集群的各个节点间共享部分数据包括交换机、队列元信息。不包含队列中的消息。
- 当访问集群某节点时如果队列不在该节点会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机队列中的消息就会丢失结构如图4.3.镜像集群
# 集群结构和特征镜像集群本质是主从模式具备下面的特征- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的**主节点**备份到的其它节点叫做该队列的**镜像**节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成然后同步给镜像节点
- 主宕机后镜像节点会替代成新的主结构如图4.4.仲裁队列
# 集群特征镜像集群虽然支持主从但主从同步并不是强一致的某些情况下可能有数据丢失的风险。仲裁队列仲裁队列是3.8版本以后才有的新功能用来替代镜像队列底层采用Raft协议确保主从的数据一致性
具备下列特征- 与镜像队列一样都是主从模式支持主从数据同步
- 使用非常简单没有复杂的配置
- 主从同步基于Raft协议强一致Java代码创建仲裁队列Bean
public Queue quorumQueue() {return QueueBuilder.durable(quorum.queue2) // 持久化//.layzy() //惰性队列.quorum() // 仲裁队列.build();
}# SpringAMQP连接MQ集群注意这里用address来代替host、port方式spring:rabbitmq:#host: 192.168.200.130#port: 5672addresses: 192.168.200.130:8071, 192.168.200.130:8072, 192.168.200.130:8073username: itcastpassword: 123321virtual-host: /注意因为重新创建的3个MQ集群还没有安装延迟队列插件因此原来练习延迟队列的代码需要注释掉
1、创建交换机时RabbitListener(bindings QueueBinding(value Queue(name delay.queue, durable true),exchange Exchange(name delay.direct, delayed true),key delay
))
public void listenDelayExchange(String msg) {log.info(消费者接收到了delay.queue的延迟消息{}, msg);
}
/******************************************************************************************/
2、发送消息时Test
public void testSendDelayMessage() throws InterruptedException {// 1.准备消息Message message MessageBuilder.withBody(hello, delayed messsage.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader(x-delay, 10000).build();// 2.准备CorrelationDataCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend(delay.direct, delay, message, correlationData);log.info(发送消息成功);
}