南昌网站设计怎么选,重庆建设工程造价管理协会,高校英文网站建设,沧州哪家做网站好您好#xff0c;我是码农飞哥#xff08;wei158556#xff09;#xff0c;感谢您阅读本文#xff0c;欢迎一键三连哦。 #x1f4aa;#x1f3fb; 1. Python基础专栏#xff0c;基础知识一网打尽#xff0c;9.9元买不了吃亏#xff0c;买不了上当。 Python从入门到精… 您好我是码农飞哥wei158556感谢您阅读本文欢迎一键三连哦。 1. Python基础专栏基础知识一网打尽9.9元买不了吃亏买不了上当。 Python从入门到精通 2. 毕业设计专栏毕业季咱们不慌忙几百款毕业设计等你选。 ❤️ 3. Python爬虫专栏系统性的学习爬虫的知识点。9.9元买不了吃亏买不了上当 。python爬虫入门进阶 ❤️ 4. Ceph实战从原理到实战应有尽有。 Ceph实战 ❤️ 5. Java高并发编程入门打卡学习Java高并发。 Java高并发编程入门 文章目录 1. 消息堆积2. 消息堆积出现的原因3. 如何解决消息堆积 1. 消息堆积
消息堆积顾名思义就是消息队列中堆积了大量未被处理的消息主要发生在高并发的场景下生产者发送消息的速率远大于消费者组消息的速度。在物联网的AIOT场景中比较常见。
在RocketMQ的Console上可以查看某个Topic上消息堆积的情况。 这里有个延迟就表示目前堆积的消息数。
2. 消息堆积出现的原因
消息堆积的本质原因还是消费者消费消息的速度赶不上生产者发送消息的速度。可能的情况有 第一种情况 新上线的消费者的消费逻辑存在Bug导致消息不能被正常消费。这种场景主要存在于代码逻辑不严谨导致某些消息消费失败或者消费超时从而导致消息被大量堆积。 第二种情况消费者实例宕机或者由于网络的原因不能连上Broker集群。这种情况主要是消费者实例可能是单节点或者机房网络不好的情况。 第三种情况生产者短时间内大量发送消息到Broker端消费者的消费能力不足。消费者消费消息往往是一些比较耗时的IO操作比如操作数据库调用其他服务。这导致消费者的消费速率远低于生产者发送速率。这种情况也是消息堆积的常见场景。
3. 如何解决消息堆积 解决第一种情况对需要上线的消费者进行严格的测试确保每种消息的场景都能覆盖到。另外在上线的时候采用灰度发布先灰度小范围的用户进行使用确认没有问题了在全量放开所有用户使用。 解决第二种情况在上线消费者实例时需要采用多实例异地多活的方式确保极端的情况下都能有消费者能够正常消费消息。 解决第三种情况这种情况的解决本质上是如何提高消费者的消费速率。主要可以从如下方面解决 同一个消费者组下增加消费者实例。比如Topic中有8个队列那么可以将消费者数量最多增加到8个。那么有同学会问为啥只增加到8个我增加到9个乃至10个行不行答案是你可以增加10个消费者但是多余的2个消费者是分不到Queue的。这是因为 在RocketMQ中某个topic下的某个队列只能被同一消费者组中的某个消费者消费。 如果消费者数量少于Queue的数量那么有可能会出现消费不均的情况。 提高单个消费者的消费并行线程。RocketMQ 支持批量消费消息可以通过修改DefaultMQPushConsumer 消费者类的consumeThreadMin最少消费线程数以及consumeThreadMax最大消费线程数来提高单个消费者的消费能力。 批量消费消息 某些业务流程如果支持批量方式消费则可以很大程度上提高消费吞吐量例如订单扣款类应用一次处理一个订单耗时 1 s一次处理 10 个订单可能也只耗时 2 s这样即可大幅度提高消费的吞吐量。建议使用5.x SDK的SimpleConsumer每次接口调用设置批次大小一次性拉取消费多条消息。 下面就让我们来看个例子 生产者使用的是DefaultMQProducer //4.创建消息StopWatch stopWatch new StopWatch();stopWatch.start();for (int i 0; i 20000; i) {// 创建消息指定topic,以及消息体Message message new Message(heap_topic, (消息堆积测试 i).getBytes());//5.发送消息SendResult send defaultMQProducer.send(message);System.out.println(send);}stopWatch.stop();System.out.println(生产者发送2万条消息用时stopWatch.getTotalTimeSeconds()秒);消费者使用的是DefaultMQPushConsumer // 4.创建一个回调函数consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) - {System.out.println(本批次收到的消息数msgs.size());// 5.处理消息for (MessageExt msg : msgs) {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(当前时间System.currentTimeMillis() 收到的消息内容 new String(msg.getBody()));}// 返回消费成功的对象return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});生产者329秒内发送了2万条消息平均60条 而消费者消费一条消息需要一秒所以生产者发送完消息之后两个消费者还在消费。
这里消费者使用的是DefaultMQPushConsumer消费者 每批次Broker端会向消费者推送32条消息通过pullBatchSize字段设置而消费者每次消费1条消息通过consumeMessageBatchMaxSize字段设置。 当然官方推荐使用SimpleConsumer进行批量消费消息。 //每批次拉取16条消息int maxMessageNum 16;// Set message invisible duration after it is received.Duration invisibleDuration Duration.ofSeconds(15);// Receive message, multi-threading is more recommended.do {final ListMessageView messages consumer.receive(maxMessageNum, invisibleDuration);log.info(Received {} message(s), messages.size());for (MessageView message : messages) {final MessageId messageId message.getMessageId();try {consumer.ack(message);log.info(Message is acknowledged successfully, messageId{}, messageId);} catch (Throwable t) {log.error(Message is failed to be acknowledged, messageId{}, messageId, t);}}} while (true);官方的代码示例