图书管理系统网站开发,抚顺市网站建设,图片制作二维码,景区宣传推广方案RabbitMQ队列类型
Classic经典队列 这是RabbitMQ最为经典的队列类型。在单机环境中#xff0c;拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项#xff0c;Durable和Transient。 Durable表…RabbitMQ队列类型
Classic经典队列 这是RabbitMQ最为经典的队列类型。在单机环境中拥有比较高的消息可靠性。 经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。 Durability有两个选项Durable和Transient。 Durable表示队列会将消息保存到硬盘这样消息的安全性更高。但是同时由于需要有更多的IO操作所以生产和消费消息的性能相比Transient会比较低。 Auto delete属性如果选择为是那队列将在至少一个消费者已经连接然后所有的消费者都断开连接后删除自己。 经典队列不适合积累太多的消息。如果队列中积累的消息太多了会严重影响客户端生产消息以及消费消息的性能。因此经典队列主要用在数据量比较小并且生产消息和消费消息的速度比较稳定的业务场景。比如内部系统之间的服务调用。
Quorum仲裁队列 仲裁队列是3.8引入的一个新队列类型;仲裁队列相比Classic经典队列在分布式环境下对消息的可靠性保障更高。 Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列他实现了持久化多备份的FIFO队列主要就是针对RabbitMQ的镜像模式设计的。简单理解就是quorum队列中的消息需要有集群中多半节点同意确认后才会写入到队列中。 Classic与Quorum对比、少了一些高级特性 Quorum队列更适合于 队列长期存在并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单引入MQ后处理速度可以慢一点但是订单不能丢失。 Quorum不适合的场景如下 队列的临时性暂时性或独占队列、高队列变动率声明和删除率尽可能低的延迟由于其数据安全功能底层共识算法固有的延迟更高当数据安全不是优先事项时例如应用程序不使用手动确认不使用发布者确认很长的队列积压流可能更适合
创建Quorum队列
Spring创建仲裁队列需要设置参数“-x-queue-type”为“quorum”
Configuration
public class QuorumConfig {public final static String QUEUE_TYPE x-queue-type;public final static String QUEUE_TYPE_VAL quorum;public final static String QUEUE_NAME quorumQueue;Beanpublic Queue quorumQueue() {HashMapString, Object params new HashMap();params.put(QUEUE_TYPE,QUEUE_TYPE_VAL);Queue queue new Queue(QUEUE_NAME, true, false, false, params);return queue;}
} Rabbit Client创建Quorum队列
MapString,Object params new HashMap();
params.put(x-queue-type,quorum);
//声明Quorum队列的方式就是添加一个x-queue-type参数指定为quorum。默认是classic
channel.queueDeclare(QUEUE_NAME, true, false, false, params); Quorum队列的消息是必须持久化的所以durable参数必须设定为true如果声明为false就会报错。同样exclusive参数必须设置为false。这些声明在Producer和Consumer中是要保持一致的。
Stream队列
Stream队列是3.9.0版本引入新队列类型。持久化到磁盘并且具备分布式备份的更适合于消费者多读消息非常频繁的场景。Stream队列的核心是以append-only只添加的日志来记录消息整体来说就是消息将以append-only的方式持久化到日志文件中然后通过调整每个消费者的消费进度offset来实现消息的多次分发。类似kafka;
创建Stream队列
Spring AMQP目前还不支持创建Stream队列只能使用原生API创建 MapString,Object params new HashMap();params.put(x-queue-type,stream);params.put(x-max-length-bytes, 20_000_000_000L); // maximum stream size: 20 GBparams.put(x-stream-max-segment-size-bytes, 100_000_000); // size of segment files: 100 MBchannel.queueDeclare(QUEUE_NAME, true, false, false, params);Stream队列的durable参数必须声明为trueexclusive参数必须声明为false。 x-max-length-bytes 表示日志文件的最大字节数 x-stream-max-segment-size-bytes 每一个日志文件的最大大小。这两个是可选参数通常为了防止stream日志无限制累计都会配合stream队列一起声明。
消费者 MapString,Object consumeParam new HashMap();consumeParam.put(x-stream-offset,last);channel.basicConsume(QUEUE_NAME, false,consumeParam, myconsumer);x-stream-offset的类型
first: 从日志队列中第一个可消费的消息开始消费last: 消费消息日志中最后一个消息next: 相当于不指定offset消费不到消息。Offset: 一个数字型的偏移量Timestamp:一个代表时间的Data类型变量表示从这个时间点开始消费。例如 一个小时前Date timestamp new Date(System.currentTimeMillis() - 60 * 60 * 1_000)
Stream队列产品目前不够成熟目前用的最多的还是Classic经典队列。RabbitMQ目前主推的是Quorum队列
死信消息
有以下三种情况RabbitMQ会将一个正常消息转成死信 消息被消费者确认拒绝。消费者把requeue参数设置为true(false)并且在消费后向 RabbitMQ返回拒绝。channel.basicReject或者channel.basicNack。 消息达到预设的TTL时限还一直没有被消费。 消息由于队列已经达到最长长度限制而被丢掉 TTL即最长存活时间 Time-To-Live 。消息在队列中保存时间超过这个TTL即会被认为死亡。死亡的消息会被丢入死信队列如果没有配置死信队列的话RabbitMQ会保证死了的消息不会再次被投递并且在未来版本中会主动删除掉这些死掉的消息。 声明队列时、设置x-message-ttl值 MapString, Object args new HashMapString, Object();args.put(x-message-ttl, 60000);channel.queueDeclare(myqueue, false, false, false, args);如何判断消息是否为死信
消息被作为死信转移到死信队列后header中还会加上第一次成为死信的三个属性并且这三个属性在以后的传递过程中都不会更改。具体可以调试去看看
x-first-death-reason 原因x-first-death-queue 队列x-first-death-exchange 交换机
死信队列
存在死信消息的队列RabbitMQ中有两种方式可以声明死信队列一种是针对某个单独队列指定对应的死信队列。另一种就是以策略的方式进行批量死信队列的配置。 流程图如下
代码 死信交换机、队列
Configuration
public class DeadConfig {public final static String DEAD_EXCHANGE deadExchange;public final static String DEAD_QUEUE_NAME deadQueue;Beanpublic FanoutExchange deadExchange() {FanoutExchange directExchange new FanoutExchange(DEAD_EXCHANGE);return directExchange;}Beanpublic Queue deadQueue() {Queue queue new Queue(DEAD_QUEUE_NAME);return queue;}Beanpublic Binding deadBinding(FanoutExchange deadExchange, Queue deadQueue) {return BindingBuilder.bind(deadQueue).to(deadExchange);}}发送者
Controller
public class MessageTx {Autowiredprivate MessageService messageService;GetMapping(/sendDeadMsg)ResponseBodypublic String sendMoreMsgTx(){//发送10条消息for (int i 0; i 10; i) {String msg msgi;System.out.println(发送消息 msgmsg);// xiangjiao.exchange 交换机// xiangjiao.routingKey 队列messageService.sendMessage(MessageConfig.EXCHANGE_NAME, , msg);//每两秒发送一次try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}return send ok;}
}Slf4j
Component
public class MessageService {Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange,String routingKey,Object msg) {// 暂时关闭 return 配置//rabbitTemplate.setReturnCallback(this);//发送消息rabbitTemplate.convertAndSend(exchange,routingKey,msg);}}消费者
public class MessageConsumer {// RabbitHandler : 标记的方法只能有一个参数类型为String ,若是传Map参数、则需要传入map参数// RabbitListener:标记的方法可以传入Channel Message参数RabbitListener(queues MessageConfig.MESSAGE_QUEUE_NAME)public void listenObjectQueue(Channel channel, Message message, String msg) throws IOException {System.out.println(接收到object.queue的消息 msg);System.out.println(消息ID message.getMessageProperties().getDeliveryTag());try {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);System.out.println(拒绝消息 tag message.getMessageProperties().getDeliveryTag());
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException exception) {//拒绝确认消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);//拒绝消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}}
注入容器
Configuration
public class MessageConfig {public final static String EXCHANGE_NAME deadMessageTestExchange;public final static String MESSAGE_QUEUE_NAME deadMessageTestQueue;public final static String MESSAGE_ROUTE_KEY deadMessageTestRoutingKey;public final static String DEAD_EXCHANGE_KEY x-dead-letter-exchange;Beanpublic FanoutExchange deadMessageTestExchange() {return new FanoutExchange(EXCHANGE_NAME);}Beanpublic Queue deadMessageTestQueue() {HashMapString, Object params new HashMap();params.put(DEAD_EXCHANGE_KEY, DeadConfig.DEAD_EXCHANGE);return new Queue(MESSAGE_QUEUE_NAME, true, false, false, params);}Beanpublic MessageConsumer deadMessageTestConsumer() {return new MessageConsumer();}Beanpublic Binding messageBinding(Queue deadMessageTestQueue, FanoutExchange deadMessageTestExchange) {return BindingBuilder.bind(deadMessageTestQueue).to(deadMessageTestExchange);}
}延迟队列
RabbitMQ有提供插件使用延迟队列 另外可借助 死信队列 实现延迟队列 实现思路
给普通队列设置消息过期时间(延迟时间) 不设置消费者当消息过期后将消息放入死信队列 给死信队列设置消费者
懒队列
懒队列会尽可能早的将消息内容保存到硬盘当中并且只有在用户请求到时才临时从硬盘加载到RAM内存当中。 可解决部分消息积压问题、海量消息积压RabbitMQ存不下就得使用分布式存储消息 适用的一些场景
消费者服务宕机了有一个突然的消息高峰生产者生产消息超过消费者消费者消费太慢了
默认情况下RabbitMQ接收到消息时会保存到内存以便使用同时把消息写到硬盘。但是 消息写入硬盘的过程是会阻塞队列的。RabbitMQ虽然做了优化但是在长队列中表现不是很理想所以有了懒队列、 以磁盘IO为代价解决消息积压问题
SpringBoot懒队列声明方式
Configuration
public class LazyQueueConfig {Beanpublic Queue lazyQueue() {HashMapString, Object params new HashMap();params.put(x-queue-mode, lazy);return new Queue(lazyQueue, true, false, false, params);}
}原生API方式
MapString, Object args new HashMapString, Object();
args.put(x-queue-mode, lazy);
channel.queueDeclare(myqueue, false, false, false, args);懒队列适合消息量大且长期有堆积的队列可以减少内存使用加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的。
集群模式
分布式环境下是不允许单点故障存在需要保证高可用 因此需要集群环境保证高可用另外若存在海量消息还需要保证存放得下、即分布式存储
普通集群模式
集群的各个节点之间只会有相同的元数据即队列结构而消息不会进行冗余只存在一个节点中。消费时如果消费的不是存有数据的节点 RabbitMQ会临时在节点之间进行数据传输将消息从存有数据的节点传输到消费的节点。此模式解决分布式存储问题、但可靠性不高相当于多个单机服务每个都是独立的一个都不可以宕机。某台机器宕机、则存储的消息无法消费、若未开启持久化、则丢失消息 若消费者正在处理消息则机器无法收到确认信息该消息重新入队则重复消费普通集群模式不支持高可用即当某一个节点服务挂了后需要手动重启服务才能保证这一部分消息能正常消费。
镜像集群模式
在普通集群的基础上每次保存消息后机器主动同步到多台机器上 而不是消费者获取消息时再去其他节点上获取集群会选举主节点master, 当主节点挂了则会重新选举此方式实现了集群高可用但是集群之间同步消息频繁海量数据时、同步频率更大导致占满带宽
消息常见问题
RabbitMQ如何保证消息不丢失
先看看哪些情况下会存在丢失消息 124步骤是可能丢消息的因为三个步骤都是跨网络的
生产者保证消息正确发送到RibbitMQ
对于单个数据可以使用生产者确认机制。通过多次确认的方式保证生产者的消息能够正确的发送到RabbitMQ中。RabbitMQ的生产者确认机制分为同步确认和异步确认。同步确认主要是通过在生产者端使用Channel.waitForConfirmsOrDie()指定一个等待确认的完成时间。异步确认机制则是通过channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)在生产者端注入两个回调确认函数。第一个函数是在生产者消息发送成功时调用第二个函数则是生产者消息发送失败时调用。两个函数需要通过sequenceNumber自行完成消息的前后对应。sequenceNumber的生成方式需要通过channel的序列获取。int sequenceNumber channel.getNextPublishSeqNo(); 如果发送批量消息在RabbitMQ中另外还有一种手动事务的方式可以保证消息正确发送手动事务机制主要有几个关键的方法 channel.txSelect() 开启事务 channel.txCommit() 提交事务 channel.txRollback() 回滚事务 用这几个方法来进行事务管理。但是这种方式需要手动控制事务逻辑并且手动事务会对channel产生阻塞造成吞吐量下降
RabbitMQ消息存盘不丢消息
消息若是只存内存中则宕机会丢失消息 因此队列需要开启持久化durable参数、默认创建队列durable都会为true; 而Quorum和Stream队列默认都是开启持久化
RabbitMQ 主从消息同步时不丢消息
普通集群模式消息是分散存储的不会主动进行消息同步了是有可能丢失消息的。而镜像模式集群数据会主动在集群各个节点当中同步这时丢失消息的概率不会太高。
RabbitMQ消费者不丢失消息
消费者确认分为自动确认手动确认若是自动确认则消息处理完会返回确认ack;若是处理出现异常 则会重新入队再次处理 因此存在重复消费问题
若是手动确认消息处理过程中使用channel#basicAck, basicNack, basicReject返回确认或拒绝SpringBoot配置文件中通过属性spring.rabbitmq.listener.simple.acknowledge-mode需要设置mutual手动确认 SpringBoot配置文件中通过属性spring.rabbitmq.listener.simple.acknowledge-mode 进行指定。可以设定为 AUTO 自动应答 MANUAL 手动应答NONE 不应答 如何保证消息幂等
当消费者消费消息处理业务逻辑时如果抛出异常或者不向RabbitMQ返回响应默认情况下RabbitMQ会无限次数的重复进行消息消费。
处理幂等问题要设定RabbitMQ的重试次数。在SpringBoot集成RabbitMQ时可以在配置文件 中指定spring.rabbitmq.listener.simple.retry开头的一系列属性来制定重试策略。
需要在业务上处理幂等问题 处理幂等问题的关键是要给每个消息一个唯一的标识虽然RabbitMQ会给每条消息带上MessageId (处理幂等问题的关键是要给每个消息一个唯一的标识); SpringBoot框架集成RabbitMQ后可以给每个消息指定一个全局唯一的MessageID在消费者端针对MessageID做幂等性判断。
//发送者
Message message2 MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();
rabbitTemplate.send(message2);//消费者获取MessageID自己做幂等性判断
RabbitListener(queues fanout_email_queue)
public void process(Message message) throws Exception {// 获取消息IdString messageId message.getMessageProperties().getMessageId();...
}可为了业务上的方便再封装一层 专门用来放入消息ID 否则设置ID的代码随处可见
如何保证消息的顺序
RabbitMQ中保证顺序的方法是 单队列单消息推送 若是多队列的情况下RabbitMQ没有很好的解决方案
个人思考如果RabbitMQ架构上很难处理可以通过业务设置保证顺序 即给每条消息设置序号 消费时、查询数据库之前的消息是否处理完若没有查到则等待一会 若查得到则处理消息处理完后把消息id 序号 放入数据库代表已经处理完
RabbitMQ的数据堆积问题
bbitMQ一直以来都有一个缺点就是对于消息堆积问题的处理不好。当RabbitMQ中有大量消息堆积时整体性能会严重下降。而目前新推出的Quorum队列以及Stream队列目的就在于解决这个核心问题。目前大部分企业还是围绕Classic经典队列构建应用。因此在使用RabbitMQ时还是要非常注意消息堆积的问题。尽量让消息的消费速度和生产速度保持一致。
对于生产者 最明显的方式自然是降低消息生产的速度。但是生产者端产生消息的速度通常是跟业务息息相关的一般情况下不太好直接优化。但是可以选择尽量多采用批量消息的方式降低IO频率。对于服务器端 可使用懒队列方式存储 部分消息积压(单机的磁盘容量还是有限)可使用Sharding分片队列(分布式存储) 对于消费者 检查业务代码是不是太挫了 优化代码代码性能没问题、则要增加消费者数量提升消费速度若是经常存在海量消息则可以放入数据库、慢慢消费