中山网站建设品牌,桂林北站到桂林站多远,做网站做什么好,功能网站开发多少钱目录
本地启动测试可视化
核心概念
集群
主从 集群
Dledger 集群
总结
客户端消息确认机制
广播模式
消息过滤机制
顺序消息机制
延迟消息与批量消息
事务消息机制
ACL权限控制体系
RocketMQ客户端注意事项
消息的 ID、Key、Tag
最佳实践
消费者端…目录
本地启动测试可视化
核心概念
集群
主从 集群
Dledger 集群
总结
客户端消息确认机制
广播模式
消息过滤机制
顺序消息机制
延迟消息与批量消息
事务消息机制
ACL权限控制体系
RocketMQ客户端注意事项
消息的 ID、Key、Tag
最佳实践
消费者端进行幂等控制
关注错误消息重试
手动处理死信队列
MQ如何保证消息不丢失
1、哪些环节可能丢消息
2、生产者发送消息如何保证不丢
3、Borker写入数据如何保证不丢失
4、Broker主从同步如何保证不丢失
5、消费者消费消息如何不丢失
6、如果MQ服务全部挂了如何保证不丢失
7、MQ消息零丢失方案总结
MQ如何保证消息的顺序性
MQ如何保证消息幂等性
MQ如何快速处理积压的消息 本地启动测试可视化
官网文档及下载地址https://rocketmq.apache.org/zh/docs/
RocketMQ 也是Java开发的程序所有需要有JDK环境
1、可以在 bin/runserver.sh 文件中修改初始堆内存、最大堆内存、年轻代初始内存大小默认都是2g太大了我的电脑吃不消调小一点。注意jdk8及之前的是在图片中的 if 修改后面的版本要在 else 中修改 2、在bin/runbroker.sh中修改 3、配置RocketMQ环境变量 # RocketMQ 相关环境变量配置,:$PATH注意这里要追加到现有的 PATH 而不是覆盖否则所有基础命令都会失效 export ROCKETMQ_HOME/home/app/rocketmq/rocketmq-5.3.1-bin export PATH$ROCKETMQ_HOME/bin:$PATH export NAMESRV_ADDRlocalhost:9876 刷新 source /etc/profile 4、启动NameServer进入RocketMQ的bin目录运行 nohup bin/mqnamesrv cat nohup.out 查看输出日志 出现The Name Server boot success. serializeTypeJSON, address 0.0.0.0:9876 表示成功 5、启动Broker 在/conf/broker.conf后面添加 brokerIP1 服务器ip autoCreateTopicEnabletrue namesrvAddlocalhost:9876 #NameServer的地址 启动命令nohup mqbroker -c ../conf/broker.conf 如果报错Unrecognized VM option UseBiasedLocking Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit. 这个是jdk版本已经不再支持UseBiasedLocking虚拟机只需要在runbroker 将包含UseBiasedLocking这个一行注释掉即可 查看命令cat nohup.out 出现The broker[iZ7xvhygbzymeb78axyu9pZ, 172.18.226.151:10911] boot success. serializeTypeJSON and name server is localhost:9876 表示成功 6、发送测试消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer 7、消费测试消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 8、RocketMQ0-Dashboard 可视化页面 docker run -d --name rocketmq-dashboard \ --network docker-net \ -p 8080:8080 \ --restartalways \ -e JAVA_OPTS-Drocketmq.namesrv.addrrmqnamesrv_dj:9876 \ apacherocketmq/rocketmq-dashboard 然后将在容器中的jar拷贝出来 docker cp abc123:/rocketmq-dashboard.jar /home/app/rocketmq 启动 nohup java --add-opens java.base/java.nioALL-UNNAMED \ --add-opens java.base/sun.nio.chALL-UNNAMED \ -Drocketmq.namesrv.addrlocalhost:9876 \ -jar rocketmq-dashboard.jar rocketmq-dashboard.log 21 访问ip:8080 即可 核心概念
Topic主题是消息的一级分类单元生产者将消息发送到特定的主题而消费者订阅该主题以接收消息。一个Topic可以有多个MessageQueue。 MessageQueue消息队列每个Topic可以包含一个或多个消息队列。RocketMQ通过在不同的消息队列之间分配消息来实现负载均衡和高吞吐量。MessageQueue专属于某个Topic一个MessageQueue仅存储其所属Topic的消息不会跨Topic存储消息 Consumer Group消费者组一组逻辑上相同的消费者构成一个消费者组。同一个消费者组内的所有消费者共同消费来自某个Topic的所有消息但每条消息只会被该组中的一个消费者处理。 Consumer Instance消费者实例这是实际运行的消费者程序的一个实例代码中 new 出来就算一个实例。一个消费者组可以包含多个消费者实例。 关系与工作机制 消息队列与消费者的对应关系 在RocketMQ中对于任何一个给定的消费者组它所订阅的Topic下的每个MessageQueue最多只能由该组内的一个Consumer实例进行消费。这种一对一的关系确保了每条消息只会在消费者组内被处理一次从而避免重复消费的问题。 负载均衡机制 当有多个消费者实例存在于同一消费者组时RocketMQ会自动在这几个消费者实例之间平均分配Topic下的所有MessageQueue。 如果增加的消费者实例数量超过了Topic下MessageQueue的数量则多余的消费者实例将处于空闲状态因为没有额外的MessageQueue供它们消费。 实际应用场景示例 假设有一个名为OrderTopic的主题它包含4个MessageQueue并且有两个消费者实例C1和C2属于同一个消费者组G1
RocketMQ会将这4个MessageQueue平均分配给C1和C2例如C1负责消费两个MessageQueueC2也负责另外两个MessageQueue。 如果再启动第三个消费者实例C3加入到消费者组G1由于只有4个MessageQueue因此其中一个消费者实例将会闲置不会分配到任何MessageQueue。 反之如果减少至只有一个消费者实例C1那么这个实例将会负责消费所有的4个MessageQueue。 集群
主从 集群 修改配置就可以搭建了对应的集群了slave负责给broker做备份
优点性能快
缺点当broker挂了之后 slave 不会自动升级为broker可用性没有Dledger集群高
如果业务对高可用要求没那么高对性能要求较高可选用该种方式搭建集群。
Dledger 集群 却点性能没有上一种方式的快
优点当brokerleader挂了之后其他的broker会自动选举leader高可用
如果业务对性能要求没那么高对高可用要求较高可选用该种方式搭建集群。 总结
broker要有多数存活才能从新选举为leader例如有5个broker但是只有1个存活这样就不能选举而nameserver只要有一个存活就可以正常工作 1、nameServer 命名服务 nameServer不依赖于任何其他的服务自己独立就能启动。并且不管是broker还是客户端都需要明确指定nameServer的服务地址。以一台电脑为例nameServer可以理解为是整 个RocketMQ的CPU整个RocketMQ集群都要在CPU的协调下才能正常工作。 2、broker 核心服务 broker是RocketMQ中最为娇贵的一个组件。RockeMQ提供了各种各样的重要设计来保护broker的安全。同时broker也是RocketMQ中配置最为繁琐的部分。同样以电脑为例broker就是整个 RocketMQ中的硬盘、显卡这一类的核心硬件。RocketMQ最核心的消息存储、传递、查询等功能都要由broker提供。 3、client 客户端 Client包括消息生产者和消息消费者。同样以电脑为例Client可以认为是RocketMQ中的键盘、鼠标、显示器这类的输入输出设备。鼠标、键盘输入的数据需要传输到硬盘、显卡等硬件才能进行处理。但是键盘、鼠标是不能 直接将数据输入到硬盘、显卡的这就需要CPU进行协调。通过CPU鼠标、键盘就可以将输入的数据最终传输到核心的硬件设备中。经过硬件设备处理完成后再通过CPU协调显示器这样的输出设备就能最终从核心硬件设备中获取到输出的数据。 topic是逻辑结构而真正的物理存储结构是MessageQueue队列 最小位点是当前队列中最开始的消息编号如果删掉了前面的消息那么最小位点就往前移了 最大位点是最小位点 消息数前提是位点连续消息被删除或者由于消费失败而进行重试时可能会导致位点不连续 消息模型图 客户端消息确认机制
消息确认机制生产者向broker发送一个消息broker会个生产者响应一个确认消息
单向发送生产者只负责发送消息不关心是否发送成功也不等待服务器响应。这种方式适用于对消息可靠性要求不高的场景。
同步发送确认 生产者发送消息后同步等待 Broker 的响应。若 Broker 返回 SEND_OK 状态码如 SendStatus.SEND_OK说明消息已持久化到 CommitLog 并被主从同步。若超时未收到响应或返回错误生产者会触发重试默认重试 2 次。
// 示例生产者同步发送消息网页4
SendResult sendResult defaultMQProducer.send(message);
System.out.println(发送结果 sendResult.getSendStatus());
优点消息传递可靠性很强适用于需严格保证消息送达的场景如订单创建。
缺点需要阻塞性的等待broker的响应吞吐量受限频繁的同步等待限制并发性能。
适用场景金融交易、关键业务通知如支付结果 异步发送确认 生产者通过回调函数异步处理 Broker 的响应适用于高吞吐场景。发送线程不阻塞通过 SendCallback 接收成功或失败通知
producer.send(message, new SendCallback() {Overridepublic void onSuccess(SendResult sendResult) { /* 处理成功逻辑 */ }Overridepublic void onException(Throwable e) { /* 处理失败并重试 */ }
});
优点低延迟响应时间短提升系统整体性能消息传递可靠性相对较强
缺点业务层处理可能的重复或丢失消息如幂等设计producer主线程需要一直开启比较消耗服务器资源
适用场景日志采集、实时监控等允许少量数据丢失的高吞吐场景 广播模式
广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。 集群模式一条消息只会被分配给同一个消费者组中的一个消费者实例进行处理。在此模式下broker会在内部以消费者组的概念来维护一个消费者位。
广播模式每条消息都会被发送到每个订阅了该主题的消费者实例。在此模式下各自的Consumer内部消费者客户端本地电脑offset.json文件中,该文件目前无法在Windows上创建自行维护消费者位点如果该文件丢失会自动创建并从MessageQueue的最后一条开始消费消费失败无法重试 消息过滤机制
1、Tag过滤生产者在发送消息的时候不仅可以指定Topic还可以知道Tag进一步细分消息类型那么在消费者端就可以通过Topic Tag 拿到指定的消息但是这种过滤方式只能简单的进行字符匹配无法进行复杂匹配例如匹配字符数字大于3的所有Tag。
2、SQL过滤RocketMQ还支持基于SQL92表达式的高级消息过滤功能。通过这种方式可以根据消息属性进行更复杂的过滤操作如数值比较、字符串匹配等。使用方法在发送消息时除了基本的消息体外还可以添加一些自定义属性当消息非常多的时候不建议使用自定义属性来过滤。订阅消息时使用SQL表达式来描述过滤条件。例如
Message message new Message(TopicTest, (Hello RocketMQ).getBytes(RemotingHelper.DEFAULT_CHARSET));
message.putUserProperty(a, 3); // 添加自定义属性
SendResult sendResult producer.send(message);
consumer.subscribe(TopicTest, MessageSelector.bySql(a 3 and b abc));
表示只消费那些属性a大于3且属性b等于abc的消息。**但是这种功能需要手动开启在broker.conf文件中添加 enablePropertyFiltertrue 即可。 性能影响虽然SQL92表达式过滤提供了更大的灵活性但它可能会对消息队列的性能产生一定影响因为它需要更多的计算资源来进行条件判断。相比之下Tag过滤由于其实现较为简单直接性能开销较小甚至可以忽略不记。用SQL92表达式过滤时要注意合理设置过滤条件避免过于复杂的查询导致性能下降。 顺序消息机制
保证局部有序不保证全局有序好比每一笔订单中的每一个步骤都是有序的局部有序但是不保证所有的订单都按照创建时间来依次处理全局无序因为全局有序是没有意义的 原理就是将一笔订单中的所有步骤都发送到同一个massegeQueue然后broker按照massegeQueue中的消息顺序依次推送给消费者 就可以消息的顺序了因为在不同的massegeQueue去拉去消息会存在网络因素导致顺序不一致所有所有步骤都发送到同一个massegeQueue就可以保证了。也就是说如果把所有的消息都存储到用一个massegeQueue中就可以保证全局消息的顺序性了但是这样队列就会有性能问题。因此我们在存储消息的时候尽可能大散在不同的队列中。 费者端如果确实处理逻辑中出现问题不建议抛出异常可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代用于指示当前消息队列暂停片刻再尝试消费例如一个队列中的步骤1成功步骤2失败那么就会将整个队列暂时阻塞一段时间基于内部实现中的退避算法通常是几秒到几十秒不等后在从步骤2开始重试来保证顺序 延迟消息与批量消息 延迟消息 Message message new Message(TopicTest, TagA, (Hello RocketMQ).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延迟级别为3即表示延迟10秒根据默认的延迟级别表
message.setDelayTimeLevel(3);
producer.send(message); RocketMQ内置有一些延迟时间可以直接使用但是不支持在代码中自定义延迟时间如果想在代码中设置延迟可以考虑spring的Task等内部实现定时然后发送到topic要自定义延迟消息的时间间隔您需要编辑Broker的配置文件broker.conf并调整messageDelayLevel参数。这个参数允许您定义一系列以逗号分隔的延迟级别每个级别表示一个从消息发送到可消费的时间间隔。 messageDelayLevel1s 5s 10s 30s 1m 批量消息
每次发送消息都要进行一次网络的io所有可以批量发送消息减少网络io但是一次发送的消息量不宜过多具体多少需要自行在客户端写逻辑判断。批量消息也有限制如不能做延迟消息要求所有消息都是同一个topic等。 事务消息机制
** 事务控制是在生产者端的 **rockermq的事务和我们常规的事务不一样rockermq的事务是保证某个事件与发送消息组成原子性
事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制来保证上下游的数据一致性。以电商为例用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景非常适合使用RocketMQ的解耦功能来进行串联。 1.发送方向 MQ 服务端发送事务消息就和正常的消息一样只不过消费者不可见实现原理是将消息存储到系统的另一个特殊的topic中 2.MQ Server 将消息持久化成功之后向发送方 ACK 确认消息已经发送成功回调此时消息为半消息。如果ACK 失败那么客户端就重试当超过最大重试次数就可以做告警了比如邮箱告警等。 3.发送方开始执行本地事务逻辑。 4.发送方根据本地事务执行结果向 MQ Server 提交二次确认Commit 或 Rollback 或MQ Server 收到 Commit 状态则将半消息标记为可投递将消息转存到正常的topic订阅方最终将收到该消息MQ Server 收到 Rollback 状态则删除半消息订阅方将不会接受该消息。 5.在断网或者是应用重启的特殊情况下上述步骤4提交的二次确认最终未到达 MQ Server经过固定时间后 MQ Server 将对该消息发起消息回查。如果会查状态是UNKNOWN待检测那么broker就会隔一段时间重试如果超过重试次数就会放到死信队列中。 6.发送方收到消息回查后需要检查对应消息的本地事务执行的最终结果。 7.发送方根据检查得到的本地事务的最终状态再次提交二次确认MQ Server 仍按照步骤4对半消息进行操作。
RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务try {// 假设这里是您的本地事务逻辑System.out.println(Executing local transaction for message: new String(msg.getBody()));return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK;}}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务状态System.out.println(Checking local transaction for message: new String(msg.getBody()));return RocketMQLocalTransactionState.UNKNOWN;}
}
Service
public class TransactionService {private final RocketMQTemplate rocketMQTemplate;public TransactionService(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate rocketMQTemplate;}public void sendMessageInTransaction() {MessageString message MessageBuilder.withPayload(Hello RocketMQ).setHeader(tag, TagA).build();rocketMQTemplate.sendMessageInTransaction(TopicTest:TagA, message, null);}
}
对于事务消息还有一个使用场景---充当定时任务
订单创建等待支付比如订单要在15分钟内支付否则关闭订单常规做法是使用Xxl-Job等类似的定时任务中间件比如每个30秒一次查询用户是否支付。
更优雅的做法是使用 rockermq 事务消息充当定时任务 当订单创建时向 broker 发送一个 UNKNOWN 状态的事务消息broker 就会自动的向客户端发送回调检测状态这个就可以在回调方法里面查询用户15分钟之内是否支付成功如果支付失败就将优惠卷状态恢复等等并返回Rollback成功就返回Commit消费者就可以看到这个消息就可以通知到后面的物流系统等如果支付成功了但是消费者执行失败了例如库存扣减失败那么broker就会进行重试会在某一次执行成功达到最终一致性。 ACL权限控制体系
1、是否允许自动创建 topic : autoCreateTopicEnabletrue 表示允许在生产环境一般不允许
2、topic权限 perm 代表权限对生产者和消费者做限权
0 (0000)无权限Neither Read nor Write2 (0010)仅读权限Only Read4 (0100)仅写权限Only Write6 (0110)读写权限Both Read and Write
当消费者消费失败会创建一个perm为6的重试队列当重试次数达到一定次数就会创建一个perm为2死信队列。要处理死信队列只能手动去修改权限后再处理。
也可以在 /conf/plain_acl.yml文件中配置其他权限例如 如果再这个配置中配置了账号和密码那么在客户端声明RocketMQ实例的时候要去指明即可 RocketMQ 作为一个内部服务并不需要对外所有权限控制很少使用。 RocketMQ客户端注意事项
消息的 ID、Key、Tag
这里有个小细节需要注意producer生产者端发送的是Message对象而Consumer消费端处理的却是MessageExt对象。也就是说虽然都是传递消息但是Consumer端拿到的信息会比Producer端发送的消息更多也就有几个重点的参数需要理解。那就是Messageld,Key和Tag。
Messageld是RocketMQ内部给每条消息分配的唯一索引 Producer发送的Message对象是没有msgld属性的。Broker端接收到Producer发过来的消息后会给每条消息单独分配一个唯一的msgld。这个msgID可以作为消息的唯一主键来使用。但是需要注意对于客户端来说毕竟是不知道这个msgld是如何产生的。实际上在RocketMQ内部也会针对批量消息、事务消息等特殊的消息机制有特殊的msgld分配机制当使用某些框架的时候可能会导致Messageld不唯一。因此在复杂业务场景下不建msgld来作为消息的唯一索引l而建议采用下面的key属性自行指定业务层面上的唯一索引。例如订单消息就将订单ID设置为key。 最佳实践
一个应用尽可能用一个Topic而消息子类型则可以用tags来标识。tags可以由应用自由设置只有生产者在发送消息设置了tags消费方在订阅消息时才可以利用tags通过broker做消息过滤message.setTags(tags) Kafka的一大问题是Topic过多会造成Partition文件过多影响性能。而RocketMQ中的Topic完全不会对消息转发性能有影响。但是Topic过多还是会加大RocketMQ的元数据维护的性能消耗。所以在使用时还是需要对Topic进行合理的分配。使用Tag区分消息时尽量直接使用Tag过滤不要使用复杂的SQL过滤。因为消息过滤机制虽然可以减少网络IO但是毕竟会加大Broker端的消息处理压力。所以消息过滤的逻辑还是越简单越好。 消费者端进行幂等控制
官方回答中说道RocketMQ确保所有消息至少传递一次。在大多数情况下消息不会重复。
消息幂等的必要性 在互联网应用中尤其在网络不稳定的情况下消息队列RocketMQ的消息有可能会出现重复这个重复简单可以概括为以下情况 1、发送时消息重复 当一条消息已被成功发送到服务端并完成持久化此时出现了网络闪断或者客户端宕机导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败并尝试再次发送消息消费者后续会收到两条内容相同并且 MessageID 也相同的消息。 2、投递时消息重复 消息消费的场景下消息已投递到消费者并完成业务处理当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次消息队列RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息消费者后续会收到两条内容相同并且MessageID 也相同的消息。 3、负载均衡时消息重复包括但不限于网络抖动、Broker重启以及订阅方应用重启 当消息队列RocketMQ的Broker或客户端重启、扩容或缩容时会触发 Rebalance此时消费者可能会收到重复消息。
处理方式 从上面的分析中知道在RocketMQ中是无法保证每个消息只被投递一次的所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题RocketMQ的每条消息都有一个唯一的Messageld这个参数在多次投递的过程中是不会改变的所以业务上可以用这个Messageld来作为判断幂等的关键依据。但是这个Messageld是无法保证全局唯一的也会有冲突的情况。所以在一些对幂等性要求严格的场景最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。 关注错误消息重试
如果消费者返回的状态是 RECONSUME_LATER稍后重试那么这个消息会被放到重试队列中以消费者组的规则和分配策略重新推送给消费者这个重试队列是系统自动创建的一般来说有了重试队列就代表消费者处理有异常那么我们可以监控是否有重试队列以此来错告警。 手动处理死信队列
死信队列的特征 1、一个死信队列对应一个ConsumGroup而不是对应某个消费者实例。 2、如果一个ConsumeGroup没有产生死信队列RocketMQ就不会为其创建相应的死信队列。 3、一个死信队列包含了这个ConsumeGroup里的所有死信消息而不区分该消息属于哪个Topic。 4、死信队列中的消息不会再被消费者正常消费。 5、死信队列的有效期跟正常消息相同。默认3天对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除而不管消息是否消费过。 注默认创建出来的死信队列他里面的消息是无法读取的在控制台和消费者中都无法读取。这是因为这些默认的死信队列他们的权限perm被设置成了2:禁读(这个权限有三种2:禁读4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6才能被消费(可以通过mqadmin指定或者web控制台)。
MQ如何保证消息不丢失
1、哪些环节可能丢消息 其中124三个场景都是跨网络的而跨网络就肯定会有丢消息的可能。 然后关于3这个环节通常MQ存盘时都会先写入操作系统的缓存pagecache中然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差就可能会造成消息丢失。如果服务挂了缓存中还没有来得及写入硬盘的消息就会丢失。
2、生产者发送消息如何保证不丢
生产者发送消息之所以可能会丢消息都是因为网络。因为网络的不稳定性容易造成请求丢失。怎么解决这样的问题呢其实一个统一的思路就是生产者确认。简单来说就是生产者发出消息后给生产者一个确定的通知这个消息在Broker端是否写入完成了。就好比打电话不确定电话通没通那就互相说个“喂”具体确认一下。只不过基于这个同样的思路各个MQ产品有不同的实现方式。
1、通过RockerMQ客户端的消息确认机制保证消息不丢失
2、通过发送事务消息来保证息不丢失 3、Borker写入数据如何保证不丢失 broker接收到消息并不会马上写到磁盘上而是先写到操作系统的pagecache缓存页中过一段时间才才写到磁盘冲。以Linux为例用户态的应用程序不管是什么应用程序想要写入磁盘文件时都只能调用操作系统提供的write系统调用申请写磁盘。至于消息如何经过PageCache再写入到磁盘中这个过程这个过程是在内核态执行的也就是操作系统自己执行的应用程序无法干预。这个过程中应用系统唯一能够干预的就是调用操作系统提供的sync系统调用申请一次刷盘操作主动将PageCache中的数据写入到磁盘。
RocketMQ如何调用fsync的 RocketMQ的Broker提供了一个很明确的配置项flushDiskType可以选择刷盘模式。有两个可选项SYNC_FLUSH同步刷盘和ASYNC_FLUSH异步刷盘。 所谓同步刷盘是指broker每往日志文件中写入一条消息就调用一次刷盘操作。而异步刷盘则是指broker每隔一个固定的时间才去调用一次刷盘操作。异步刷盘性能更稳定但是会有丢消息的可能。而同步刷盘的消息安全性就更高但是操作系统的IO压力就会非常大。 在RocketMQ中就算是同步刷盘其实也并不是真的写一次消息就刷盘一次这在海量消息的场景下操作系统是撑不住的。所以我们在之前梳理RocketMQ核心源码的过程中看到RocketMQ的同步刷盘的实现方式其实也是以10毫秒的间隔去调用刷盘操作。从理论上来说也还是会有非正常断电造成消息丢失的可能甚至严格意义上来说任何应用程序都不可能完全保证断电消息不丢失。但是RocketMQ的这一套同步刷盘机制却可以 通过绝大部分业务场景的验证。这其实就是一种平衡。 4、Broker主从同步如何保证不丢失 在这种集群机制下消息的安全性还是比较高的。但是有一种极端的情况需要考虑。因为消息需要从Master往Slave同步这个过程是跨网络的因此也是有时间延迟的。所以如果Master出现非正常崩溃那么就有可能有一部分数据是已经写入到了Master但是还来得及同步到Slave。这一部分未来得及同步的数据在RocketMQ的这种集群机制下就会一直记录在Master节点上。等到Master重启后就可以继续同步了。另外由于Slave并不会主动切换成Master所以Master服务崩溃后也不会有新的消息写进来因此也不会有消息冲突的问题。所以只要Mater的磁盘没有坏那么在这种普通集群下主从同步通常不会造成消息丢失。 他优先保证的是集群内的数据一致性而并不是保证不丢失。在某些极端场景下比如出现网络分区情况时也会丢失一些未经过集群内确认的消息。不过基于RocketMQ的使用场景这种丢失消息的可能性非常小。另外这种服务端无法保证消息安全的问题其实结合客户端的生产者确认机制是可以得到比较好的处理的。因此在RocketMQ中使用Dledger集群的话数据主从同步这个过程数据安全性还是比较高的。基本可以认为不会造成消息丢失。 5、消费者消费消息如何不丢失
消费者消费消息的过程中需要从Broker上拉取消息这些消息也是跨网络的所以拉取消息的请求也可能丢失。这时会不会有丢消息的可能呢 几乎所有的MQ产品都设置了消费状态确认机制。也就是消费者处理完消息后需要给Broker一个响应表示消息被正常处理了。如果Broker端没有拿到这个响应不管是因为Consumer没有拿到消息还是Consumer处理完消息后没有给出相应Broker都会认为消息没有处理成功。之后Broker就会向Consumer重复投递这些没有处理成功的消息如果超过重试次数机会放到死信队列。RocketMQ和Kafka是根据Offset机制重新投递而RabbitMQ的ClassicQueue经典对列则是把消息重新入队。因此正常情况下Consumer消费消息这个过程是不会造成消息丢失的相反可能需要考虑下消息幂等的问题。 6、如果MQ服务全部挂了如何保证不丢失
最后有一种小概率的极端情况就是MQ的服务全部挂掉了这时要如何保证业务能够继续稳定进行同时业务数据不会丢失呢 通常的做法是设计一个降级缓存。Producer往MQ发消息失败了就往降级缓存中写然后依然正常去进行后续的业务。此时再启动一个线程不断尝试将降级缓存中的数据往MQ中发送。这样至少当MQ服务恢复过来后这些消息可以尽快进入到MQ中继续往下游Conusmer推送而不至于造成消息丢失。 7、MQ消息零丢失方案总结 最后要注意到这里讨论到的各种MQ消息防止丢失的方案其实都是以增加集群负载降低吞吐为代价的。这必然会造成集群效率下降。因此这些保证消息安全的方案通常都需要根据业务场景进行灵活取舍而不是一股脑的直接用上。 这些消息零丢失方案其实是没有最优解的。因为如果有最优解那么这些MQ产品就不需要保留各种各样的设计了。这和很多面试八股文是有冲突的。面试八股文强调标准答案而实际业务中这个问题是没有标准答案的一切都需要根据业务场景去调整。 MQ如何保证消息的顺序性 这里首先需要明确的是通常讨论MQ的消息顺序性其实是在强调局部有序而不是全局有序。就好比QQ和微信的聊天消息通常只要保证同一个聊天窗口内的消息是严格有序的。至于不同窗口之间的消息顺序出了点偏差其实是无所谓的。所谓全局有序通常在业务上没有太多的使用场景。在RocketMQ和Kafka中把Topic的分区数设置成1这类强行保证消息全局有序的方案纯属思维体操。 这个机制需要两个方面的保障。 1、Producer将一组有序的消息写入到同一个MessageQueue中。 2、Consumer每次集中从一个MessageQueue中拿取消息。 在Producer端RocketMQ和Kafka都提供了分区计算机制可以让应用程序自己决定消息写入到哪一个分区。所以这一块是由业务自己决定的。只要通过定制数据分片算法把一组局部有序的消息发到同一个对列当中就可以通过对列的FIFO特性保证消息的处理顺序。对于RabbitMQ则可以通过维护Exchange与Queue之间的绑定关系将这一组局部有序的消息转发到同一个对列中从而保证这一组有序的消息在RabbitMQ内部保存时是有序的。在Conusmer端RocketMQ是通过让Consumer注入不同的消息监听器来进行区分的。而具体的实现机制核心是通过Consumer的消费线程进行并发控制来保证消息的消费顺序的。类比到Kafka呢。Kafka中并没有这样的并发控制。而实际上Kafka的Consumer对某一个Partition拉取消息时天生就是单线程的所以参照RocketMQ的顺序消费模型Kafka的Consumer天生就是能保证局部顺序消费的。 至于RabbitMQ以他的ClassicQueue经典对列为例他的消息被一个消费者从队列中拉取后就直接从队列中把消息删除了。所以基本不存在资源竞争的问题。那就简单的是一个队列只对应一 个Consumer那就是能保证顺序消费的。如果一个队列对应了多个Consumer同一批消息可能会进入不同的Consumer处理所以也就没法保证消息的消费顺序 MQ如何保证消息幂等性 通用解决方案 1. 使用唯一消息标识 在发送消息时为每条消息生成一个全局唯一的标识符如UUID、订单号等。在消费端通过检查这个唯一标识来判断消息是否已被处理过。 实现步骤 发送端在发送消息时将唯一标识作为消息的Keys或属性。消费端在接收到消息后首先检查这个唯一标识是否已存在于数据库中。如果存在则说明消息已被处理过直接跳过如果不存在则进行业务处理并将唯一标识存入数据库。 2. 引入消息去重表或者布隆过滤器 在数据库中创建一个消息去重表用于记录已处理消息的唯一标识。消费端在处理消息前先查询去重表判断消息是否已被处理。或者使用布隆过滤器来判断是否消费过了。 实现步骤 定义一个消息去重表包含唯一标识和消息状态等字段。消费端在处理消息前先插入一条记录到去重表中使用唯一标识作为主键以处理并发插入时的冲突。如果插入成功则说明消息是新的进行业务处理如果插入失败主键冲突则说明消息已被处理过直接跳过。 3. 利用RocketMQ的消息ID 虽然RocketMQ的消息ID在大多数情况下是唯一的但不建议直接依赖它来实现消息的幂等性因为存在生产者手动重发相同消息但Message ID不同的情况。 然而在某些场景下可以结合业务唯一标识和消息ID来辅助实现幂等性。例如在数据库中同时记录业务唯一标识和RocketMQ的消息ID通过这两个字段的组合来确保消息的唯一性。 4. 引入分布式锁 对于需要严格保证幂等性的场景可以考虑在消费消息前引入分布式锁。通过分布式锁来确保同一时间只有一个消费者能处理某条消息。 实现步骤 在处理消息前尝试获取分布式锁以消息的唯一标识作为锁键。如果获取成功则进行业务处理如果获取失败则说明有其他消费者正在处理该消息直接跳过。 注意事项 性能考虑在实现幂等性时要注意避免引入过多的数据库操作或分布式锁以免影响系统的整体性能。容错性要确保幂等性实现方案具有容错性能够在各种异常情况下正确运行。业务逻辑适配幂等性实现应紧密结合业务逻辑确保在复杂业务场景下的有效性和正确性。 MQ如何快速处理积压的消息 1、消息积压会有哪些问题。 对RocketMQ和Kafka来说他们的消息积压能力本来就是很强的因此短时间的消息积压是没有太多问题的。但是需要注意如果消息积压问题一直得不到解决RocketMQ和Kafka在日志文件过期后就会直接删除过期的日志文件。而这些日志文件上未消费的消息就会直接丢失。 2、怎么处理大量积压的消息 产生消息积压的根本原因还是Consumer处理消息的效率太低所以最核心的目标还是要提升Consumer消费消息的效率。如果不能从业务上提升Consumer消费消息的性能那么最直接的办法就是针对处理消息比较慢的消费者组增加更多的Consumer实例。但是这里需要注意一下增加Consumer实例是不是会有上限。因为同一个消费者组下的多个Cosumer需要和对应Topic下的MessageQueue建立对应关系而一个MessageQueue最多只能被一个Consumer消费因此增加的Consumer实例最多也只能和Topic下的MessageQueue个数相同。如果此时再继续增加Consumer的实例那么就会有些Consumer实例是没有MessageQueue去消费的因此也就没有用了。 这时如果Topic下的MessageQueue配置本来就不够多的话那就无法一直增加Consumer节点个数了。这时怎么处理呢如果要快速处理积压的消息可以创建一个新的Topic配置足够多的MessageQueue。然后把Consumer实例的Topic转向新的Topic在消费者代码中发消息并紧急上线一组新的消费者只负责消费旧Topic中的消息并转存到新的Topic中。这个速度明显会比普通Consumer处理业务逻辑要快很多。然后在新的Topic上就可以通过添加消费者个数来提高消费速度了。之后再根据情况考虑是否要恢复成正常情况。其实这种思路和RocketMQ内部很多特殊机制的处理方式是一样的。例如固定级别的延迟消息机制也是把消息临时转到一个系统内部的Topic下处理过后再转回来。