怀远网站建设,重庆网站建设及推广公司,建设社团网站的可行性分析,网站建设的实训报告的实训感受背景
对于核心业务需要保证消息必须正常消费#xff0c;就必须考虑消费失败的场景#xff0c;rabbitmq提供了以下三种消费失败处理机制
直接reject#xff0c;丢弃消息#xff08;默认#xff09;返回nack#xff0c;消息重新入队列将失败消息投递到指定的交换机 对于核…背景
对于核心业务需要保证消息必须正常消费就必须考虑消费失败的场景rabbitmq提供了以下三种消费失败处理机制
直接reject丢弃消息默认返回nack消息重新入队列将失败消息投递到指定的交换机 对于核心业务第一种方法显然不可接受第二种方法如果代码有异常导致消费一直失败就会出现不断失败重新入队列的死循环问题较好的方案是3待消费失败问题修复后将消息从死信队列取出发回原队列重新消费。
实现
rabbit版本
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.6.3/version
/dependency配置死信交换机路由队列 配置延迟消息业务队列消费失败投递到死信队列
Bean(orderCloseQueue)public Queue orderCloseQueue() {return QueueBuilder.durable(OrderRabbitConstants.ORDER_CLOSE_QUEUE).deadLetterExchange(RabbitMqConstants.DEAD_LETTER_EXCHANGE).deadLetterRoutingKey(RabbitMqConstants.DEAD_LETTER_ROUTING_KEY).build();}配置手动返回ACK
Bean(name {manualContainerFactory})
public SimpleRabbitListenerContainerFactory manualContainerFactory(Qualifier(connectionFactory) ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();this.manualFactoryConfigurer.configure(factory, connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setDefaultRequeueRejected(this.enableRequeueRejected);if (this.enableConsumers) {factory.setConcurrentConsumers(this.concurrentConsumers);factory.setMaxConcurrentConsumers(this.maxConcurrentConsumers);factory.setPrefetchCount(this.prefetchCount);}return factory;
}业务队列消息消费模拟失败
RabbitListener(queues OrderRabbitConstants.ORDER_CLOSE_QUEUE, containerFactory manualContainerFactory)public void consumerCloseOrder(Message message, Channel channel) throws IOException {String orderCode new String(message.getBody(), CharsetUtil.UTF_8);String messageId message.getMessageProperties().getMessageId();log.info(收到MQ messageId[{}],订单号[{}], messageId, orderCode);if (true) {channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);return;}}效果 可以看到死信队列dead.letter.queue已经正常收到死信消息编写逻辑将死信消费推回原队列
for (int i 0; i 10_000; i) {Message message rabbitTemplate.receive(RabbitMqConstants.DEAD_LETTER_QUEUE);if (message null) {return String.format(完成%d条, i);}log.info(拉取死信消息:[{}], message);try {MapString, Object headers message.getMessageProperties().getHeaders();MapString, Object deathMap ((ListMapString, Object) headers.get(x-death)).get(0);String exchange deathMap.get(exchange).toString();String routingKey ((List) deathMap.get(routing-keys)).get(0).toString();rabbitTemplate.send(exchange, routingKey, message);} catch (Exception ex) {log.error(消费死信消息失败, ex);rabbitTemplate.send(RabbitMqConstants.DEAD_LETTER_EXCHANGE, RabbitMqConstants.DEAD_LETTER_ROUTING_KEY, message);return 重入队列异常;}}重推回业务队列效果