网站开发要什么软件,wordpress多站点是什么意思,杭州昨晚发生大事了,58同城一样的网站怎样建设延时队列是一种特殊的消息队列#xff0c;它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中#xff0c;延时队列扮演着关键角色。例如#xff0c;订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。
Redis作为高性能的内存数据库#x…延时队列是一种特殊的消息队列它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中延时队列扮演着关键角色。例如订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。
Redis作为高性能的内存数据库具备原子操作、数据结构丰富和简单易用的特性本文将介绍基于Redis实现分布式延时队列的四种方式。
1. 基于Sorted Set的延时队列
原理
利用Redis的Sorted Set(有序集合)将消息ID作为member执行时间戳作为score进行存储。通过ZRANGEBYSCORE命令可以获取到达执行时间的任务。
代码实现
public class RedisZSetDelayQueue {private final StringRedisTemplate redisTemplate;private final String queueKey delay_queue:tasks;public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate redisTemplate;}/*** 添加延时任务* param taskId 任务ID* param taskInfo 任务信息(JSON字符串)* param delayTime 延迟时间(秒)*/public void addTask(String taskId, String taskInfo, long delayTime) {// 计算执行时间long executeTime System.currentTimeMillis() delayTime * 1000;// 存储任务详情redisTemplate.opsForHash().put(delay_queue:details, taskId, taskInfo);// 添加到延时队列redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);System.out.println(Task added: taskId , will execute at: executeTime);}/*** 轮询获取到期任务*/public ListString pollTasks() {long now System.currentTimeMillis();// 获取当前时间之前的任务SetString taskIds redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);if (taskIds null || taskIds.isEmpty()) {return Collections.emptyList();}// 获取任务详情ListString tasks new ArrayList();for (String taskId : taskIds) {String taskInfo (String) redisTemplate.opsForHash().get(delay_queue:details, taskId);if (taskInfo ! null) {tasks.add(taskInfo);// 从集合和详情中移除任务redisTemplate.opsForZSet().remove(queueKey, taskId);redisTemplate.opsForHash().delete(delay_queue:details, taskId);}}return tasks;}// 定时任务示例public void startTaskProcessor() {ScheduledExecutorService scheduler Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() - {try {ListString tasks pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println(Processing task: taskInfo);// 实际任务处理逻辑}
}优缺点
优点
实现简单易于理解任务按执行时间自动排序支持精确的时间控制
缺点
需要轮询获取到期任务消耗CPU资源大量任务情况下ZRANGEBYSCORE操作可能影响性能没有消费确认机制需要额外实现
2. 基于List 定时轮询的延时队列
原理
这种方式使用多个List作为存储容器按延迟时间的不同将任务分配到不同的队列中。通过定时轮询各个队列将到期任务移动到一个立即执行队列。
代码实现
public class RedisListDelayQueue {private final StringRedisTemplate redisTemplate;private final String readyQueueKey delay_queue:ready; // 待处理队列private final MapInteger, String delayQueueKeys; // 延迟队列按延时时间分级public RedisListDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate redisTemplate;// 初始化不同延迟级别的队列delayQueueKeys new HashMap();delayQueueKeys.put(5, delay_queue:delay_5s); // 5秒delayQueueKeys.put(60, delay_queue:delay_1m); // 1分钟delayQueueKeys.put(300, delay_queue:delay_5m); // 5分钟delayQueueKeys.put(1800, delay_queue:delay_30m); // 30分钟}/*** 添加延时任务*/public void addTask(String taskInfo, int delaySeconds) {// 选择合适的延迟队列String queueKey selectDelayQueue(delaySeconds);// 任务元数据包含任务信息和执行时间long executeTime System.currentTimeMillis() delaySeconds * 1000;String taskData executeTime : taskInfo;// 添加到延迟队列redisTemplate.opsForList().rightPush(queueKey, taskData);System.out.println(Task added to queueKey : taskData);}/*** 选择合适的延迟队列*/private String selectDelayQueue(int delaySeconds) {// 找到最接近的延迟级别int closestDelay delayQueueKeys.keySet().stream().filter(delay - delay delaySeconds).min(Integer::compareTo).orElse(Collections.max(delayQueueKeys.keySet()));return delayQueueKeys.get(closestDelay);}/*** 移动到期任务到待处理队列*/public void moveTasksToReadyQueue() {long now System.currentTimeMillis();// 遍历所有延迟队列for (String queueKey : delayQueueKeys.values()) {boolean hasMoreTasks true;while (hasMoreTasks) {// 查看队列头部任务String taskData redisTemplate.opsForList().index(queueKey, 0);if (taskData null) {hasMoreTasks false;continue;}// 解析任务执行时间long executeTime Long.parseLong(taskData.split(:, 2)[0]);// 检查是否到期if (executeTime now) {// 通过LPOP原子性地移除队列头部任务String task redisTemplate.opsForList().leftPop(queueKey);// 任务可能被其他进程处理再次检查if (task ! null) {// 提取任务信息并添加到待处理队列String taskInfo task.split(:, 2)[1];redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);System.out.println(Task moved to ready queue: taskInfo);}} else {// 队列头部任务未到期无需检查后面的任务hasMoreTasks false;}}}}/*** 获取待处理任务*/public String getReadyTask() {return redisTemplate.opsForList().leftPop(readyQueueKey);}/*** 启动任务处理器*/public void startTaskProcessors() {// 定时移动到期任务ScheduledExecutorService scheduler Executors.newScheduledThreadPool(2);// 移动任务线程scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);// 处理任务线程scheduler.scheduleAtFixedRate(() - {String task getReadyTask();if (task ! null) {processTask(task);}}, 0, 100, TimeUnit.MILLISECONDS);}private void processTask(String taskInfo) {System.out.println(Processing task: taskInfo);// 实际任务处理逻辑}
}优缺点
优点
分级队列设计降低单队列压力相比Sorted Set占用内存少支持队列监控和任务优先级
缺点
延迟时间精度受轮询频率影响实现复杂度高需要维护多个队列时间判断和队列操作非原子性需特别处理并发问题
3. 基于发布/订阅(Pub/Sub)的延时队列
原理
结合Redis发布/订阅功能与本地时间轮算法实现延迟任务的分发和处理。任务信息存储在Redis中而时间轮负责任务的调度和发布。
代码实现
public class RedisPubSubDelayQueue {private final StringRedisTemplate redisTemplate;private final String TASK_TOPIC delay_queue:task_channel;private final String TASK_HASH delay_queue:tasks;private final HashedWheelTimer timer;public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate redisTemplate;// 初始化时间轮刻度100ms轮子大小512this.timer new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);// 启动消息订阅subscribeTaskChannel();}/*** 添加延时任务*/public void addTask(String taskId, String taskInfo, long delaySeconds) {// 存储任务信息到RedisredisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);// 添加到时间轮timer.newTimeout(timeout - {// 发布任务就绪消息redisTemplate.convertAndSend(TASK_TOPIC, taskId);}, delaySeconds, TimeUnit.SECONDS);System.out.println(Task scheduled: taskId , delay: delaySeconds s);}/*** 订阅任务通道*/private void subscribeTaskChannel() {redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) - {String taskId new String(message.getBody());// 获取任务信息String taskInfo (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);if (taskInfo ! null) {// 处理任务processTask(taskId, taskInfo);// 删除任务redisTemplate.opsForHash().delete(TASK_HASH, taskId);}}, TASK_TOPIC.getBytes());}private void processTask(String taskId, String taskInfo) {System.out.println(Processing task: taskId - taskInfo);// 实际任务处理逻辑}// 模拟HashedWheelTimer类public static class HashedWheelTimer {private final ScheduledExecutorService scheduler Executors.newScheduledThreadPool(1);private final long tickDuration;private final TimeUnit unit;public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {this.tickDuration tickDuration;this.unit unit;}public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {long delayMillis timeUnit.toMillis(delay);scheduler.schedule(() - task.run(null), delayMillis, TimeUnit.MILLISECONDS);}public interface TimerTask {void run(Timeout timeout);}public interface Timeout {}}
}优缺点
优点
即时触发无需轮询高效的时间轮算法可以跨应用订阅任务分离任务调度和执行降低耦合
缺点
依赖本地时间轮非纯Redis实现Pub/Sub模式无消息持久化可能丢失消息服务重启时需要重建时间轮订阅者需要保持连接
4. 基于Redis Stream的延时队列
原理
Redis 5.0引入的Stream是一个强大的数据结构专为消息队列设计。结合Stream的消费组和确认机制可以构建可靠的延时队列。
代码实现
public class RedisStreamDelayQueue {private final StringRedisTemplate redisTemplate;private final String delayQueueKey delay_queue:stream;private final String consumerGroup delay_queue_consumers;private final String consumerId UUID.randomUUID().toString();public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate redisTemplate;// 创建消费者组try {redisTemplate.execute((RedisCallbackString) connection - {connection.streamCommands().xGroupCreate(delayQueueKey.getBytes(), consumerGroup, ReadOffset.from(0), true);return OK;});} catch (Exception e) {// 消费者组可能已存在System.out.println(Consumer group may already exist: e.getMessage());}}/*** 添加延时任务*/public void addTask(String taskInfo, long delaySeconds) {long executeTime System.currentTimeMillis() delaySeconds * 1000;MapString, Object task new HashMap();task.put(executeTime, String.valueOf(executeTime));task.put(taskInfo, taskInfo);redisTemplate.opsForStream().add(delayQueueKey, task);System.out.println(Task added: taskInfo , execute at: executeTime);}/*** 获取待执行的任务*/public ListString pollTasks() {long now System.currentTimeMillis();ListString readyTasks new ArrayList();// 读取尚未处理的消息ListMapRecordString, Object, Object records redisTemplate.execute((RedisCallbackListMapRecordString, Object, Object) connection - {return connection.streamCommands().xReadGroup(consumerGroup.getBytes(),consumerId.getBytes(),StreamReadOptions.empty().count(10),StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from()));});if (records ! null) {for (MapRecordString, Object, Object record : records) {String messageId record.getId().getValue();MapObject, Object value record.getValue();long executeTime Long.parseLong((String) value.get(executeTime));String taskInfo (String) value.get(taskInfo);// 检查任务是否到期if (executeTime now) {readyTasks.add(taskInfo);// 确认消息已处理redisTemplate.execute((RedisCallbackString) connection - {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return OK;});// 可选从流中删除消息redisTemplate.opsForStream().delete(delayQueueKey, messageId);} else {// 任务未到期放回队列redisTemplate.execute((RedisCallbackString) connection - {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return OK;});// 重新添加任务可选使用延迟重新入队策略MapString, Object newTask new HashMap();newTask.put(executeTime, String.valueOf(executeTime));newTask.put(taskInfo, taskInfo);redisTemplate.opsForStream().add(delayQueueKey, newTask);}}}return readyTasks;}/*** 启动任务处理器*/public void startTaskProcessor() {ScheduledExecutorService scheduler Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() - {try {ListString tasks pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println(Processing task: taskInfo);// 实际任务处理逻辑}
}优缺点
优点
支持消费者组和消息确认提供可靠的消息处理内置消息持久化机制支持多消费者并行处理消息ID包含时间戳便于排序
缺点
要求Redis 5.0版本实现相对复杂仍需轮询获取到期任务对未到期任务的处理相对繁琐
性能对比与选型建议
实现方式性能可靠性实现复杂度内存占用适用场景Sorted Set★★★★☆★★★☆☆低中任务量适中需要精确调度List 轮询★★★★★★★★☆☆中低高并发延时精度要求不高Pub/Sub 时间轮★★★★★★★☆☆☆高低实时性要求高可容忍服务重启丢失Stream★★★☆☆★★★★★高中可靠性要求高需要消息确认
总结
在实际应用中可根据系统规模、性能需求、可靠性要求和实现复杂度等因素进行选择也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现都应关注可靠性、性能和监控等方面确保延时队列在生产环境中稳定运行。