国外设计大师网站,wordpress远程发布XML,酒店网站设计公司,大型门户网站建设定做文章目录 一、死信交换机二、TTL1. Queue指定死信交换机并设置TTL2. 消息设置TTL 三、延迟队列1. SpringAMQP创建延迟队列2. 设置消息延迟3. 测试 一、死信交换机
当一个队列中的消息满足下列情况之一时#xff0c;可以成为死信#xff08;dead letter#xff09;#xff… 文章目录 一、死信交换机二、TTL1. Queue指定死信交换机并设置TTL2. 消息设置TTL 三、延迟队列1. SpringAMQP创建延迟队列2. 设置消息延迟3. 测试 一、死信交换机
当一个队列中的消息满足下列情况之一时可以成为死信dead letter
消费者使用basic.reject或 basic.nack声明消费失败并且消息的requeue参数设置为false消息是一个过期消息超时无人消费要投递的队列消息堆积满了最早的消息可能成为死信
如果该队列配置了dead-letter-exchange属性指定了一个交换机那么队列中的死信就会投递到这个交换机中而 这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
二、TTL 如果message和queue都有ttl采用更小的一方。 1. Queue指定死信交换机并设置TTL
Configuration
public class CommonConfig {Beanpublic DirectExchange ttlExchange(){return new DirectExchange(ttl.direct);}Beanpublic Queue ttlQueue(){return QueueBuilder.durable(ttl.queue).ttl(10000).deadLetterExchange(dl.direct).deadLetterRoutingKey(dl).build();}Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(ttl);}
}2. 消息设置TTL
Test
public void testTTLMessage(){Message message MessageBuilder.withBody(hello ttl.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration(5000).build();rabbitTemplate.convertAndSend(ttl.direct, ttl, message);log.info(ttl消息已发送);
}借助TTL机制可以用死信交换机模拟延迟队列但是设计上比较牵强性能不好。
三、延迟队列
这是官方提供的一些额外插件 https://www.rabbitmq.com/community-plugins.html
下载其中的DelayExchange插件把.ez文件挂载到RabbitMQ容器的/plugins目录下然后进入容器执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchangeroot7c4ba266e5bc:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit7c4ba266e5bc:
rabbitmq_delayed_message_exchange
The following plugins have been configured:rabbitmq_delayed_message_exchangerabbitmq_managementrabbitmq_management_agentrabbitmq_prometheusrabbitmq_web_dispatch
Applying plugin configuration to rabbit7c4ba266e5bc...
The following plugins have been enabled:rabbitmq_delayed_message_exchangestarted 1 plugins.
1. SpringAMQP创建延迟队列
基于RabbitListener或者基于Bean都可以。 RabbitListener(bindings QueueBinding(value Queue(name delay.queue),exchange Exchange(name delay.direct, delayed true),key delay))public void listenDelayExchange(String msg){log.info(消费者接收到delay.queue的延迟消息【 msg 】);}2. 设置消息延迟
这个插件只能在消息上设置延迟时间没有队列设置延迟时间的概念不过都是一样的。 message要在Header上添加一个x-delay。 Testpublic void testDelayMessage(){Message message MessageBuilder.withBody(hello delay.getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader(x-delay, 5000).build();// confirm callbackCorrelationData correlationData new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(delay.direct, delay, message, correlationData);log.info(发送消息成功);}3. 测试
直接运行测试可能会报错因为rabbitmq意识到消息到了exchange却没有立即到queue被认为错误回调returnback所以我们在ReturnCallBack中绕过这个限制。
Slf4j
Configuration
public class CommonConfig implements ApplicationContextAware {Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)-{//check if is delay messageif (message.getMessageProperties().getReceivedDelay() ! null message.getMessageProperties().getReceivedDelay() 0) {return;}log.error(消息发送到queue失败replyCode{}, reason{}, exchange{}, routeKey{}, message{},replyCode, replyText, exchange, routingKey, message.toString());});}
}运行Test测试可以看到Test方面消息发送的时间为21:09:13
21:09:13:516 INFO 25468 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2063c53e:0/SimpleConnection6415f61e [delegateamqp://rabbitmq127.0.0.1:5672/, localPort 62470]
21:09:13:557 INFO 25468 --- [ main] cn.itcast.mq.spring.SpringAmqpTest : 发送消息成功listener方面消息消费的时间为21:09:18刚好5s。
21:08:31:952 INFO 19532 --- [ main] cn.itcast.mq.ConsumerApplication : Started ConsumerApplication in 1.735 seconds (JVM running for 2.357)
21:09:18:583 INFO 19532 --- [ntContainer#0-1] c.i.mq.listener.SpringRabbitListener : 消费者接收到delay.queue的延迟消息【hello delay】