网站建设说,python如何调用wordpress,微动网站建设,网站备案接入ipLison dreamlison163.com, v1.0.0, 2023.06.23
RabbitMQ-进阶 死信队列、延迟队列、防丢失机制 文章目录 RabbitMQ-进阶 死信队列、延迟队列、防丢失机制死信队列延迟队列延迟队列介绍**延迟队列_死信队列_的实现**延迟队列_插件实现下载插件RabbitMQ 配置类RabbitMQ …Lison dreamlison163.com, v1.0.0, 2023.06.23
RabbitMQ-进阶 死信队列、延迟队列、防丢失机制 文章目录 RabbitMQ-进阶 死信队列、延迟队列、防丢失机制死信队列延迟队列延迟队列介绍**延迟队列_死信队列_的实现**延迟队列_插件实现下载插件RabbitMQ 配置类RabbitMQ 生产者RabbitMQ 消费者测试 RabbitMQ防止消息丢失消息丢失场景生产者发送消息没有发送到rabbit交换机交换机没有发送到队列交换机、队列、消息没有设置持久化消费者接收到消息没有执行业务逻辑导致消息丢失 死信队列
概念 在MQ中当消息成为死信Dead message后消息中间件可以 将其从当前队列发送到另一个队列中这个队列就是死信队列。而 在RabbitMQ中由于有交换机的概念实际是将死信发送给了死 信交换机Dead Letter Exchange简称DLX。死信交换机和死信队列和普通的没有区别。 消息成为死信的情况
队列消息长度到达限制消费者拒签消息并且不把消息重新放入原队列消息到达存活时间未被消费
代码实现
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitConfig2 {private final String DEAD_EXCHANGE dead_exchange;private final String DEAD_QUEUE dead_queue;private final String NORMAL_EXCHANGE normal_exchange;private final String NORMAL_QUEUE normal_queue;// 死信交换机Bean(DEAD_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}// 死信队列Bean(DEAD_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(DEAD_QUEUE).build();}// 死信交换机绑定死信队列Beanpublic Binding bindDeadQueue(Qualifier(DEAD_EXCHANGE) Exchange exchange, Qualifier(DEAD_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(dead_routing).noargs();}// 普通交换机Bean(NORMAL_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).durable(true).build();}// 普通队列Bean(NORMAL_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(dead_routing) // 死信队列路由关键字.ttl(10000) // 消息存活10s.maxLength(10) // 队列最大长度为10.build();}// 普通交换机绑定普通队列Beanpublic Binding bindNormalQueue(Qualifier(NORMAL_EXCHANGE) Exchange exchange,Qualifier(NORMAL_QUEUE)Queue queue){return BindingBuilder.bind(queue).to(exchange).with(my_routing).noargs();}
}测试
1、生产者发送消息
Test
public void testDlx(){// 存活时间过期后变成死信// rabbitTemplate.convertAndSend(normal_exchange,my_routing,测试死信);// 超过队列长度后变成死信// for (int i 0; i 20; i) {// rabbitTemplate.convertAndSend(normal_exchange,my_routing,测试死信);// }// 消息拒签但不返回原队列后变成死信rabbitTemplate.convertAndSend(normal_exchange,my_routing,测试死信);
}
2、
Component
public class DlxConsumer {RabbitListener(queues normal_queue)public void listenMessage(Message message, Channel channel) throws IOException {// 拒签消息channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}
}延迟队列
延迟队列介绍
什么是延时队列
延时队列即就是放置在该队列里面的消息是不需要立即消费的而是等待一段时间之后取出消费 但RabbitMQ中并未提供延迟队列功能我们可以使用死信队列实现延迟队列的效果 延迟交换机主要帮我们解决什么问题 1当我们的业务比较复杂的时候 需要针对不同的业务消息类型设置不同的过期时间策略 name必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象 当业务复杂到一定程度时 这种方式维护成本过高 2就是队列的先进先出原则导致的问题当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候消息是串行被消费的所以必然是等到先进入队列的消息的过期时间结束 后进入队列的消息的过期时间才会被监听然而实际上这个消息早就过期了这就导致了本来过期时间为3秒的消息实际上过了13秒才会被处理这在实际应用场景中肯定是不被允许的 适用场景 1商城订单超时未支付取消订单 2使用权限到期前十分钟提醒用户 3收益项目投入后一段时间后产生收益 延迟队列_死信队列_的实现
1、创建SpringBoot订单模块添加SpringMVC、RabbitMQ、 lombok依赖
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId
/dependency
dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId
/dependency2、编写配置文件
spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 123456virtual-host: /# 日志格式
logging:pattern:console: %d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n3、创建队列和交换机
Configuration
public class RabbitConfig {// 订单交换机和队列private final String ORDER_EXCHANGE order_exchange;private final String ORDER_QUEUE order_queue;// 过期订单交换机和队列private final String EXPIRE_EXCHANGE expire_exchange;private final String EXPIRE_QUEUE expire_queue;// 过期订单交换机Bean(EXPIRE_EXCHANGE)public Exchange deadExchange(){return ExchangeBuilder.topicExchange(EXPIRE_EXCHANGE).durable(true).build();}// 过期订单队列Bean(EXPIRE_QUEUE)public Queue deadQueue(){return QueueBuilder.durable(EXPIRE_QUEUE).build();}// 将过期订单队列绑定到交换机Beanpublic Binding bindDeadQueue(Qualifier(EXPIRE_EXCHANGE) Exchange exchange,Qualifier(EXPIRE_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(expire_routing).noargs();}// 订单交换机Bean(ORDER_EXCHANGE)public Exchange normalExchange(){return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}// 订单队列Bean(ORDER_QUEUE)public Queue normalQueue(){return QueueBuilder.durable(ORDER_QUEUE).ttl(10000) // 存活时间为10s,模拟30min.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机.deadLetterRoutingKey(expire_routing) //死信交换机的路由关键字.build();}// 将订单队列绑定到交换机Beanpublic Binding bindNormalQueue(Qualifier(ORDER_EXCHANGE) Exchange exchange,Qualifier(ORDER_QUEUE) Queue queue){return BindingBuilder.bind(queue).to(exchange).with(order_routing).noargs();}
}4、编写下单的控制器方法下单后向订单交换机发送消息
Testpublic String placeOrder(String orderId){System.out.println(处理订单数据...);// 将订单id发送到订单队列rabbitTemplate.convertAndSend(order_exchange, order_routing, orderId);return 下单成功修改库存;}
5、编写监听死信队列的消费者
// 过期订单消费者
Component
public class ExpireOrderConsumer {// 监听队列RabbitListener(queues expire_queue)public void listenMessage(String orderId){System.out.println(查询orderId号订单的状态如果已支付则无需处理如果未支付则需要回退库存);}
}延迟队列_插件实现
在使用死信队列实现延迟队列时会遇到一个问题RabbitMQ只会移除队列顶端的过期消息如果第一个消息的存活时长较长而第二个消息的存活时长较短则第二个消息并不会及时执行。 RabbitMQ虽然本身不能使用延迟队列但官方提供了延迟队列插件安装后可直接使用延迟队列
下载插件
RabbitMQ 实现了一个插件 x-delay-message 来实现延时队列我们可以从 官网下载到它
https://www.rabbitmq.com/community-plugins.htmlhttps://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases选择 .ez 格式的文件下载下载后放置 RabbitMQ 的安装目录下的 plugins 目录下如我的路径为
docker cp rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez rabbitmq1:/pluginsdocker exec rabbitmq1 rabbitmq-plugins enable rabbitmq_delayed_message_exchangeRabbitMQ 配置类
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
Slf4j
public class RabbitConfig3 {/*** 交换机*/public static final String DELAY_EXCHANGE delay_exchange;/*** 队列*/public static final String DELAY_QUEUE delay_queue;/*** 路由*/public static final String DELAY_KEY delay_key;Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) - log.info(消息发送成功:correlationData({}),ack({}),cause({}), correlationData, ack, cause));rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - log.info(消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}, exchange, routingKey, replyCode, replyText, message));return rabbitTemplate;}/*** 直接模式队列1*/Beanpublic Queue directOneQueue() {return new Queue(cundream);}/*** 延时队列交换机** return*/Beanpublic CustomExchange delayExchange() {MapString, Object args new HashMap();args.put(x-delayed-type, direct);return new CustomExchange(DELAY_EXCHANGE, x-delayed-message, true, false, args);}/*** 延时队列** return*/Beanpublic Queue delayQueue() {return new Queue(DELAY_QUEUE, true);}/*** 给延时队列绑定交换机** return*/Beanpublic Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_KEY).noargs();}
}
RabbitMQ 生产者 import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
Slf4j
public class RabbitMqServiceImpl implements RabbitMqService {Autowiredprivate RabbitTemplate rabbitTemplate;public void sendDelayMessage(Object object, long millisecond) {this.rabbitTemplate.convertAndSend(delay_exchange,delay_key,object.toString(),message - {message.getMessageProperties().setHeader(x-delay, millisecond);return message;});}
}
RabbitMQ 消费者
import cn.hutool.json.JSONUtil;
import com.github.cundream.springbootbuilding.common.rabbitmq.RabbitConst;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** className: com.github.cundream.springbootbuilding.common.rabbitmq.consumer- ReceiveDealyConsumer* description:* author: 李村 * createDate:*/
Slf4j
RabbitListener(queuesToDeclare Queue(RabbitConst.DELAY_QUEUE))
Component
public class ReceiveDealyHandler {RabbitHandlerpublic void directHandlerManualAck(Object object, Message message, Channel channel) {// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉final long deliveryTag message.getMessageProperties().getDeliveryTag();try {log.info(直接队列1手动ACK接收消息{}, object.toString());// 通知 MQ 消息已被成功消费,可以ACK了channel.basicAck(deliveryTag, false);} catch (IOException e) {try {// 处理失败,重新压入MQchannel.basicRecover();} catch (IOException e1) {e1.printStackTrace();}}}
}
测试
通过测试第一条消息在 5s后接收到第二条消息在 10s后接收到说明我们的延时队列已经成功 RequestMapping(value /delayMessage,method RequestMethod.GET)public void delayMessage() {String message1 这是第一条消息;String message2 这是第二条消息;rabbitMqService.sendDelayMessage(message1, 5000);rabbitMqService.sendDelayMessage(message2, 10000);}
RabbitMQ防止消息丢失
消息丢失场景
MQ消息丢失场景主要有三个
消息生产者发送消息后rabbitMq服务器没有收到导致消息丢失rabbitmq收到消息后没有持久化保存导致消息丢失消费者收到消息后没来得及处理消费者宕机导致消息丢失
生产者发送消息没有发送到rabbit交换机
解决方案消息异步确认机制confirm机制
spring:rabbitmq:host: 127.0.0.1port: 5672username: adminpassword: 123456virtual-host: /publisher-confirms: true # 消息异步确认机制confirm机制开启confirm机制后在生产者每次发送消息都会调用回调代码开发人员需要写回调函数的逻辑处理发送失败的消息
Component
Slf4j
public class RabbitMQConfirmAndReturn implements RabbitTemplate.ConfirmCallback {Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);}/*** confirm机制只保证消息到达exchange不保证消息可以路由到正确的queue* param correlationData 发送的消息的信息交换机路由消息体等* param ack true成功false失败* param cause 发生错误的信息*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {// 失败一般解决方案是将发送失败消息存入定时任务队列尝试重新发送消息再多次失败// 就不再发送转为人工处理if (!ack) {log.error(rabbitmq confirm fail,cause:{}, cause);// ...... 失败处理逻辑}}
}
交换机没有发送到队列
解决方案Return模式确保消息从交换机发送到队列。
1、开启return模式
#开启 return 机制
spring:rabbitmq:publisher-returns: true
2、开发回调函数
Component
public class Sender implements RabbitTemplate.ReturnCallback {Autowiredprivate RabbitTemplate rabbitTemplate;PostConstructpublic void init() {rabbitTemplate.setReturnCallback(this);}//通过实现ReturnCallback接口如果消息从交换器发送到对应队列失败时触发比如根据发送消息时指定的routingKey找不到队列时会触发Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(消息主体message: message);System.out.println(消息replyCode: replyCode);System.out.println(描述: replyText);System.out.println(消息使用的交换器exchange: exchange);System.out.println(消息使用的路由键routing: routingKey);}
}
交换机、队列、消息没有设置持久化
交换机、队列、消息没有持久化当rabbitmq的服务重启之后这些信息就会丢失。
交换机持久化 在声明交换机的时候设置持久化属性 /*** 构造参数说明* 参数1交换机名称* 参数2durabletrue表示持久化false表示不持久化* 参数3autoDeletetrue自动删除false不自动删除*/Beanpublic TopicExchange exchange() {return new TopicExchange(exchangeName, true, false);}
队列持久化 在声明队列的时候设置持久化属性 public Queue queue() {/*** param queueName 队列名称* param durable 队列持久化true持久化false不持久化* param exclusive 是否排他 true不排他false排他此处配置一般false* param autoDelete 是否自动删除无生产者队列自动删除* param args 队列参数*/return new Queue(queueName, true, false, false, args);}
消息持久化
消息的持久化是默认持久的。无需配置
消费者接收到消息没有执行业务逻辑导致消息丢失
解决方案手动确认消息机制 配置文件配置
**spring.rabbitmq.listener.simple.acknowledge-modemanual**spring:rabbitmq:host: 127.0.0.1#host: 10.106.10.91port: 5672username: adminpassword: 123456virtual-host: pubpublisher-confirms: true # 开启发送确认publisher-returns: true # 开启发送失败回退#开启acklistener:direct:acknowledge-mode: manualsimple:acknowledge-mode: manual #采取手动应答#concurrency: 1 # 指定最小的消费者数量#max-concurrency: 1 #指定最大的消费者数量retry:enabled: true # 是否支持重试
Component
public class Consumer {RabbitHandlerpublic void consumeMsg(String msg, Channel channel, Message message) throws IOException {//拿到消息延迟消费try {// .... 消费消息业务逻辑/*** deliveryTag 消息的随机标签信息* multiple 是否批量true表示一次性的将小于deliveryTag的值进行ack*/channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();/*** deliveryTag 消息的随机标签信息* multiple 是否批量true表示一次性的将小于deliveryTag的值进行ack* requeue 被拒绝的消息是否重新入队列*/channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}
}
当业务出现意料之外的一场消息就会重新回到队列中会分发到其他正常consumer中进行消费