外贸建站推广公司,河北保定建设工程信息网站,房地产企业网站开发,合肥知名网页制作公司事务消息
应用场景#xff1a;
事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制#xff0c;来保证上下游的数据一致性。
以电商为例#xff0c;用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购…事务消息
应用场景
事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制来保证上下游的数据一致性。
以电商为例用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。这种场景非常适合使用RocketMQ的解耦功能来进行串联。 考虑到事务的安全性即要保证相关联的这几个业务一定是同时成功或者同时失败的。如果要将四个服务一起作为一个分布式事务来控制可以做到但是会非常麻烦。而使用RocketMQ在中间串联了之后事情可以得到一定程度的简化。由于RocketMQ与消费者端有失败重试机制所以只要消息成功发送到RocketMQ了那么可以认为Branch2.1Branch2.2Branch2.3这几个分支步骤是可以保证最终的数据一致性的。 在此基础上RocketMQ提出了事务消息机制采用两阶段提交的思路保证Main Branch1和Branch2之间的事务一致性。 具体的实现思路 生产者将消息发送至Apache RocketMQ服务端。Apache RocketMQ服务端将消息持久化成功之后向生产者返回Ack确认消息已经发送成功此时消息被标记为暂不能投递这种状态下的消息即为半事务消息。生产者开始执行本地事务逻辑。生产者根据本地事务执行结果向服务端提交二次确认结果Commit或是Rollback服务端收到确认结果后处理逻辑如下 二次确认结果为Commit服务端将半事务消息标记为可投递并投递给消费者。二次确认结果为Rollback服务端将回滚事务不会将半事务消息投递给消费者。在断网或者是生产者应用重启的特殊情况下若服务端未收到发送者提交的二次确认结果或服务端收到的二次确认结果为Unknown未知状态经过固定时间后服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。生产者收到消息回查后需要检查对应消息的本地事务执行的最终结果。生产者根据检查到的本地事务的最终状态再次提交二次确认服务端仍按照步骤4对半事务消息进行处理。
核心代码 producer.setTransactionListener(transactionListener);public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex new AtomicInteger(0);private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap();Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//执行本地事务}Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {//本地事务检查}
}重点使用RocketMQ提供的TransactionMQProducer事务生产者在TransactionMQProducer中注入一个TransactionListener事务监听器来执行本地事务以及后续对本地事务的检查。
注意点
1、半消息是对消费者不可见的一种消息。实际上RocketMQ的做法是将消息转到了一个系统TopicRMQ_SYS_TRANS_HALF_TOPIC。
2、事务消息中本地事务回查次数通过参数transactionCheckMax设定默认15次。本地事务回查的间隔通过参数transactionCheckInterval设定默认60秒。超过回查次数后消息将会被丢弃。
3、其实了解了事务消息的机制后在具体执行时可以对事务流程进行适当的调整。