seo综合检测,抚州网站seo,宁夏建设职业技术学院官方网站,seo在哪可以学目录 一、简介1.1、特点 二、Maven依赖三、application配置四、批量发送4.1、同步消息4.2、异步消息4.3、顺序消息4.4、关于异步批量发送4.5、结论 五、其他 一、简介 在之前的文章中#xff0c;我讲过了#xff0c;同步发送单条消息#xff0c;异步发送单条消息#xff0c… 目录 一、简介1.1、特点 二、Maven依赖三、application配置四、批量发送4.1、同步消息4.2、异步消息4.3、顺序消息4.4、关于异步批量发送4.5、结论 五、其他 一、简介 在之前的文章中我讲过了同步发送单条消息异步发送单条消息发送单向消息发送顺序消息今天我们讲讲如何批量发送消息主要还是使用方法RocketMQTemplate的syncSend方法。
1.1、特点 批量发送和单条发送消息的主要区别有以下几点
网络开销 发送单条消息时每个消息都需要单独建立网络连接、发送数据包、等待响应等网络开销较大。批量发送可以将多条消息打包在一起发送减少网络连接建立的次数降低网络开销吞吐量 由于批量发送减少了网络开销所以可以在单位时间内发送更多的消息提高了吞吐量。在高并发高流量场景下批量发送能够发挥更好的性能消息顺序 单条发送消息的顺序是有序的后发送的在队列中排在前发送的后面。而对于批量发送一个批次内的消息顺序是固定的但不同批次之间的消息顺序是无序的会按照到达顺序存储在队列中。如果需要严格消息顺序单条发送更合适消息重试 如果批量发送的一个批次中有部分消息发送失败需重发整个批次没有选择重发其中部分消息的功能涉及幂等性问题。单条发送失败时只需重发该单条消息编程复杂度 批量发送需要构造MessageBatch或Message列表对象编程略微复杂些。单条发送只需构造单个Message对象
二、Maven依赖
pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdrocketmq/artifactIdgroupIdcom.alian/groupIdversion1.0.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactId06-send-batched-message/artifactIdpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.target/propertiesdependenciesdependencygroupIdcom.alian/groupIdartifactIdcommon-rocketmq-dto/artifactIdversion1.0.0-SNAPSHOT/version/dependency/dependencies/project父工程已经在我上一篇文章里通用公共包也在我上一篇文章里有说明包括消费者。具体参考RocketMQ笔记一SpringBoot整合RocketMQ发送同步消息
三、application配置
application.properties
server.port8005# rocketmq地址
rocketmq.name-server192.168.0.234:9876
# 默认的生产者组
rocketmq.producer.groupbatched_group
# 发送同步消息超时时间
rocketmq.producer.send-message-timeout3000
# 用于设置在消息发送失败后生产者是否尝试切换到下一个服务器。设置为 true 表示启用在发送失败时尝试切换到下一个服务器
rocketmq.producer.retry-next-servertrue
# 用于指定消息发送失败时的重试次数
rocketmq.producer.retry-times-when-send-failed3
# 设置消息压缩的阈值,为0表示禁用消息体的压缩
rocketmq.producer.compress-message-body-threshold0四、批量发送 在 RocketMQ 中RocketMQTemplate的syncSend方法它允许你批量发送同步消息主要参数
topic普通消息都发送到topicstring_message_topicCollectionT消息集合
测试类都引入依赖 Autowiredprivate RocketMQTemplate rocketMQTemplate;4.1、同步消息 Testpublic void syncSendBatchStringMessagesWithBuilder() {String topic string_message_topic;String message 超级喜欢Golang语言;ListMessageString messageList new ArrayList();for (int i 0; i 10; i) {MessageString rocketMessage MessageBuilder.withPayload(message i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, text/plain).build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSend发送批量消息SendResult sendResult rocketMQTemplate.syncSend(topic, messageList);log.info(同步批量发送普通消息结果{},sendResult);}运行结果
[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息0
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息1
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息3
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息4
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息2
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息5
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息6
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息7
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息8
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送普通消息94.2、异步消息 Testpublic void asyncSendBatchStringMessageWithBuilder() {String topic string_message_topic;String message Alian超级喜欢Golang语言;ListMessageString messageList new ArrayList();for (int i 0; i 10; i) {MessageString rocketMessage MessageBuilder.withPayload(message i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, text/plain).build();// 加入到列表messageList.add(rocketMessage);}// 使用asyncSend发送批量消息rocketMQTemplate.asyncSend(topic, messageList, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info(异步批量发送普通消息成功: sendResult);}Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info(异步批量发送普通消息失败: e.getMessage());}});}运行结果
[_GROUP_STRING_1] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息0
[_GROUP_STRING_8] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息1
[_GROUP_STRING_3] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息7
[_GROUP_STRING_6] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息4
[_GROUP_STRING_9] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息2
[_GROUP_STRING_5] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息6
[GROUP_STRING_10] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息3
[_GROUP_STRING_2] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息8
[_GROUP_STRING_4] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息9
[_GROUP_STRING_7] c.a.concurrent.StringMessageConsumer : 字符串消费者接收到的消息: 异步批量发送普通消息54.3、顺序消息 在 RocketMQ 中RocketMQTemplate的syncSendOrderly方法它允许你批量发送同步消息主要参数
topic和之前有区别普通消息都发送到topicordered_string_message_topicCollectionT消息集合hashKey通过hashKey发送到同一个队列 Testpublic void syncSendBatchOrderlyStringMessagesWithBuilder() {String topic ordered_string_message_topic;String message 同步批量发送顺序消息超级喜欢Go语言;ListMessageString messageList new ArrayList();for (int i 0; i 10; i) {MessageString rocketMessage MessageBuilder.withPayload(message i)// 设置消息类型.setHeader(MessageHeaders.CONTENT_TYPE, text/plain).build();// 加入到列表messageList.add(rocketMessage);}// 使用syncSendOrderly发送批量顺序消息消费者线程设置为1SendResult sendResult rocketMQTemplate.syncSendOrderly(topic, messageList, alian_sync_ordered);log.info(批量发送顺序消息发送结果{},sendResult);}运行结果
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言0
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言1
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言2
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言3
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言4
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言5
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言6
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言7
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言8
[GROUP_STRING_10] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: 同步批量发送顺序消息超级喜欢Go语言9所以我之前说批量发送消息的topic不一样因为
Slf4j
Service
RocketMQMessageListener(topic ordered_string_message_topic, consumerGroup ORDERED_GROUP_STRING, consumeMode ConsumeMode.ORDERLY)
public class StringMessageConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(字符串消费者接收到的消息: {}, message);// 处理消息的业务逻辑}
}顺序消息要顺序消费也就是每次是一个线程去消费相当于单线程也就有序了。关键就是配置了consumeMode ConsumeMode.ORDERLY 当然我们也可以把消费者线程数设置为 consumeThreadNumber 1也就是单线程消费了从而确保了消息的顺序消费指单实例
RocketMQMessageListener(topic ordered_string_message_topic, consumerGroup CONCURRENT_GROUP_STRING,consumeThreadNumber 1)
public class StringMessageConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {log.info(字符串消费者接收到的消息: {}, message);// 处理消息的业务逻辑}
}4.4、关于异步批量发送 有可能你会写下面的异步批量发送顺序消息 Testpublic void asyncSendBatchOrderlyStringMessageWithBuilder2() {String topic ordered_string_message_topic;String message Alian超级喜欢Golang语言;ListString messageList new ArrayList();for (int i 0; i 10; i) {// 加入到列表messageList.add(message i);}// 使用 asyncSendOrderly 发送批量消息rocketMQTemplate.asyncSendOrderly(topic, messageList, alian_async_ordered, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) {// 异步发送成功的回调逻辑log.info(异步消息发送字符串消息成功: sendResult);}Overridepublic void onException(Throwable e) {// 异步发送失败的回调逻辑log.info(异步消息发送字符串消息失败: e.getMessage());}});}其实这个是不对的最终的结果是一个把你这里的messageList当做了一个消息列表接收了如下结果
[GROUP_STRING_18] com.alian.ordered.StringMessageConsumer : 字符串消费者接收到的消息: [Alian超级喜欢Golang语言0,Alian超级喜欢Golang语言1,Alian超级喜欢Golang语言2,Alian超级喜欢Golang语言3,Alian超级喜欢Golang语言4,Alian超级喜欢Golang语言5,Alian超级喜欢Golang语言6,Alian超级喜欢Golang语言7,Alian超级喜欢Golang语言8,Alian超级喜欢Golang语言9]RocketMQ对于单条消息和批量消息在队列中是如何被处理的 对于单条发送的消息RocketMQ会按照队列中的顺序将每条消息分发给一个消费者线程。因此即使有多个消费者线程由于每条消息都被单独处理消费的顺序仍然会与发送的顺序一致。 对于批量发送的消息情况就有所不同。批量消息是作为一个整体发送的因此在队列中它们被视为一个单独的实体。当RocketMQ从队列中取出批量消息时它会将整个批量消息作为一个整体分发给一个消费者线程。如果有多个消费者线程由于操作系统的线程调度策略处理批量消息的线程可能会在处理消息的过程中被调度出去从而允许其他线程处理后面的消息。这样就可能导致消费的顺序与发送的顺序不一致。
4.5、结论
为此我测试了多次得到结论
单条发送消息到同一个队列使用多个消费线程消费该队列由于消息本身是有序的所以消费顺序也是有序的单批次批量发送消息到同一个队列使用单个消费线程消费该队列由于消费线程是单一的所以消费顺序也是有序的单批次批量发送消息到同一个队列使用多个消费线程消费时消费顺序就不是有序的了
五、其他 既然知道批量消息是作为一个整体的那么肯定就会对消息大小有限制在 Apache RocketMQ 中批量消息的大小默认限制是4MB。这意味着你不能发送总大小超过4MB的批量消息。
如果你想修改这个限制你需要修改RocketMQ的配置。具体的修改方法如下
找到RocketMQ的配置文件broker.conf这个文件通常位于RocketMQ安装目录的conf目录下。在broker.conf文件中找到maxMessageSize这个配置项。这个配置项决定了批量消息的最大大小。修改maxMessageSize的值为你想要的大小。注意这个值是以字节为单位的所以如果你想设置批量消息的最大大小为8MB你应该设置maxMessageSize8388608。保存并关闭broker.conf文件。重启RocketMQ的Broker服务以使新的配置生效。 虽然你可以通过修改配置来增加批量消息的最大大小但是你应该谨慎地考虑这个决定。增加批量消息的最大大小可能会增加Broker的内存使用量并可能影响到消息的发送和接收性能。因此在修改这个配置之前你应该先考虑你的应用的需求和Broker的性能。 因为优先的是RocketMQMessageListener 注解中设置 consumerGroup 和messageModel 参数。