做网站一个月能赚多少钱,免费商城app,shopkeeper wordpress,湘潭关键词优化服务应用场景 通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能#xff0c;比如以下这些场景#xff1a; 订单超时提醒收货自动确认会议提醒代办事项提醒 为什么使用延时队列 对于数据量小且实时性要求不高的需求来说#xff0c;最简单的方法就是定时扫描数据…应用场景 通常在一些需要经历一段时间或者到达某个指定时间节点才会执行的功能比如以下这些场景 订单超时提醒收货自动确认会议提醒代办事项提醒 为什么使用延时队列 对于数据量小且实时性要求不高的需求来说最简单的方法就是定时扫描数据库。 但是当数量达到数百万、上千万级别且时定时扫库就显得非常低效且消耗资源 甚至有些时间间隔小实时性要求高的情况上一次扫描还没结束下一次就又开始了 这时候如果使用延时队列就会比较合适 延时队列的几种方式
Quartz 定时任务实现扫库DelayQueue JDK中提供了一组实现延迟队列的APIRedis sorted setRedis 过期键监听回调RabbitMQ 死信队列RabbitMQ 基于插件实现延迟队列Wheel 时间轮训算法
Redisson 实现延时队列
顾名思义 Redis son 就是 Redis 的儿子举个栗子先
1.引入 pom
dependencygroupIdorg.redisson/groupIdartifactIdredisson-spring-boot-starter/artifactIdversion${lastest.version}/version
/dependency2.封装一个 RedissonQueue 类
Service
public class RedissonQueue {public static final String QUEUE delayQueue;// 默认超时时间30秒public static final Integer DEFAULT_TIMEOUT 30;Resourceprivate RedissonClient redissonClient;// 加入任务并设置到期时间public void offer(String taskId, Integer timeout) {RDelayedQueueString delayedQueue delayedQueue();delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);}// 移除任务public void remove(String taskId) {RDelayedQueueString delayedQueue delayedQueue();delayedQueue.removeIf(messageId - messageId.equals(taskId));}// 任务列表public RDelayedQueueString delayedQueue() {RBlockingDequeString blockingDeque blockingDeque();return redissonClient.getDelayedQueue(blockingDeque);}public RBlockingDequeString blockingDeque() {return redissonClient.getBlockingDeque(QUEUE);}public boolean isShutdown() {return redissonClient.isShutdown();}public void shutdown() {redissonClient.shutdown();}}3.交给 Spring 管理
Slf4j
Service
public class RedissonService implements ApplicationRunner {Resourceprivate RedissonQueue redissonQueue;Resource(name threadPoolTaskExecutor)private ThreadPoolTaskExecutor executor;Overridepublic void run(ApplicationArguments args) {RBlockingDequeString blockingDeque redissonQueue.blockingDeque();executor.execute(() - {while (true) {if (redissonQueue.isShutdown()) {return;} else {String messageId null;try {messageId blockingDeque.take();} catch (InterruptedException e) {log.warn(RedissonConsumer error:{}, e.getMessage());}if (!Objects.isNull(messageId) !messageId.isEmpty()) {log.warn(timeout messageId : {}, messageId);}}}});}// 初始化启动服务就执行一次PostConstructpublic void init() {redissonQueue.delayedQueue();}PreDestroypublic void shutdown() {redissonQueue.shutdown();}}4.测试接口
Operation(summary 添加任务, description 添加任务)
PostMapping
public ResponseEntity? add(RequestParam(value taskId, required false) String taskId,RequestParam(value timeout, required false) Integer timeout) {taskId StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;redissonQueue.offer(taskId, timeout);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}Operation(summary 移除任务, description 移除任务)
DeleteMapping(value /{taskId})
public ResponseEntity? remove(PathVariable(taskId) String taskId) {redissonQueue.remove(taskId);return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}5.测试结果
添加10个任务 删除第1个任务 可以看到第一个任务删除后没有被执行没有设置到期时间默认为30秒到期 实现原理
redisson_delay_queue_timeout:delayQueuesorted set 数据类型存放所有延迟任务按延迟任务的到期时间戳提交任务时间戳 延迟时间排序所以列表最前面第一个元素就是整个延迟队列中最早被执行的任务。redisson_delay_queue:delayQueuelist 数据类型也是存放所有任务。delayQueuelist 数据类型被称为目标队列这个里面存放的任务都是已经到延迟时间的可以被消费者获取的任务所以上面示例中 RBlockingQueue 的 take 方法是从此目标队列中获取任务的。redisson_delay_queue_channel:delayQueue是一个 channel用来通知客户端开启一个延迟任务生产者提交任务时将任务放到 redisson_delay_queue_timeout:delayQueue 中提交任务的时间戳延迟时间客户端会有一个延迟任务这个延迟任务会向 Redis Server 发送一段 lua 脚本Redis 执行 lua 脚本中的命令此操作是原子性的
lua 脚本主要干两件事
将到了延迟时间的任务从 redisson_delay_queue_timeout:delayQueue 中移除存到 delayQueue 这个目标队列获取到 redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务的到期时间戳发布到 redisson_delay_queue_channel: delayQueue channel 中 当客户端监听到 redisson_delay_queue_channel:delayQueue 这个 channel 的消息时会再次提交一个客户端延迟任务延迟时间就是消息最早到期时间任务的到期时间戳当前时间戳 这个时间其实也就是 redisson_delay_queue_channel:delayQueue 中最早到期时间的任务的剩余的延迟时间。 一旦时间来到最早到期时间任务的到期时间戳redisson_delay_queue_timeout:delayQueue 中最早到期时间的任务已经到期客户端的延迟任务也同时到期 于是开始执行 lua 脚本操作及时将到期任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期任务的到期时间戳到 channel 中 如此循环运行下去保证 redisson_delay_queue_timeout:delayQueue 中到期数据能及时放到目标队列中。 这里存在一个特殊情况需要项目启动时就执行一次延时队列。因为由于没有客户端延迟任务的执行 可能会出现 redisson_delay_queue_timeout:delayQueue 队列中有到期但是没有被放到目标队列的可能启动就执行一次是为了保证到期的数据能被及时放到目标队列中。 结论 Redisson 方案理论上没有延迟但当消息数量剧增消费者消费缓慢这种情况下可能会导致延迟任务消费的延迟。 消息丢失问题 Redisson 方案最大程度上减轻消息丢失的可能性因为所有任务都是存在 list 和 sorted set 两种数据类型中Redis 有持久化机制。除非整个 redis 集群宕机可能丢失一小部分数据。 广播任务问题是不会出现的因为每个客户端都是从同一个目标队列中获取任务。
Redisson 这种实现方案是比较合适且靠谱的一般中小型项目建议用 Redisson 实现延迟队列规模较大的项目直接上 MQ。
整合DEMO仓库地址