网站建设可以用350摸板,北京市网站制作设计,智能建站程序,wordpress腾讯课堂目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言
在有些业务场… 目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言
在有些业务场景中消息是不能丢的比如分布式事务资金动账出账方扣款那么入账方就一定要收款。以前写了一篇分布式事务的文章里面的跨地区转账就是一个实际案例。
消息是有可能丢的比如生产者在发送消息时broker服务挂了消息没有来得及落盘这时消息就彻底丢了。
保证MQ消息可靠传输主要有两个方面一方面是消息生产者确保消息一定发送成功另一方面是消费者确保消息一定被处理。 二、生产者确保消息发送成功
1、为什么需要Publisher Confirms
在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。
在RabbitMQ官方文档描述中持久化的消息在Broker重启时也是应该存活的这里的词用的是应该因为消息有可能在落地磁盘前Broker就挂了导致消息丢失。
最直接的解决方案是通过事务但是通过事务有两个问题
事务阻塞发布者必须等待Broker处理完每条消息。事务很重每次提交都会要求触发fsync()强制磁盘这个过程需要花很长的时间。 备注在RabbitMQ官方测试中通过事务去保证发布10000条消息需要花至少4分钟的时间。 而通过Publisher Confirm机制一旦Broker处理完就会确认消息而且这个过程是异步的生产者可以流式发布消息不需要等待Broker并且Broker会批量高效将消息落盘。
2、哪些消息会被确认处理成功
当Broker确认消息时会通知消息发布者消息是否被成功处理成功处理的基本规则如下
无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。非持久化消息在入队时会被确认。持久化消息当持久化到磁盘或者被消费者消费时会被确认。 三、消费者保证消息被处理
消费者端确保消息消费很简单关闭消息自动确认就好开启消息手动确认。当然有些场景消息只能被处理一次可以通过分布式锁来实现。 四、Spring RabbitMQ支持代码示例
1、 application.yml
server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true备注 这里一定要设置spring.rabbitmq.publisher-returns为true并且设置spring.rabbitmq.publisher-confirm-type为correlated同时设置spring.rabbitmq.template.mandatory为true。上面我们将消费者的确认模式改为了手动确认。 2、RabbigtMQ配置
Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置自动将对象转换为json字符串** return*/Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}Beanpublic Queue reliableQueue() {return QueueBuilder.durable(reliable-queue).build();}
}3、可靠生产者配置
Slf4j
Component
RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData new CorrelationData();rabbitTemplate.convertAndSend(reliable-queue, reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFutureConfirm future correlationData.getFuture().completable();future.whenComplete((confirm, throwable) - {if (confirm.isAck()) {log.info(消息已经被成功发送, 消息内容:{}, JSON.toJSONString(reliableMsgDTO));return;}log.warn(消息发送未成功发送, 原因:{}, 消息内容:{}, confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}4、可靠消费者配置
Slf4j
Component
public class RabbitMQReliableConsumer {RabbitListener(queues reliable-queue)public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info(Message received from queue, message body: {}, JSON.toJSONString(reliableMsgDTO));}
}备注这里我们开启了消息的手动确认如果消息处理失败没有确认那么消息将会在下次消费者参加连接时再次被投递。 5、测试用例
测试结果如下每当消息发送至Broker成功后会触发回调如果消息发送失败将会触发重新发送。
2024-01-20 18:13:11.399 INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer : 消息已经被成功发送, 消息内容:{body:hello}
2024-01-20 18:13:11.399 INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer : Message received from queue, message body: {body:hello}