免费建自己的网站赚钱,华亭县建设局网站2017,外包公司做网站,18款禁用网站app全部RabbitMQ详解 死信队列死信来源消息TTL过期队列达到最大长度消息被拒绝 RabbitMQ延迟队列TTL的两种设置队列设置TTL消息设置TTL 整合SrpingBoot队列TTL延时队列TTL优化Rabbtimq插件实现延迟队列 死信队列
先从概念解释上搞清楚这个定义#xff0c;死信#xff0c;顾名思义就… RabbitMQ详解 死信队列死信来源消息TTL过期队列达到最大长度消息被拒绝 RabbitMQ延迟队列TTL的两种设置队列设置TTL消息设置TTL 整合SrpingBoot队列TTL延时队列TTL优化Rabbtimq插件实现延迟队列 死信队列
先从概念解释上搞清楚这个定义死信顾名思义就是无法被消费的消息字面意思可以这样理解一般来说producer 将消息投递到 broker 或者直接到queue 里了consumer 从 queue 取出消息 进行消费但某些时候由于特定的原因导致 queue 中的某些消息无法被消费这样的消息如果没有后续的处理就变成了死信有死信自然就有了死信队列。 应用场景为了保证订单业务的消息数据不丢失需要使用到 RabbitMQ 的死信队列机制当消息消费发生异常时将消息投入死信队列中。还有比如说用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。 死信来源 消息 TTL 过期:TTL是 Time To Live 的缩写, 也就是生存时间 队列达到最大长度:队列满了无法再添加数据到 MQ 中 消息被拒绝:(basic.reject 或 basic.nack) 并且 requeue false
举例 交换机类型是 direct两个消费者一个生产者两个队列消息队列和死信队列
消息TTL过期
生产者 package eight;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();//死信消息//死信消息 设置ttl时间 live to time 单位是msAMQP.BasicProperties properties new AMQP.BasicProperties().builder().expiration(10000).build();for (int i 1; i 11; i) {String message info i;System.out.println(发送消息: message);channel.basicPublish(NORMAL_EXCHANGE,zhangsan,properties,message.getBytes(StandardCharsets.UTF_8));}}
}
消费者
package eight;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeoutException;public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;//死信交换机的名称public static final String DEAD_EXCHANGE dead_exchange;//普通队列的名称public static final String NORMAL_QUEUE normal_queue;//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();//声明死信和普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//声明普通队列//参数mapMapString, Object arguments new HashMap();//普通队列设置死信队列arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信routingKeyarguments.put(x-dead-letter-routing-key,lisi);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机和队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);//绑定死信交换机和死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);System.out.println(等待接收消息...);//接收消息DeliverCallback deliverCallback (consumerTag, message)-{System.out.println(Consumer01接受的消息是: new String(message.getBody()));};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag - {});}
}
package eight;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer02 {//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();System.out.println(等待接收死信消息...);DeliverCallback deliverCallback (consumerTag, message) -{System.out.println(Consumer02接受的消息是new String(message.getBody(),UTF-8));};channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag - {});}
}
在启动消费者01后 将消费者01任务结束模拟消费者 模拟其接收不到消息 开启生产者和死信队列02 此时发现死信队列收到了消息 队列达到最大长度
消息生产者代码去掉 TTL 属性basicPublish 的第三个参数改为 null
package night;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import utils.RabbitMQUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;public class Producer {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();for (int i 1; i 11; i) {String message info i;System.out.println(发送消息: message);channel.basicPublish(NORMAL_EXCHANGE,zhangsan,null,message.getBytes(StandardCharsets.UTF_8));}}
}
消费者参数添加
arguments.put(x-max-length,6);package night;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;//死信交换机的名称public static final String DEAD_EXCHANGE dead_exchange;//普通队列的名称public static final String NORMAL_QUEUE normal_queue;//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();//声明死信和普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//声明普通队列//参数mapMapString, Object arguments new HashMap();//普通队列设置死信队列arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信routingKeyarguments.put(x-dead-letter-routing-key,lisi);arguments.put(x-max-length,6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机和队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);//绑定死信交换机和死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);System.out.println(等待接收消息...);//接收消息DeliverCallback deliverCallback (consumerTag, message)-{System.out.println(Consumer01接受的消息是: new String(message.getBody()));};channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag - {});}
} 注意 因为参数改变了所以需要把原先队列删除 开启消费者C1创建出队列然后停止该 C1 的运行启动生产者 启动消费者 C1 等待 10 秒之后再启动消费者 C2 此时超过消息超过我们设置的最大长度6 超过6的消息就会变成死信 进入到死信队列中 此时 死信队列中就会收到4条死信
消息被拒绝
消息生产者代码同上生产者一致 需求消费者 C1 拒收消息 “info5”开启手动应答
package night;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import utils.RabbitMQUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Consumer01 {//普通交换机的名称public static final String NORMAL_EXCHANGE normal_exchange;//死信交换机的名称public static final String DEAD_EXCHANGE dead_exchange;//普通队列的名称public static final String NORMAL_QUEUE normal_queue;//死信队列的名称public static final String DEAD_QUEUE dead_queue;public static void main(String[] args) throws IOException, TimeoutException {Channel channel RabbitMQUtils.getChannel();//声明死信和普通交换机channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);//声明普通队列//参数mapMapString, Object arguments new HashMap();//普通队列设置死信队列arguments.put(x-dead-letter-exchange,DEAD_EXCHANGE);//设置死信routingKeyarguments.put(x-dead-letter-routing-key,lisi);channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);//声明死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//绑定普通交换机和队列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);//绑定死信交换机和死信队列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);System.out.println(等待接收消息...);//接收消息DeliverCallback deliverCallback (consumerTag, message)-{String msg new String(message.getBody(), StandardCharsets.UTF_8);if(msg.equals(info5)){System.out.println(Consumer01接受的消息是msg 此消息是被C1拒绝的);//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println(Consumer01接受的消息是msg);channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};//将自动应答改为手动应答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag - {});}
}
开启消费者C1创建出队列然后停止该 C1 的运行启动生产者,启动消费者 C1 等待 10 秒之后再启动消费者 C2
RabbitMQ延迟队列
延时队列,队列内部是有序的最重要的特性就体现在它的延时属性上延时队列中的元素是希望 在指定时间到了以后或之前取出和处理简单来说延时队列就是用来存放需要在指定时间被处理的 元素的队列。
延迟队列使用场景 订单在十分钟之内未支付则自动取消 新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒 用户注册成功后如果三天内没有登陆则进行短信提醒 用户发起退款如果三天内没有得到处理则通知相关运营人员 预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议 这些场景都有一个特点需要在某个事件发生之后或者之前的指定时间点完成某一项任务如 发生订单生成事件在十分钟之后检查该订单支付状态然后将未支付的订单进行关闭那我们一直轮询数据每秒查一次取出需要被处理的数据然后处理不就完事了吗
如果数据量比较少确实可以这样做比如对于「如果账单一周内未支付则进行自动结算」这样的需求 如果对于时间不是严格限制而是宽松意义上的一周那么每天晚上跑个定时任务检查一下所有未支付的账单确实也是一个可行的方案。但对于数据量比较大并且时效性较强的场景如「订单十分钟内未支付则关闭」短期内未支付的订单数据可能会有很多活动期间甚至会达到百万甚至千万级别对这么庞大的数据量仍旧使用轮询的方式显然是不可取的很可能在一秒内无法完成所有订单的检查同时会给数据库带来很大压力无法满足业务要求而且性能低下。
TTL的两种设置
TTL 是什么呢TTL 是 RabbitMQ 中一个消息或者队列的属性表明一条消息或者该队列中的所有消息的最大存活时间单位是毫秒。
换句话说如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列那么这条消息如果在 TTL 设置的时间内没有被消费则会成为「死信」。如果同时配置了队列的 TTL 和消息的 TTL那么较小的那个值将会被使用有两种方式设置 TTL。
队列设置TTL
在创建队列的时候设置队列的 x-message-ttl 属性
MapString, Object params new HashMap();
params.put(x-message-ttl,5000);
return QueueBuilder.durable(QA).withArguments(args).build(); // QA 队列的最大存活时间位 5000 毫秒
消息设置TTL
针对每条消息设置 TTL
rabbitTemplate.converAndSend(X,XC,message,correlationData - {correlationData.getMessageProperties().setExpiration(5000);
});
两者区别
如果设置了队列的 TTL 属性那么一旦消息过期就会被队列丢弃(如果配置了死信队列被丢到死信队列中)而第二种方式消息即使过期也不一定会被马上丢弃因为消息是否过期是在即将投递到消费者之前判定的如果当前队列有严重的消息积压情况则已过期的消息也许还能存活较长时间
另外还需要注意的一点是如果不设置 TTL表示消息永远不会过期如果将 TTL 设置为 0则表示除非此时可以直接投递该消息到消费者否则该消息将会被丢弃
整合SrpingBoot
添加依赖
parentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.5.5/versionrelativePath/
/parent
dependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.47/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependency!--RabbitMQ 依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency
/dependencies
设置配置文件
server:port: 8080
spring:rabbitmq:host: 62.234.167.47port: 5672username: rootpassword: 123456
队列TTL
创建两个队列 QA 和 QB两个队列的 TTL 分别设置为 10S 和 40S然后再创建一个交换机 X 和死信交换机 Y它们的类型都是 direct创建一个死信队列 QD它们的绑定关系如下 原先配置队列信息写在了生产者和消费者代码中现在可写在配置类中生产者只发消息消费者只接受消息
配置文件
package com.example.rabbitmqtest.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String X_EXCHANGEX;//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGEY;//普通队列的名称public static final String QUEUE_AQA;public static final String QUEUE_BQB;//死信队列的名称public static final String DEAD_LETTER_QUEUEQD;//声明xExchangeBean(xExchange)public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange 别名Bean(yExchange)public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列Bean(queueA)public Queue queueA(){MapString, Object arguments new HashMap();//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarguments.put(x-dead-letter-routing-key,YD);//设置TTLarguments.put(x-message-ttl,40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明普通队列 要有ttl 为40sBean(queueB)public Queue queueB(){MapString,Object arguments new HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put(x-dead-letter-routing-key,YD);//设置TTL 10s 单位是msarguments.put(x-message-ttl,40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列Bean(queueD)public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//声明队列QA绑定x交换机Beanpublic Binding queueABindingX(Qualifier(queueA) Queue queueA, Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with(XA);}//声明队列 QB 绑定 X 交换机Beanpublic Binding queueBBindingX(Qualifier(queueB) Queue queueB,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with(XB);}//声明队列 QD 绑定 Y 交换机Beanpublic Binding queueDBindingY(Qualifier(queueD) Queue queueD,Qualifier(yExchange) DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with(YD);}}
生产者
package com.example.rabbitmqtest.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RestController
RequestMapping(/ttl)
public class SendMessageController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/sendMsg/{message})public void sendMsg(PathVariable(message) String message){log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date().toString(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列 message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列 message);}
}
消费者代码
package com.example.rabbitmqtest.controller;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;Slf4j
Component
public class DeadLetterQueueConsumer {RabbitListener(queues QD)public void receiverD(Message message, Channel channel){String msg new String(message.getBody());log.info(当前时间:{}, 收到死信队列消息{},new Date().toString(),msg);}
} 第一条消息在 10S 后变成了死信消息然后被消费者消费掉第二条消息在 40S 之后变成了死信消息 然后被消费掉这样一个延时队列就打造完成了。
不过如果这样使用的话岂不是每增加一个新的时间需求就要新增一个队列这里只有 10S 和 40S 两个时间选项如果需要一个小时后处理那么就需要增加 TTL 为一个小时的队列如果是预定会议室然后提前通知这样的场景岂不是要增加无数个队列才能满足需求
延时队列TTL优化
在这里新增了一个队列 QC该队列不设置 TTL 时间根据前端的请求确定 TTL 时间绑定关系如下 新增配置类
package com.example.rabbitmqtest.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class MsgTtlQueueConfig {//普通队列的名称public static final String QUEUE_C QC;//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE Y;//声明QCBean(queueC)public Queue QueueC(){MapString,Object arguments new HashMap(3);//设置死信交换机arguments.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);//设置死信routingKeyarguments.put(x-dead-letter-routing-Key,XC);return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}//声明队列QC 绑定 X 交换机Beanpublic Binding queueCBindingX(Qualifier(queueC) Queue queueC, Qualifier(xExchange)DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with(XC);}}
修改生产者
package com.example.rabbitmqtest.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RestController
RequestMapping(/ttl)
public class SendMessageController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/sendMsg/{message})public void sendMsg(PathVariable(message) String message){log.info(当前时间:{},发送一条信息给两个TTL队列{},new Date().toString(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列 message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列 message);}//开始发消息RequestMapping(/sendExpirationMsg/{message}/{ttlTime})public void sendMsg(PathVariable(message) String message, PathVariable(ttlTime) String ttlTime){log.info(当前时间:{},发送一条时长是{}毫秒TTL信息给队列QC:{},new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend(X,XC,message,message1 - {message1.getMessageProperties().setExpiration(ttlTime);return message1;});}
}
Rabbtimq插件实现延迟队列
上文中提到的问题确实是一个问题如果不能实现在消息粒度上的 TTL并使其在设置的 TTL 时间及时死亡就无法设计成一个通用的延时队列。那如何解决呢接下来我们就去解决该问题。
安装延时队列插件
可去官网下载 (opens new window)找到 rabbitmq_delayed_message_exchange 插件放置到 RabbitMQ 的插件目录。
因为官网也是跳转去该插件的 GitHub 地址进行下载点击跳转(opens new window)
打开 Linux用 Xftp 将插件放到 RabbitMQ 的安装目录下的 plgins 目录
RabbitMQ 与其 plgins 目录默认分别位于
# RabbitMQ 安装目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8
# RabbitMQ 的 plgins 所在目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
进入目录后执行下面命令让该插件生效然后重启 RabbitMQ
# 安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重启服务
systemctl restart rabbitmq-server
打开 Web 界面查看交换机的新增功能列表如果多出了如图所示代表成功添加插件 实战
在这里新增了一个队列 delayed.queue一个自定义交换机 delayed.exchange绑定关系如下: