建设银行重庆分行网站,怎么做单向网站链接,出版社网站建设方案,wordpress 产品购买如何保证消息的可靠性投递#xff1f;
1.保证生产者向broke可靠性投递#xff0c;开启ack投递成功确认#xff0c;如果失败的话进行消息补偿
/*** author yueF_L* date 2023-08-10 01:32* ConfirmCallback#xff1a;消息只要被 RabbitMQ broker 接收到就会触发confirm方…如何保证消息的可靠性投递
1.保证生产者向broke可靠性投递开启ack投递成功确认如果失败的话进行消息补偿
/*** author yueF_L* date 2023-08-10 01:32* ConfirmCallback消息只要被 RabbitMQ broker 接收到就会触发confirm方法。*/
Slf4j
Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.error(confirm发送到broker失败\r\n correlationData{}\r\n ack{}\r\n cause{},correlationData, ack, cause);} else {log.info(confirm发送到broker成功\r\n correlationData{}\r\n ack{}\r\n cause{},correlationData, ack, cause);}}
2. 保证消息能投敌到目标 queue
/*** author yueF_L* date 2023-08-10 01:29* ReturnCallback如果消息未能投递到目标 queue 里将触发returnedMessage方法。* 若向 queue 投递消息未成功可记录下当前消息的详细投递数据方便后续做重发或者补偿等操作。*/
Slf4j
Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {log.info(returnedMessage \r\n message{}\r\n replyCode{}\r\n replyText{}\r\n exchange{}\r\n routingKey{},message, replyCode, replyText, exchange, routingKey);}
}将配置set到rabbitTemplate
/*** author yueF_L* date 2023-08-10 01:25* 消息队列配置*/
Slf4j
Configuration
RequiredArgsConstructor
public class RabbitMQConfig {private final ConfirmCallbackService confirmCallbackService;private final ReturnCallbackService returnCallbackService;BeanRabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);// 开启失败通知rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallbackService);rabbitTemplate.setReturnCallback(returnCallbackService);return rabbitTemplate;}
}yml配置 代码中的调用 RabbitListener(queues TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL)public void receiveD(Message message, Channel channel) {try {try {String msg new String(message.getBody());// 模拟异常测试重试int a 1 / 0;//apiService.doApiHeartChainTelephoneBillOrder(msg);// 手动确认消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info(当前时间{},收到话费死信队列信息{}, new Date(), msg);}catch (Exception e){//参数1消费消息的index//参数2是否批量否定多个消息设为false就与basicReject功能一样triue的前提也是在同一个channel且在该消息否定前存在未确认的消息//参数3 对异常消息的处理true表示重排序false表示丢弃// 如果拒绝消息要求mq重发的话一直异常会进入死循环//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.error(TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL 消息反馈失败param:{}, message.getBody());throw e;}} catch (Exception e) {log.error(监听RabbitMq、队列 TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL 发生异常 e.getMessage());throw new CustomException(监听RabbitMq、队列 TtlQueueConfig.DEAD_LETTER_QUEUE_TELEPHONE_BILL 发生异常 e.getMessage());}}