龙岗网站建设工程,公司网站上线流程,凡科建站教程,建设社团网站的可行性分析RabbitMq消费与生产#xff0c;消费失败重发机制#xff0c;发送确认机制#xff0c;消息发送结果回执 1. RabbitMq集成spring bootRabbitMq集成依赖RabbitMq配置RabbitMq生产者#xff0c;队列#xff0c;交换通道配置#xff0c;消费者示例 2. RabbitMq消息确认机制消息… RabbitMq消费与生产消费失败重发机制发送确认机制消息发送结果回执 1. RabbitMq集成spring bootRabbitMq集成依赖RabbitMq配置RabbitMq生产者队列交换通道配置消费者示例 2. RabbitMq消息确认机制消息确认机制分自动确认和手动确认 3. 消息重发机制消息重发配置消息重发如何触发 4. 延时消息队列5. 接收返回结果队列尚未研究后续用到补充 6. 遇到的报错启动报错 Channel shutdown: channel error; protocol method: 1. RabbitMq集成spring boot RabbitMq集成依赖 这里spring-boot依赖版本为2.3.7版本RabbitMq集成amqp包版本在spring-boot中有涵盖不单独指明版本了。 parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.3.7.RELEASE/version
/parentdependencyManagementdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-dependencies/artifactIdversion2.3.7.RELEASE/versiontypepom/type/dependency/dependencies
/dependencyManagement dependencies!-- rabbitMQ --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency
/dependencies RabbitMq配置 spring:rabbitmq:# 基础项host: ip地址port: 端口username: 用户名password: 密码# virtualhost需要提前在MQ的Web管理界面里手动创建或者配置默认host/virtual-host: /# 生产者#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#开启消息发送确认机制默认为false#如果没有本条配置信息当消费者收到生产者发送的消息后生产者无法收到确认成功的回调信息publisher-confirms: true#支持消息发送失败返回队列,默认为falsepublisher-returns: true# 消费者listener:type: simplesimple:#自动签收auto 手动 manualacknowledge-mode: auto#个字段一定要设置成 false 不然无法消费的数据不会进入死信队列的default-requeue-rejected: falseprefetch: 1 #限制每次发送一条数据max-concurrency: 1 #启动消费者最大数量concurrency: 1 #同一个队列启动几个消费者retry:enabled: true #是否支持重试max-attempts: 3 # 最大重试次数默认为3initial-interval: 30s # 重试间隔时间默认1000(单位毫秒)max-interval: 120s # 重试最大间隔# 时间间隔的乘子下一次间隔的时间间隔时间 × 乘子但最大不超过重试最大间隔multiplier: 1 RabbitMq生产者队列交换通道配置消费者示例 Exchange 交换机配置 Component
public class DnfxExchangeConfig {AutowiredRabbitMqConfig rabbitMqConfig;/*** topic交换机起名* 如果rabbitmq设置的类型是topic 就用topic类型的Exchange** return*/BeanTopicExchange dnfxOrderExchange() {return new TopicExchange(rabbitMqConfig.getFxexchange());}
}队列queue配置 Component
public class DnfxQueueConfig {AutowiredRabbitMqConfig rabbitMqConfig;/*** 队列起名** return*/Beanpublic Queue dnfxOrderQueue() {MapString, Object argsMap new HashMapString, Object();//队列优先级 argsMap.put(x-max-priority, 5);//true 是否持久 return new Queue(rabbitMqConfig.getFxqueue(), true, false, false, argsMap);}
}将队列和交换机绑定, 并设置用于匹配键 Component
public class DnfxRoutingConfig {AutowiredRabbitMqConfig rabbitMqConfig;AutowiredDnfxQueueConfig queueConfig;AutowiredDnfxExchangeConfig exchangeConfig;/*** 绑定将队列和交换机绑定, 并设置用于匹配键 myDirectRouting** return*/BeanBinding bindingOrderRouting() {return BindingBuilder.bind(queueConfig.dnfxOrderQueue()).to(exchangeConfig.dnfxOrderExchange()).with(rabbitMqConfig.getFxrouting());}
}配置加载 Configuration
ConfigurationProperties(prefix xx.mq)
Data
public class RabbitMqConfig {private String fxqueue;private String fxexchange;private String fxrouting;
} RabbitTemplate Configuration
public class DnfxRabbitMqConfig {AutowiredRabbitMqConfig rabbitMqConfig;Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}
}生产者 Component
public class DemoTestProduce {Autowiredprivate RabbitTemplate rabbitTemplate;AutowiredRabbitMqConfig rabbitMqConfig;public void sendDemoMsg() {String message 测试消息发送;rabbitTemplate.convertAndSend(rabbitMqConfig.getFxexchange(), rabbitMqConfig.getFxrouting(), message);}
}消费者 Component
public class DnfxAliBbMessageListener {private final static Logger logger LoggerFactory.getLogger(DnfxAliBbMessageListener.class);RabbitListener(containerFactory rabbitListenerContainerFactory, queues ${xx.mq.fxqueue})public void listenSimpleQueueMessage(String msg) throws IOException {logger.info(接收到的1688回执消息{}, msg);}
}2. RabbitMq消息确认机制 消息确认机制分自动确认和手动确认 消息确认签收配置 # 消费者
listener:type: simplesimple:#自动签收 auto 手动 manualacknowledge-mode: auto消息确认签收机制不过多赘述网上有大把说明这里简单描述一下以及记录一下个人使用心得。 acknowledge-mode: auto 配置为自动签收时候消息送达至消费者手上后Mq自动签收并移除消息出消息队列。 acknowledge-mode: false 配置为手动签收时候消息送达至消费者后消费者需要手动触发签收动作如果消费者没有发送ACK消息RabbitMQ服务器就会认为该消息还没有被消费会将该消息重新发送给其他消费者。例如下图手动签收模式没有主动向MQ发送签收讯息那么当前消费的这条消息会被标记为Unacked 关于签收 确认机制可以参考 https://blog.csdn.net/qq_42331185/article/details/131696949 这里贴部分这个博主的结论 RabbitListener(containerFactory rabbitListenerContainerFactory, queues ${xx.mq.fxqueue})public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {String msgBody new String(message.getBody());logger.info(接收到的1688回执消息{}, msgBody);long deliveryTag message.getMessageProperties().getDeliveryTag();channel.basicReject(deliveryTag, false);deliveryTag消息传递标签格式为序列号必须使用这个标签不然信道会关闭详情下面会说到 multiple为true则表示序号deliverTag之前的消息均被确认或拒绝basicNackfalse表示当前消息。为true的时候就可以做到批量确认 requeue为true表示失败的消息将会重新排队不会丢弃或者死信为false则表示丢弃 1、消息成功签收 basicAck(deliveryTagmultiple)channel.basicAck(message.getEnvelope().getDeliveryTag(), false);2、失败确认 basicNack(deliveryTagmultiplerequeue)channel.basicNack(message.getEnvelope().getDeliveryTag(),false, true);3、失败确认basicReject(deliveryTagrequeue)channel.basicReject(message.getEnvelope().getDeliveryTag(), true);注关于以上手动确认multiple属性为true时批量确认这个元素个人未进行验证失败确认requeue为true时当前消息会重新丟至MQ队列中等待下次消费已验证 关于消息确认机制自动确认可能导致消息丢失如果单条消息发送至消费者后消费者处理报错最多触发消息重发机制重发达到重发上限后便会抛弃此消息造成消息丢失。 手动确认签收千万不要在cath中或者final中进行失败重发签收即basicNack basicReject 失败签收时requeue 为true否则当前消息若真为异常消息此消息会一直消费失败签收重新排队进行循环导致消息积压或者资源浪费
3. 消息重发机制 消息重发配置 注意 如果遗漏 max-interval multiplier两个属性消息重发机制仍会生效但是重发间隔时间为默认10秒重发 initial-interval 重发间隔时间将不会生效。此处已验证尚未确认是bug或者本身就是联动配置 # 消费者
listener:type: simplesimple:retry:enabled: true #是否支持重试max-attempts: 3 # 最大重试次数默认为3initial-interval: 30s # 重试间隔时间默认1000(单位毫秒)max-interval: 120s # 重试最大间隔# 时间间隔的乘子下一次间隔的时间间隔时间 × 乘子但最大不超过重试最大间隔multiplier: 1消息重发如何触发 消息重发机制与消息确认签收机制是两种不同的机制这个概念不要弄混了消息确认签收机制亦可以将消息重新放入队列进行二次消费 消息重发机制在消费者进行消费时如果rabbitmq开启了消息重发机制当消费者处理消息时候抛出了异常即触发消息重发机制注意处理消息逻辑不要用try-catch捕捉异常异常被捕捉后会抛出异常信息但不会影响代码正常执行amqp aop会视为正常消费不会触发重发机制。 RabbitListener(containerFactory rabbitListenerContainerFactory, queues ${zcwl.mq.fxqueue})public void listenSimpleQueueMessage(Message message, Channel channel) throws IOException {String msgBody new String(message.getBody());logger.info(接收到的1688回执消息{}, msgBody);long deliveryTag message.getMessageProperties().getDeliveryTag();//此处会抛出异常int a 1 / 0;//确认签收机制为手动签收一定要进行签收否则触发重发机制后此条消息仍会被标记为unackedchannel.basicReject(deliveryTag, false);4. 延时消息队列 延时消息队列需要配合RabbitMq延时消息队列插件使用安装延时消息队列插件此处不赘述网上搜一大把 延时消息队列创建队列以及绑定key时没什么特殊的在创建exchange交换机时需要注意选项如下图所示即可。 x-delayed-type redirect 如果不能创建报错时那么topic也是可以的 注册exchange交换机时候注意给入 x-delayed-type 参数队列注册以及队列交换机绑定与普通队列一样即可 BeanCustomExchange dnfxOrderDelayExchange() {MapString, Object args new HashMapString, Object();args.put(x-delayed-type, topic);return new CustomExchange(rabbitMqConfig.getFxOrderDelayExchange(), x-delayed-message, true, false, args);}测试发送延时消息方法队列监听与普通消息一样即可 public void sendDelayMsg() {System.out.println(LocalDateTime.now() 发送延时消息);String message 这里是测试延时发送消息;this.rabbitTemplate.convertAndSend(rabbitMqConfig.getFxOrderDelayExchange(), rabbitMqConfig.getFxOrderDelayRouting(), message, message1 - {//delay的单位是毫秒message1.getMessageProperties().setDelay(1000 * 60);return message1;});}5. 接收返回结果队列
尚未研究后续用到补充
6. 遇到的报错 启动报错 Channel shutdown: channel error; protocol method: 报错详情 Channel shutdown: channel error; protocol method: #methodchannel.close(reply-code406, reply-textPRECONDITION_FAILED - inequivalent arg type for exchange fx-bb-msg-exchange in vhost /: received direct but current is topic, class-id40, method-id10)此错误为注册交换机时候抛出的错误错误信息为注册交换机的属性与RabbitMq已经创建好的交换机属性不一致程序试图修改属性报错。 错误示范 当前exchange交换机创建时候创建的类型Type为topic类型在注册exchange交换机时返回的却是DirectExchange那么系统便会尝试修改属性从而引发报错 修复方式 创建时返回TopicExchange即可与 创建的交换机类型保持一致