asp网站表格代码,合作市建设局网站,东莞企业网站建设公司,网站外链怎么发延迟队列概念
延迟队列中的元素是希望在指定时间到了之后或之前取出和处理消息#xff0c;并且队列内部是有序的。简单来说#xff0c;延时队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列使用场景
延迟队列经常使用的场景有以下几点#xff1a;
订单在十分…延迟队列概念
延迟队列中的元素是希望在指定时间到了之后或之前取出和处理消息并且队列内部是有序的。简单来说延时队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列使用场景
延迟队列经常使用的场景有以下几点
订单在十分钟之内未支付则自动取消新创建的店铺如果在十天内都没有上传过商品则自动发送消息提醒用户注册成功后如果三天内没有登陆则进行短信提醒用户发起退款如果三天内没有得到处理则通知相关运营人员预定会议后需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点需要在某个事件发生之后或者之前的指定时间点完成某一项任务如 发生订单生成事件在十分钟之后检查该订单支付状态然后将未支付的订单进行关闭看起来似乎 使用定时任务一直轮询数据每秒查一次取出需要被处理的数据然后处理不就完事了吗如果 数据量比较少确实可以这样做比如对于“如果账单一周内未支付则进行自动结算”这样的需求 如果对于时间不是严格限制而是宽松意义上的一周那么每天晚上跑个定时任务检查一下所有未支 付的账单确实也是一个可行的方案。但对于数据量比较大并且时效性较强的场景如“订单十 分钟内未支付则关闭“短期内未支付的订单数据可能会有很多活动期间甚至会达到百万甚至千万 级别对这么庞大的数据量仍旧使用轮询的方式显然是不可取的很可能在一秒内无法完成所有订单 的检查同时会给数据库带来很大压力无法满足业务要求而且性能低下 RabbitMQ中的TTL
TTL表示RabbitMQ中的一个消息或者队列的属性表明一条消息或者该队列中所有消息的最大存活时间单位是毫秒。换句话说如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列那么这 条消息如果在 TTL 设置的时间内没有被消费则会成为死信。如果同时配置了队列的 TTL 和消息的 TTL那么较小的那个值将会被使用
消息设置TTL
rabbitTemplate.convertAndSend( myExchange, // 交换机名称 XC, // 路由键 message, correlationData - { // 设置消息的 TTL correlationData.getMessageProperties().setExpiration(String.valueOf(ttlTime)); return correlationData; }
);
队列设置TTL
MapString, Object args new HashMap();
args.put(x-message-ttl, 60000); // 设置队列中所有消息的TTL为60秒
return new Queue(myQueue, true, false, false, args);
两者区别
如果设置了队列的 TTL 属性那么一旦消息过期就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)而第二种方式消息即使过期也不一定会被马上丢弃因为消息是否过期是在即将投递到消费者之前判定的如果当前队列有严重的消息积压情况则已过期的消息也许还能存活较长时间另外还需要注意的一点是如果不设置 TTL表示消息永远不会过期如果将 TTL 设置为 0则表示除非此时可以 直接投递该消息到消费者否则该消息将会被丢弃
整合SpringBoot
创建项目 添加依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion3.2.2/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.example/groupIdartifactIdspringboot-rabbitmq/artifactIdversion0.0.1-SNAPSHOT/versionnamedemo/namedescriptionDemo project for Spring Boot/descriptionpropertiesjava.version17/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project修改配置文件
spring.rabbitmq.host118.31.6.132
spring.rabbitmq.port5672
spring.rabbitmq.usernameadmin
spring.rabbitmq.password123
队列TTL
代码架构图
创建两个队列 QA 和 QB两者队列 TTL 分别设置为 10S 和 40S然后在创建一个交换机 X 和死信交 换机 Y它们的类型都是 direct创建一个死信队列 QD它们的绑定关系如下
配置文件类代码
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;
Configuration
public class TtlQueueConfig {public static final String X_EXCHANGE X;public static final String QUEUE_A QA;public static final String QUEUE_B QB;public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String DEAD_LETTER_QUEUE QD;Bean(xExchange)public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}Bean(yExchange)public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}Bean(queueA)public Queue queueA(){MapString,Object args new HashMap();args.put(x-dead-letter-exchange,Y_DEAD_LETTER_EXCHANGE);args.put(x-dead-letter-routing-key,YD);args.put(x-message-ttl,10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}Bean(queueB)public Queue queueB(){MapString, Object args new HashMap(3);args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);args.put(x-dead-letter-routing-key, YD);args.put(x-message-ttl, 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}Beanpublic Binding queueaBindingX(Qualifier(queueA) Queue queueA,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with(XA);}Beanpublic Binding queuebBindingX(Qualifier(queueB) Queue queue1B,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queue1B).to(xExchange).with(XB);}Beanpublic Binding deadLetterBindingQAD(Qualifier(queueD) Queue queueD,Qualifier(yExchange) DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with(YD);}Bean(queueD)public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);}
}消息生产者代码
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RequestMapping(/ttl)
RestController
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(/sendMsg/{message})public void sendMsg(PathVariable String message){log.info(当前时间{}发送一个消息给两个TTL队列{},new Date(),message);rabbitTemplate.convertAndSend(X,XA,消息来自ttl为10s的队列message);rabbitTemplate.convertAndSend(X,XB,消息来自ttl为40s的队列message);}
}消息消费者代码
package com.example.demo.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;Slf4j
Component
public class DeadLetterQueueConsumer {//接受消息RabbitListener(queues QD)public void receiveD(Message message, Channel channel) throws IOException {String msg new String(message.getBody());log.info(当前时间{},收到死信队列信息{}, new Date().toString(), msg);}}发送一个请求127.0.0.1:8080/ttl/sendMsg/haha 第一条消息在 10S 后变成了死信消息然后被消费者消费掉第二条消息在 40S 之后变成了死信消息 然后被消费掉这样一个延时队列就打造完成了
不过如果这样使用的话岂不是每增加一个新的时间需求就要新增一个队列这里只有 10S 和 40S 两个时间选项如果需要一个小时后处理那么就需要增加 TTL 为一个小时的队列如果是预定会议室然后提前通知这样的场景岂不是要增加无数个队列才能满足需求
延迟队列优化
代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间 配置文件类代码
package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;Component
public class MsgTtlQueueConfig {public static final String Y_DEAD_LETTER_EXCHANGE Y;public static final String QUEUE_C QC;//声明队列 C 死信交换机Bean(queueC)public Queue queueB(){MapString, Object args new HashMap(3);//声明当前队列绑定的死信交换机args.put(x-dead-letter-exchange, Y_DEAD_LETTER_EXCHANGE);//声明当前队列的死信路由 keyargs.put(x-dead-letter-routing-key, YD);//没有声明 TTL 属性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//声明队列 B 绑定 X 交换机Beanpublic Binding queuecBindingX(Qualifier(queueC) Queue queueC,Qualifier(xExchange) DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with(XC);}
}
消息生产者代码
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RequestMapping(/ttl)
RestController
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;GetMapping(sendExpirationMsg/{message}/{ttlTime})public void sendMsg(PathVariable String message,PathVariable String ttlTime) {rabbitTemplate.convertAndSend(X, XC, message, correlationData -{correlationData.getMessageProperties().setExpiration(ttlTime);return correlationData;});log.info(当前时间{},发送一条时长{}毫秒 TTL 信息给队列 C:{}, new Date(),ttlTime, message);}
}发起请求
http://localhost:8080/ttl/sendExpirationMsg/你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/你好 2/2000 看起来似乎没什么问题但是在最开始的时候就介绍过如果使用在消息属性上设置 TTL 的方式消 息可能并不会按时“死亡“因为 RabbitMQ 只会检查第一个消息是否过期如果过期则丢到死信队列 如果第一个消息的延时时长很长而第二个消息的延时时长很短第二个消息并不会优先得到执行
Rabbitmq 插件实现延迟队列
如果不能实现在消息粒度上的 TTL并使其在设置的 TTL 时间 及时死亡就无法设计成一个通用的延时队列。那如何解决呢接下来我们就去解决该问题
安装延时队列插件
在官网上下载 https://www.rabbitmq.com/community-plugins.html下载 rabbitmq_delayed_message_exchange 插件然后解压放置到 RabbitMQ 的插件目录。或者
链接https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwdd0jd 提取码d0jd 进入 RabbitMQ 的安装目录下的 plgins 目录执行下面命令让该插件生效 rabbitmq_delayed_message_exchange 重启RabbitMQsystemctl restart rabbitmq-server
然后我们就可以看管理界面 代码架构图
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange绑定关系如下 配置文件类代码 package com.example.demo.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;import java.util.HashMap;
import java.util.Map;Configuration
public class DelayedQueueConfig {public static final String DELAYED_QUEUE_NAME delayed.queue;public static final String DELAYED_EXCHANGE_NAME delayed.exchange;public static final String DELAYED_ROUTING_KEY delayed.routingkey;Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUE_NAME);}//自定义交换机 我们这里实现一个延迟交换机Beanpublic CustomExchange delayedExchange(){MapString, Object args new HashMap();//自定义交换机的类型args.put(x-delayed-type, direct);/*** 1、交换机名称* 2、交换机类型* 3、持久化* 4、自动删除* 5、其他参数*/return new CustomExchange(DELAYED_EXCHANGE_NAME, x-delayed-message, true, false,args);}Beanpublic Binding bindingDelayedQueue(Qualifier(delayedQueue) Queue queue,Qualifier(delayedExchange) CustomExchangedelayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}消息生产者代码
package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;Slf4j
RequestMapping(/ttl)
RestController
public class SendMsgController {Autowiredprivate RabbitTemplate rabbitTemplate;public static final String DELAYED_EXCHANGE_NAME delayed.exchange;public static final String DELAYED_ROUTING_KEY delayed.routingkey;GetMapping(sendDelayMsg/{message}/{delayTime})public void sendMsg(PathVariable String message,PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,correlationData -{correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});}
}消息消费者代码
package com.example.demo.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;Slf4j
Component
public class DeadLetterQueueConsumer {public static final String DELAYED_QUEUE_NAME delayed.queue;RabbitListener(queues DELAYED_QUEUE_NAME)public void receiveDelayedQueue(Message message){String msg new String(message.getBody());log.info(当前时间{},收到延时队列的消息{}, new Date().toString(), msg);}}发起请求
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000 第二个消息被先消费掉了符合预期
总结
延时队列在需要延时处理的场景下非常有用使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外通过 RabbitMQ 集群的特性可以很好的解决单点故障问题不会因为 单个节点挂掉导致延时队列不可用或者消息丢失