网站付费推广,网站建设在国内外研究现状,蛇口网站建设公司,无锡专业做网站的公司日常开发中#xff0c;可能会遇到一些延迟处理的消息任务#xff0c;例如以下场景 ①订单支付超时未支付 ②考试时间结束试卷自动提交 ③身份证或其他验证信息超时未提交等场景。 ④用户申请退款#xff0c;一天内没有响应默认自动退款等等。 如何处理这类任务#xff0c;最…日常开发中可能会遇到一些延迟处理的消息任务例如以下场景 ①订单支付超时未支付 ②考试时间结束试卷自动提交 ③身份证或其他验证信息超时未提交等场景。 ④用户申请退款一天内没有响应默认自动退款等等。 如何处理这类任务最简单的方法就是将消息插入到数据库然后使用定时任务扫描数据库。但是如果如果大量用户请求需要处理就需要线程频繁的连接数据库这样可能会对其他数据库请求造成影响这样情况下我们可以使用延迟队列方式解决此类问题。
1.DelayQueue实现方案
首先使用java自带的DelayQueue完成此方案。 DelayQueue内部使用优先级队列PriorityQueue完成任务存储而 PriorityQueue 采用二叉堆的思想确保在数据插入到队列中时最小值的排在堆顶每次从拿数据只要从堆顶取即可。 同时 DelayQueue 还是用了可重入锁 ReentrantLock来确保线程并发安全。 DelayQueue的源码解析可以查看DelayQueue源码解析 使用DelayQueue完成延迟需要定义Delayed 实现类来充当任务元素具体使用方法 //DelayQueue的元素必须是Delayed的实现类
class DelayTask implements Delayed {private long time;private Consumer consumer;public DelayTask(long time, Consumer consumer) {this.time time;this.consumer consumer;}Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}Overridepublic int compareTo(Delayed o) {DelayTask delayTask (DelayTask) o;return (int) (time - delayTask.getTime());}public long getTime() {return time;}public void call() {this.consumer.accept(this);}
}public class QueueTest {public static void main(String[] args) throws InterruptedException {long startTime System.currentTimeMillis();DelayTask d1 new DelayTask(10000 startTime, (o) - System.out.println(3333));DelayTask d2 new DelayTask(1000 startTime, (o) - System.out.println(1111));DelayTask d3 new DelayTask(2000 startTime, (o) - System.out.println(2222));DelayQueueDelayTask delayQueue new DelayQueue();delayQueue.add(d1);delayQueue.add(d2);delayQueue.add(d3);while (!delayQueue.isEmpty()) {//阻塞等待如果有任务到期就取出如果没有任务到期就等待DelayTask delayTask delayQueue.take();delayTask.call();}}
}①getDelay()方法用于从队列中取任务时查看是否到期如果小于等于0则表示可以取出如果大于当前线程需要根据是否是leader判断等待时间。 ②compareTo()方法用于任务入队时判断该任务元素在堆位置时比较的逻辑。 ③Consumer consumer;存储实际执行的任务也可以使用RunnableCallable以及其他自定义类。 note:该方法支持动态添加和删除任务而且线程安全但是只适用于单机环境而且需要自己定义查询逻辑实现稍微复杂。
2.定时任务实现方案
通过线程池对象ScheduledExecutorService也可以实现延迟处理任务的功能而且操作更简单。ScheduledExecutorService是jdk提供的类来完成指定时间或定期执行某些任务。代码如下 class Task implements Callable {private int idx;public Task(Integer idx) {this.idx idx;}Overridepublic Object call() throws Exception {System.out.println(--- this.idx);return null;}
}public class DelayedTest {public static void main(String[] args) {ScheduledExecutorService scheduledExecutorService new ScheduledThreadPoolExecutor(10);scheduledExecutorService.schedule(new Task(3), 1, TimeUnit.SECONDS);scheduledExecutorService.schedule(new Task(2), 2, TimeUnit.SECONDS);scheduledExecutorService.schedule(new Task(1), 1, TimeUnit.SECONDS);}
}ScheduledExecutorService继承了ExecutorService与ExecutorService的逻辑大致相同 ①schedule()方法是ScheduledExecutorService特有的方法这个方法会将我们定义的Task封装成ScheduledFutureTask ②然后生成Worker线程内部存在一个Thread是真正的执行类 ③Worker线程在执行的时候会先判断当前firstTask就是要执行的Runnable属性是否为空如果有就先执行firstTask执行完成firstTask之后然后再从workQueue中取任务红字也是ExecutorService 的执行逻辑 但是ScheduledExecutorService的 schedule()会先生成null 的firstTaskWorker会直接从workQueue中阻塞的获取任务
④workQueue是在我们new对象的时候生成的DelayedWorkQueue它的逻辑定义和DelayedQueue基本相同下面是DelayedWorkQueuetake()方法的代码逻辑 public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}scheduleAtFixedRate()是在Worker执行完task后计算任务下次的执行时间并重新将任务放入workQueue中来实现循环执行。下面的代码就是ScheduledFutureTask再执行过程中判断逻辑如果periodic是true则执行run方法。如果period是false则执行计算下次执行时间和重新放入任务的逻辑。 定时器ScheduledExecutorService原理分析
noteScheduledExecutorService是jdk提供的非常方便的延迟消息处理类支持多线程处理消息前一个任务的阻塞并不会影响下一个任务的运行。内部使用的是类似DelayQueue的逻辑而且不需要再实现轮询过程。但是它和DelayQueue一样只能单机使用。
3.Redis实现方案
通过Redis的Zset结构也可以实现延迟队列的功能。通过将过期时间的时间戳作为score存入Zset中然后调用zrangebyscore key 0 当前时间命令定时扫描Zset数据如果返回的结果那一定是已经过期的数据然后再执行删除命令删除指定的key-value。为了防止多线程执行过程中可能存在的问题需要配置lua脚本使用。 //lua脚本定义查询zset数据和删除数据的原子操作
//定义查询的最大值和最小值
local minscore ARGV[1]
local maxscore ARGV[2]
local key KEYS[1]local tables redis.call(zrangebyscore, key, minscore, maxscore)for i, value in ipairs(tables) doredis.call(zrem, key, value)
endreturn tables//java代码定义轮询线程Testpublic void test() throws InterruptedException {String script lua脚本;String key test;long time System.currentTimeMillis();redisTemplate.opsForZSet().add(key, 123, time 1000 * 3);Thread scanExpireThread new Thread(() - {System.out.println(开始扫描过期数据...);while (true) {try {long currentTime System.currentTimeMillis();long count redisTemplate.opsForZSet().count(key, 0, currentTime);if (count 0) {// 查询最小等待时间并睡眠减少cpu空转sleep(key);}System.out.println(获取数据...);RedisScript redisScript new DefaultRedisScript(script, List.class);ListString expireList (List) redisTemplate.execute(redisScript, Arrays.asList(key), 0, System.currentTimeMillis());System.out.println(expireList);if (!CollectionUtils.isEmpty(expireList)) {String msg RandomStringUtils.random(3, 1234567890);Integer delayedTime RandomUtils.nextInt(0, 10);System.out.println(随机生成延迟信息 msg 延迟时间 delayedTime);redisTemplate.opsForZSet().add(key, msg, System.currentTimeMillis() 1000 * delayedTime);} else {// 查询最小等待时间并睡眠减少cpu空转sleep(key);}TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});scanExpireThread.start();TimeUnit.MINUTES.sleep(10);}public void sleep(String key) throws InterruptedException {SetDefaultTypedTuple objs redisTemplate.opsForZSet().rangeWithScores(key, 0, 0);for (DefaultTypedTupleString typedTuple : objs) {Long minTime typedTuple.getScore().longValue();long diffTime minTime - System.currentTimeMillis();TimeUnit.MINUTES.sleep(diffTime / (1000 * 60));}}以上的示例代码只是延迟队列的简单时间。并没有考虑任务失败重试的问题。而且上面的方案还可以优化比如当获取的元素的集合是空的时候可以使用LockSupport.park()阻塞线程。只有有延迟任务被推送到redis中时才重新唤醒轮询线程避免轮询线程空转。
noteredis实现版本中并发性高。需要自己定义轮询线程。在消息量较少的时候会浪费资源在消息量非常多的时候又会出现因为轮询间隔设置不合理导致延时时间不准确的问题。
######4.Rabbitmq/Rocketmq实现 很多MQ消息中间件自带延迟消息功能如果系统本身RocketMQ组件则可以使用MQ来完成。不仅使用方便而且可能存在的诸多细节问题。 ①RocketMQ RocketMQ本身支持延迟消息功能但是RocketMQ4.x只支持固定级别的延迟消息并没有自定义延迟时间的功能。如果想实现自定义延迟消息的功能可以使用Rocket5.x或者RabbitMQ 实现原理RocketMQ实现延迟消息的过程是先将消息写入到SCHEDULE_TOPIC_XXXX的topic中然后根据 level 存入特定的queue每个queue都有一个调度线程消费消息如果发现消息到期就会将消息投递到指定的topic中。 以下是RocketMQ5.x文档中的示例程序 //生产者 延时消息发送
MessageBuilder messageBuilder new MessageBuilderImpl();;
//以下示例表示延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp System.currentTimeMillis() 10L * 60 * 1000;
Message message messageBuilder.setTopic(topic)//设置消息索引键可根据关键字精确查找某条消息。.setKeys(messageKey)//设置消息Tag用于消费端根据指定Tag过滤消息。.setTag(messageTag).setDeliveryTimestamp(deliverTimeStamp)//消息体.setBody(messageBody.getBytes()).build();
try {//发送消息需要关注发送结果并捕获失败等异常。SendReceipt sendReceipt producer.send(message);System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {e.printStackTrace();
}//消费示例一使用PushConsumer消费定时消息只需要在消费监听器处理即可。
MessageListener messageListener new MessageListener() {Overridepublic ConsumeResult consume(MessageView messageView) {System.out.println(messageView.getDeliveryTimestamp());//根据消费结果返回状态。return ConsumeResult.SUCCESS;}
};noteRocketMQ的实现版本性能较好可靠性较高但是不支持动态添加或删除队列。
RabbitMQ RabbitMQ也可以根据自身的特性实现延迟消息的功能。比如利用RabbitMQ的TTL和DLX特性 TTL是指存活时间可以作用在消息中也可以作用在队列中 DLX是指死信队列(是指消息被拒绝或者消息过期后存放的) 使用TTL与DLX存在的问题 1TTL作用在队列中需要为每一个延迟时间定义一种队列灵活性太差。 2TTL作用在消息上消息是在即将投递到消费者之前判定是否过期的所以如果前一个消息阻塞了太长将导致后面的消息不能即时的被执行。 而且使用上面的方式需要定义普通交换机和死信交换机所以一般使用延迟消息插件 rabbitmq-delayed-message-exchange来完成。使用插件生成的消息不会立即进入对应队列而是先将消息保存至 Mnesia (RabbitMQ中的一种数据存储形式) 然后插件会尝试确认是否过期再投递到对应绑定的队列之中 下载地址 rabbitmq-delayed-message-exchange 插件使用步骤
下载延迟插件然后解压放置到 RabbitMQ 的插件目录。注意一定要解压并且把版本名字取消 如果是使用docker安装的rabbitmq可以使用docker cp rabbitmq_delayed_message_exchange containerId:/RabbitMQ_HOME/plugins/拷贝到RabbitMQ容器中。进入 RabbitMQ 的安装目录下的 plgins 目录执行下面命令让该插件生效rabbitmq-plugins enable rabbitmq_delayed_message_exchange 可以使用rabbitmq-plugins list查看生效的插件插件前面带上*号的才是真正生效的插件 最后重启 RabbitMQ就可以在管理界面看到Type是x-delayed-messageexchange 更加详细的安装步骤可以查看链接 RabbitMQ 学习笔记 – 13 使用插件方式实现延迟队列
接下来就可以测试RabbitMQ的延迟消息功能了以下是示例代码 Rabbitmq配置 Configuration
public class RabbitMqConfig {//定义队列交换机队列路由public static final String DELAYED_QUEUE delayed_queue;public static final String DELAYED_EXCHANGE delayed_exchange;public static final String DELAYED_ROUTINGKEY delayed_test;Bean(DELAYED_EXCHANGE)public Exchange DELAYED_EXCHANGE() {HashMapString, Object map new HashMap(1);map.put(x-delayed-type, direct);return new CustomExchange(DELAYED_EXCHANGE, x-delayed-message, true, false, map);}Bean(DELAYED_QUEUE)public Queue DELAYED_QUEUE() {return new Queue(DELAYED_QUEUE);}//队列绑定交换机Beanpublic Binding BINDING_DELAYED_QUEUE(Qualifier(DELAYED_QUEUE) Queue queue,Qualifier(DELAYED_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTINGKEY).noargs();}
}延迟消息生产者 RestController
public class TestController {Autowiredprivate RabbitTemplate rabbitTemplate;RequestMapping(/test)public String test(RequestParam(required false) Integer delay) {rabbitTemplate.convertAndSend(RabbitMqConfig.DELAYED_EXCHANGE, RabbitMqConfig.DELAYED_ROUTINGKEY,hello, msg - {//设置消息的延迟时间msg.getMessageProperties().setDelay(delay * 1000);//设置优先级msg.getMessageProperties().setPriority(9);//设置消息的持久化方式msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置唯一标识msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());return msg;});return success;}
}延迟消息消费者 Component
public class RabbitmqHandler {//监听delayed_test队列RabbitListener(queues {RabbitMqConfig.DELAYED_QUEUE})public void receive_delayed_test(Message message, Channel channel) {System.out.println(----delayed_test----);System.out.println(properties: message.getMessageProperties().toString());System.out.println(body: new String(message.getBody()));System.out.println();}
}noteRabbitMQ支持集群分布式高并发场景性能较好可靠性高不需要自己处理轮询线程。
参照 延迟队列解决方案 有赞延迟队列设计 盘点JAVA中延时任务的几种实现方式 RabbitMQ之延迟队列