淡水做网站,wordpress阅读量怎么查看,网站建设维护与推广,企业邮箱登录界面前言分布式事务是在微服务开发中经常会遇到的一个问题#xff0c;之前的文章中我们已经实现了利用Seata来实现强一致性事务#xff0c;其实还有一种广为人知的方案就是利用消息队列来实现分布式事务#xff0c;保证数据的最终一致性#xff0c;也就是我们常说的柔性事务。消… 前言分布式事务是在微服务开发中经常会遇到的一个问题之前的文章中我们已经实现了利用Seata来实现强一致性事务其实还有一种广为人知的方案就是利用消息队列来实现分布式事务保证数据的最终一致性也就是我们常说的柔性事务。消息队列实现分布式事务原理首先让我们来看一下基于消息队列实现分布式事务的原理方案。柔性事务发送消息的服务有个OUTBOX数据表在进行INSERT、UPDATE、DELETE 业务操作时也会给OUTBOX数据表INSERT一条消息记录这样可以保证原子性因为这是基于本地的ACID事务。OUTBOX表充当临时消息队列然后我们在引入一个消息中继MessageRelay的服务由他从OUTBOX表中读取数据并发布消息到消息组件。消息中继的实现可以很简单只需要通过定时任务定期从OUTBOX表中拉取最新未发布的数据获取到数据后将数据发送给消息组件最后将完成发送的消息从OUTBOX表中删除即可对于失败的消息可以根据业务规则进行重试。RocketMQ的事务消息RocketMQ本身已经支持事务消息如果你们项目使用了RocketMQ可以直接借助RocketMQ的事务消息实现分布式事务我们先看一下RocketMQ事务消息的原理然后再借助RocketMQ来实现分布式事务。RocketMQ采用了2PC的思想来实现了提交事务消息同时增加一个补偿逻辑来处理二阶段超时或者失败的消息如下图所示。分布式事务RocketMQ实现事务消息主要分为两个阶段正常事务的发送及提交、事务信息的补偿流程整体流程为正常事务发送与提交阶段1、生产者发送一个半消息给MQServer半消息是指消费者暂时不能消费的消息2、服务端响应消息写入结果半消息发送成功3、开始执行本地事务4、根据本地事务的执行状态执行Commit或者Rollback操作事务信息的补偿流程1、如果MQServer长时间没收到本地事务的执行状态会向生产者发起一个确认回查的操作请求2、生产者收到确认回查请求后检查本地事务的执行状态3、根据检查后的结果执行Commit或者Rollback操作补偿阶段主要是用于解决生产者在发送Commit或者Rollback操作时发生超时或失败的情况。RocketMQ事务流程关键事务消息在一阶段对用户不可见事务消息相对普通消息最大的特点就是一阶段发送的消息对用户是不可见的也就是说消费者不能直接消费。这里RocketMQ的实现方法是原消息的主题与消息消费队列然后把主题改成 RMQ_SYS_TRANS_HALF_TOPIC 这样由于消费者没有订阅这个主题所以不会被消费。如何处理第二阶段的失败消息在本地事务执行完成后会向MQServer发送Commit或Rollback操作此时如果在发送消息的时候生产者出故障了那么要保证这条消息最终被消费MQServer会像服务端发送回查请求确认本地事务的执行状态。当然了rocketmq并不会无休止的的信息事务状态回查默认回查15次如果15次回查还是无法得知事务状态RocketMQ默认回滚该消息。消息状态 事务消息有三种状态TransactionStatus.CommitTransaction提交事务消息消费者可以消费此消息TransactionStatus.RollbackTransaction回滚事务它代表该消息将被删除不允许被消费。TransactionStatus.Unknown 中间状态它代表需要检查消息队列来确定状态。代码实现业务需求用户请求订单微服务 order-service 接口删除订单退货删除订单时需要调用 account-service的方法给账户增加余额一个典型的分布式事务问题。基础配置在Order-Service和Account-Service中引入Rocket消息组件dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactId
/dependency在配置中心添加RocketMQ的相关配置rocketmq:name-server: xxx.xx.x.xx:9876producer:group: cloud-group在OrderService服务中建立一张事务日志表rocketmq_transaction_log作用稍后说发送半消息Order-Service作为分布式事务开始的入口在Service层我们给RocketMQ发送一条半消息OrderController入口/*** 根据订单号删除订单* param orderNo 订单编号*/
PostMapping(/order/delete)
public ResultDataString delete(RequestParam String orderNo){log.info(delete order id is {},orderNo);orderService.delete(orderNo);return ResultData.success(订单删除成功);
}直接调用orderService的delete方法OrderServiceImpl业务逻辑Override
public void delete(String orderNo) {Order order orderMapper.selectByNo(orderNo);//如果订单存在且状态为有效进行业务处理if (order ! null CloudConstant.VALID_STATUS.equals(order.getStatus())) {String transactionId UUID.randomUUID().toString();//如果可以删除订单则发送消息给rocketmq让用户中心消费消息rocketMQTemplate.sendMessageInTransaction(add-amount,MessageBuilder.withPayload(UserAddMoneyDTO.builder().userCode(order.getAccountCode()).amount(order.getAmount()).build()).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader(order_id,order.getId()).build(),order);}
}首先校验一下订单状态然后使用 rocketMQTemplate.sendMessageInTransaction()发送事务消息。sendMessageInTransaction方法有三个参数destination目的地(主题)这里发送给add-amount 这个topicmessage发送给消费者的消息体需要使用MessageBuilder.withPayload() 来构建消息arg参数注意这里我们生成了一个transactionId并放在header中跟消息一起发送这里实际也可以构造成一个对象放在arg里进行发送作用后面再讲消息封装实体UserAddMoneyDTOData
NoArgsConstructor
AllArgsConstructor
Builder
public class UserAddMoneyDTO {/*** 用户编码*/private String userCode;/*** 金额*/private BigDecimal amount;
}这个类生产者和消费者都需要用到所以我直接丢到common包中大家根据项目实际情况决定放哪。执行本地事务与回查MQServer收到半消息后会告诉生产者order-service确认收到半消息这时候order-service需要执行本地事务执行完本地事务后再告诉MQServer本地事务的执行状态确认此消息究竟是Commit还是Rollback。RocketMQ提供了 RocketMQLocalTransactionListener 接口本地事务监听器这个接口类的实现如下第一个方法 executeLocalTransaction 为执行本地事务第二个方法 checkLocalTransaction 为检查本地事务的执行状态也就是回查动作。我们需要实现 RocketMQLocalTransactionListener接口在 executeLocalTransaction方法中执行本地事务在执行 checkLocalTransaction回查方法时告诉RocketMQ到底该提交还是回滚。这里大家思考一个问题本地事务已经执行完成了怎么去回查本地事务的执行结果呢答案如下我们可以在执行本地事务的时候同时生成一条事务日志让本地事务与日志事务在同一个方法中同时添加 Transactional 注解保证两个操作事务是一个原子操作。这样如果事务日志表中有这个本地事务的信息那就代表本地事务执行成功需要Commit相反如果没有对应的事务日志则表示执行失败需要Rollback。这就是为什么我们上面在OrderService中需要建立一张事务日志表的原因。实现RocketMQLocalTransactionListener接口完成事务执行逻辑/*** 监听事务消息* author javadaily*/
Slf4j
RocketMQTransactionListener
RequiredArgsConstructor(onConstructor __(Autowired))
public class AddUserAmountListener implements RocketMQLocalTransactionListener {private final OrderService orderService;private final RocketMqTransactionLogMapper rocketMqTransactionLogMapper;/*** 执行本地事务*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {log.info(执行本地事务);MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer orderId Integer.valueOf((String)headers.get(order_id));log.info(transactionId is {}, orderId is {},transactionId,orderId);try{//执行本地事务并记录日志orderService.changeStatuswithRocketMqLog(orderId, CloudConstant.INVALID_STATUS,transactionId);//执行成功可以提交事务return RocketMQLocalTransactionState.COMMIT;}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事务的检查检查本地事务是否成功*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {MessageHeaders headers message.getHeaders();//获取事务IDString transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(检查本地事务,事务ID:{},transactionId);//根据事务id从日志表检索QueryWrapperRocketmqTransactionLog queryWrapper new QueryWrapper();queryWrapper.eq(transaction_id,transactionId);RocketmqTransactionLog rocketmqTransactionLog rocketMqTransactionLogMapper.selectOne(queryWrapper);if(null ! rocketmqTransactionLog){return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;}
}本地事务执行逻辑Transactional(rollbackFor RuntimeException.class)
Override
public void changeStatuswithRocketMqLog(Integer id,String status,String transactionId){orderMapper.changeStatus(id,status);rocketMqTransactionLogMapper.insert(RocketmqTransactionLog.builder().transactionId(transactionId).log(执行删除订单操作).build());
}修改订单状态为删除状态同时往事务日志表中插入一条事务日志用Transactional注解保证事务。Account-Service消费消息监听消息并处理给用户增加余额逻辑Slf4j
Service
RocketMQMessageListener(topic add-amount,consumerGroup cloud-group)
RequiredArgsConstructor(onConstructor __(Autowired) )
public class AddUserAmountListener implements RocketMQListenerUserAddMoneyDTO {private final AccountMapper accountMapper;/*** 收到消息的业务逻辑*/Overridepublic void onMessage(UserAddMoneyDTO userAddMoneyDTO) {log.info(received message: {},userAddMoneyDTO);accountMapper.increaseAmount(userAddMoneyDTO.getUserCode(),userAddMoneyDTO.getAmount());log.info(add money success);}
}测试测试数据订单表用户表事务日志表如果事务消息成功消费最终用户表中jianzh5这个用户的amount应该变成300100200测试准备我们在执行本地事务成功并需要通知消息队列提交事务处打个断点然后在执行到此处时手动模拟异常模拟异常在准备提交事务时我们通过命令 taskkill /pid 10116 -t -f命令强制杀掉OrderService进程。先通过jps获取OrderService进程ID重启服务器检查是否会执行回查方法重启OrderService程序会自动执行回查方法结合事务日志表判断是否提交事务。运行后的结果小结本篇文章我们介绍了使用消息队列实现柔性事务的方案重点剖析了RocketMQ事务消息的原理并通过Demo案例实现了分布式事务柔性事务。