正规的企业网站建设,邯郸市网站建设多少钱,网站建设行业税率,制作网站的公司不干了RocketMQ 基本架构消息模型消费者消费消息模式顺序消息机制延迟消息批量消息事务消息消息重试最佳实践 基本架构 nameServer: 维护broker列表信息#xff0c;客户端连接时只需要连接nameServer。可配置成集群。 broker#xff1a;broker分为master和slave#xff0c;master负… RocketMQ 基本架构消息模型消费者消费消息模式顺序消息机制延迟消息批量消息事务消息消息重试最佳实践 基本架构 nameServer: 维护broker列表信息客户端连接时只需要连接nameServer。可配置成集群。 brokerbroker分为master和slavemaster负责消息的发送和消费slave是master的备份。master-slaver的集群方式master挂掉时候slave不能主动转换为master提供服务5.X版本后可以通过配置实现mater挂掉后slave转为master提供服务。 leader-follower的集群方式即高可用集群各个broker是对等的通过选举产生leader在dashboart中显示为master如果leader挂掉在剩下的follower显示为slave中选举再产生新的leader。注意只有超过半数的几点存活才能选举出leader。
消息模型 ⽣产者和消费者都可以指定⼀个Topic发送消息或者拉取消息。⽽Topic是⼀个逻辑概念。 Topic中的消息会分布在后⾯多个MessageQueue当中。这些MessageQueue会分布到⼀个或者多个broker中。
消费者消费消息模式
广播模式所有关注topic的消费者都收到消息。广播模式下消息队列的消费位点由客户端自己维护消费失败服务端不会重发。 集群模式同一个消费者组只有一个成员收到消息。集群模式下消费点位由服务端维护消费者组的所有成员共用一个位点消费失败服务端会重发。
顺序消息机制
⽣产者只有将⼀批有顺序要求的消息放到同⼀个MesasgeQueue上通过MessageQueue的FIFO特性保证这⼀批消息的顺序。如果不指定MessageSelector对象 那么⽣产者会采⽤轮询的⽅式将多条消息依次发送到不同的MessageQueue上。消费者需要实现MessageListenerOrderly接⼝实际上在服务端处理MessageListenerOrderly时会给⼀个MessageQueue加锁拿到MessageQueue上所有的消息然后再去读取下⼀个MessageQueue的消息。消费消息失败时不建议抛出异常可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。因为消费者端只进⾏有限次数的重试。如果⼀条消息处理失败RocketMQ会将后续消息阻塞住让消费者进⾏重试。但是如果消费者⼀直处理失败超过最⼤重试次数那么RocketMQ就会跳过这⼀条消息处理后⾯的消息这会造成消息乱序。
延迟消息
定固定的延迟级别对于指定固定延迟级别的延迟消息RocketMQ的实现⽅式是预设⼀个系统Topic名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下预设了18个MessageQueue。这⾥每个对列就对应了⼀种延迟级别。然后每次扫描这18个队列⾥的消息进⾏延迟操作就可以了。指定消息发送时间RocketMQ是通过时间轮算法实现。
批量消息
⽣产者要发送的消息⽐较多时可以将多条消息合并成⼀个批量消息⼀次性发送出去。这样可以减少⽹络IO提升消息发送的吞吐量。同⼀批消息的Topic必须相同另外不⽀持延迟消息。还有批量消息的⼤⼩不要超过1M如果太⼤就需要⾃⾏分割。
事务消息 ⽣产者将消息发送⾄ApacheRocketMQ服务端。ApacheRocketMQ服务端将消息持久化成功之后向⽣产者返回Ack确认消息已经发送成功此时消息被标记为暂不能投递这种状态下的消息即为半事务消息。⽣产者开始执⾏本地事务逻辑。⽣产者根据本地事务执⾏结果向服务端提交⼆次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下⼆次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。⼆次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。在断⽹或者是⽣产者应⽤重启的特殊情况下若服务端未收到发送者提交的⼆次确认结果或服务端收到的⼆次确认结果为Unknown未知状态经过固定时间后服务端将对消息⽣产者即⽣产者集群中任⼀⽣产者实例发起消息回查。⽣产者收到消息回查后需要检查对应消息的本地事务执⾏的最终结果。⽣产者根据检查到的本地事务的最终状态再次提交⼆次确认服务端仍按照步骤4对半事务消息进⾏处理。
消息重试
RocketMQ的消费者端如果处理消息失败了Broker是会将消息重新进⾏投送的。⽽在重试时RocketMQ实际上会为每个消费者组创建⼀个对应的重试队列。重试的消息会进⼊⼀个“%RETRY%”ConsumeGroup的队列中。 RocketMQ默认允许每条消息最多重试16次每次重试的间隔时间如下 如果消息重试16次后仍然失败消息将不再投递转为进⼊死信队列。重试次数可以通过consumer.setMaxReconsumeTimes(20);将重试次数设定为20次。当定制的重试次数超过16次后消息的重试时间间隔均为2⼩时。 如果消息超过最⼤重试次数RocketMQ会将消息发送到死信队列。⼀个死信队列对应⼀个消费组。死信队列的默认权限为2禁读。如果需要处理死信队列的消息需要把权限修改为6可读可写后消费该Topic的消息进行处理。队列中超过有效期默认3天的消息会被删除不管有没有消费。
最佳实践
⼀个应⽤尽可能⽤⼀个Topic⽽消息⼦类型则可以⽤tags来标识。tags过滤消息的性能很高相当于索引。消费端幂等控制RocketMQ的每条消息都有⼀个唯⼀的MessageId这个参数在多次投递的过程中是不会改变的所以业务上可以⽤这个MessageId来作为判断幂等的关键依据。但是这个MessageId是⽆法保证全局唯⼀的也会有冲突的情况。所以在⼀些对幂等性要求严格的场景最好是使⽤业务上唯⼀的⼀个标识⽐较靠谱。例如订单ID。⽽这个业务标识可以使⽤Message的Key来进⾏传递。