英语培训网站建设需求分析报告,能免费建手机网站吗,建设工程公司岗位职责,织梦网站首页内容十五、死信队列
1、死信的概念
先从概念解释上搞清楚这个定义#xff0c;死信#xff0c;顾名思义就是无法被消费的消息#xff0c;字面意思可以这样理解#xff0c;一般来说#xff0c;producer 将消息投递到 broker 或者直接到 queue 里了#xff0c;consumer 从 que…十五、死信队列
1、死信的概念
先从概念解释上搞清楚这个定义死信顾名思义就是无法被消费的消息字面意思可以这样理解一般来说producer 将消息投递到 broker 或者直接到 queue 里了consumer 从 queue 取出消息进行消费但某些时候由于特定的原因导致 queue 中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列。
应用场景为了保证订单业务的消息数据不丢失需要使用到 RabbitMQ 的死信队列机制当消息消费发生异常时将消息投入死信队列中。还有比如说用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
2、死信的来源 消息 TTL 过期TTL 是 Time To Live 的缩写也就是生存时间。 队列达到最大长度队列满了无法再添加数据到 mq 中。 消息被拒绝(basic.reject 或 basic.nack) 并且 requeuefalse。
3、死信实战 死信之 TTL
1消费者1
①定义普通交换机、死信交换机与正常队列、死信队列
②声明死信交换机、普通交换机与普通队列
③设置正常队列和死信交换机的联系使用argument参数
④绑定普通交换机和正常队列、死信交换机和死信队列
⑤正常接收消息 public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定队列、交换机、路由键routingKeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);//正常队列String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息........... );DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer01 接收到消息 message);};channel.basicConsume(normalQueue, true, deliverCallback, consumerTag - {});}}2生产者
①定义普通交换机的名称
②设置消息的TTL死信消息的设置
③正常发送消息 public class Producer {private static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] argv) throws Exception {Channel channel RabbitMqUtils.getChannel();channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);//设置消息的 TTL 时间 10sAMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();//该信息是用作演示队列个数限制for (int i 1; i 11; i) {String message info i;channel.basicPublish(NORMAL_EXCHANGE, zhangsan, properties, message.getBytes());System.out.println(生产者发送消息: message);}}
}3测试效果1
先启动消费者1再启动生产者 注意正常交换机的Features里面多了 DLX、DLK 4正常队列和死信队列中消息数的变化对比 当正常队列的消息TTL到了之后还没有被接受正常队列的消息会被转发到死信交换机再到死信队列里面这个过程很快的 5消费者2
接收死信队列的消息即可 public class Consumer02 {//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明交换机channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);channel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);System.out.println(等待接收死信消息........... );DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println(Consumer02 接收到消息 message);};channel.basicConsume(deadQueue, true, deliverCallback, consumerTag - {});}
}6测试效果2
启动消费者2 死信之最大长度
①生产者和消费者删除掉TTL的代码
②在消费者1中设置正常队列的长度限制 //设置正常队列的长度限制例如发10个4个则为死信
params.put(x-max-length,6);注先把原来生成的队列删除因为条件参数发生变化了已不再是同一个了否则会报错
测试效果 死信之消息被拒
先注释掉最大长度的代码然后将消息都接收完才可以继续发消息还有删掉之前创建的最长时间TTL和最大长度的队列以免对测试造成影响。 public class Consumer01 {//普通交换机名称private static final String NORMAL_EXCHANGE normal_exchange;//死信交换机名称private static final String DEAD_EXCHANGE dead_exchange;public static void main(String[] args) throws Exception {Channel channel RabbitMqUtils.getChannel();//声明死信和普通交换机 类型为 directchannel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);//声明死信队列String deadQueue dead-queue;channel.queueDeclare(deadQueue, false, false, false, null);//死信队列绑定队列、交换机、路由键routingKeychannel.queueBind(deadQueue, DEAD_EXCHANGE, lisi);//正常队列绑定死信队列信息MapString, Object params new HashMap();//正常队列设置死信交换机 参数 key 是固定值params.put(x-dead-letter-exchange, DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key 是固定值params.put(x-dead-letter-routing-key, lisi);
// //设置正常队列的长度限制例如发10个4个则为死信
// params.put(x-max-length,6);//正常队列String normalQueue normal-queue;channel.queueDeclare(normalQueue, false, false, false, params);channel.queueBind(normalQueue, NORMAL_EXCHANGE, zhangsan);System.out.println(等待接收消息........... );DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);if (message.equals(info5)) {System.out.println(Consumer01 接收到消息 message 并拒绝签收该消息);//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);} else {System.out.println(Consumer01 接收到消息 message);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启手动应答channel.basicConsume(normalQueue, false, deliverCallback, consumerTag - {});}}测试效果 消息队列-RabbitMQ死信队列 到此完结笔者归纳、创作不易大佬们给个3连再起飞吧