怎么用网站做远控,做网站购买什么,中山如何制作网站,老年门户网站建设的意义目录 1. 事务消息
1.1 RocketMQ事务消息的原理
1.2 RocketMQ订单支付功能设计 1. 事务消息
RocketMQ的事务消息#xff0c;是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账#xff0c; A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加…目录 1. 事务消息
1.1 RocketMQ事务消息的原理
1.2 RocketMQ订单支付功能设计 1. 事务消息
RocketMQ的事务消息是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账 A银行的某账户要转一万元到B银行的某账户。A银行发送“B银行账户增加一万元”这个消息要和“从A银 行账户扣除一万元”这个操作同时成功或者同时失败。RocketMQ采用两阶段提交的方式实现事务消息。
1.1 RocketMQ事务消息的原理 半事务消息发送生产者将半事务消息发送至RocketMQ服务端。 消息持久化及返回Ack确认RocketMQ服务端接收到半事务消息并持久化成功后向生产者返回Ack确认消息已经发送成功。此时消息状态为半事务消息。 执行本地事务逻辑根据发送结果执行本地事务如果写入失败此时half消息对业务不可见本地事务逻辑不执行。 提交二次确认结果根据本地事务状态执行Commit或者Rollback。RocketMQ 的事务消息分为3种状态分别是提交状态、回滚状态、未知状态。TransactionStatus.CommitTransaction: 提交事务它允许消费者消费此消息。TransactionStatus.RollbackTransaction: 回滚事务它代表该消息将被删除不允许被消费。TransactionStatus.Unknown: 未知状态它代表需要检查消息队列来确定状态。 消息回查(1) 对没有Commit/Rollback的事务消息从服务端发起一次回查 (2) Producer收到回查消息检查回查消息对应的本地事务的状态 (3) 根据本地事务状态重新Commit或者Rollback。第一次回查后仍未获取到事务状态则之后每隔30s会再次回查最多重试15次超过了就会默认丢弃此消息。
1.2 RocketMQ订单支付功能设计
数据库设计
/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*//*!40101 SET NAMES utf8 */;/*!40101 SET SQL_MODE*/;/*!40014 SET OLD_UNIQUE_CHECKSUNIQUE_CHECKS, UNIQUE_CHECKS0 */;
/*!40014 SET OLD_FOREIGN_KEY_CHECKSFOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS0 */;
/*!40101 SET OLD_SQL_MODESQL_MODE, SQL_MODENO_AUTO_VALUE_ON_ZERO */;
/*!40111 SET OLD_SQL_NOTESSQL_NOTES, SQL_NOTES0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/shop /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTIONN */;USE shop;/*Table structure for table shop_order */DROP TABLE IF EXISTS shop_order;CREATE TABLE shop_order (id VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NOT NULL COMMENT 订单id,total_num INT DEFAULT NULL COMMENT 数量合计,moneys INT DEFAULT NULL COMMENT 金额合计,pay_type VARCHAR(1) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 支付类型1、在线支付、0 货到付款,create_time DATETIME DEFAULT NULL COMMENT 订单创建时间,update_time DATETIME DEFAULT NULL COMMENT 订单更新时间,pay_time DATETIME DEFAULT NULL COMMENT 付款时间,consign_time DATETIME DEFAULT NULL COMMENT 发货时间,end_time DATETIME DEFAULT NULL COMMENT 交易完成时间,username VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 用户名称,recipients VARCHAR(50) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 收货人,recipients_mobile VARCHAR(12) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 收货人手机,recipients_address VARCHAR(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 收货人地址,weixin_transaction_id VARCHAR(30) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin DEFAULT NULL COMMENT 交易流水号,order_status INT DEFAULT NULL COMMENT 订单状态,0:未完成,1:已完成2已退货,pay_status INT DEFAULT NULL COMMENT 支付状态,0:未支付1已支付2支付失败,is_delete INT DEFAULT NULL COMMENT 是否删除,PRIMARY KEY (id),KEY create_time (create_time),KEY status (order_status),KEY payment_type (pay_type)
) ENGINEINNODB DEFAULT CHARSETutf8mb3 COLLATEutf8mb3_bin;/*Data for the table shop_order *//*!40101 SET SQL_MODEOLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKSOLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKSOLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTESOLD_SQL_NOTES */;
添加RocketMQ依赖
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactId
/dependency
bootstrap.yaml配置
server:port: 8085
spring:application:name: mall-orderdatasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shop?useUnicodetruecharacterEncodingUTF-8serverTimezoneUTCusername: rootpassword: 123456cloud:nacos:config:file-extension: yamlserver-addr: localhost:8848discovery:#Nacos的注册地址server-addr: localhost:8848rocketmq:name-server: localhost:9876producer:group: test-group-producer
Service层
public interface OrderService extends IServiceOrder {//添加订单void add(Order order);//修改订单支付状态void pay(String id);
}Service
public class OrderServiceImpl extends ServiceImplOrderMapper, Orderimplements OrderService{AutowiredOrderMapper orderMapper;Transactional(rollbackFor Exception.class)Overridepublic void add(Order order) {order.setCreateTime(new Date());orderMapper.insert(order); //这里仅仅生成订单还有扣减库存等等一系列操作省略}Transactional(rollbackFor Exception.class)Overridepublic void pay(String id) {//模拟支付完成修改订单的支付状态Order order orderMapper.selectById(id);order.setPayStatus(1);order.setPayTime(new Date());orderMapper.updateById(order);}
}
创建生产者
RestController
Slf4j
public class TestController {AutowiredOrderMapper orderMapper;AutowiredRocketMQTemplate rocketMQTemplate;RequestMapping(/send)public String send(){String id UUID.randomUUID().toString();String msg 订单id支付成功;Order ordernew Order();order.setId(id);order.setCreateTime(new Date());order.setMoneys(100);order.setUsername(张三);MessageString message MessageBuilder.withPayload(msg).setHeader(key,id).build();TransactionSendResult result rocketMQTemplate.sendMessageInTransaction(order, message, order);String transactionId result.getTransactionId();String status result.getSendStatus().name();log.info(发送消息成功 transactionId{} status{} ,transactionId,status);return success;}
}
创建消费者
Component
Slf4j
RocketMQMessageListener(consumerGroup test-consumer,topic order,messageModel MessageModel.CLUSTERING)
public class RocketMQListen implements RocketMQListenerMessageExt {Overridepublic void onMessage(MessageExt messageExt) {String body new String(messageExt.getBody(), StandardCharsets.UTF_8);System.out.println(body);}
}
生产者消息监听器
Component
RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {AutowiredOrderService orderService;Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {Order order (Order) o;try {//生成订单orderService.add(order);return RocketMQLocalTransactionState.UNKNOWN;}catch (Throwable throwable){throwable.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String key message.getHeaders().get(key).toString();System.out.println(回查订单id key 回查时间new Date());Order order orderService.getById(key);if(order!null) {long l new Date().getTime() - order.getCreateTime().getTime();long time l / (1000 * 60);//超时1分钟后就会把未支付的订单进行删除if (time 1) {orderService.removeById(key);System.out.println(订单 key 删除);//订单库存等一系列操作return RocketMQLocalTransactionState.ROLLBACK;}Integer payStatus order.getPayStatus();if (payStatus 1) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.UNKNOWN;}elsereturn RocketMQLocalTransactionState.ROLLBACK;}
}
测试
这里通过生产者发送五个事务消息生成五个订单然后两个订单在一分钟内修改支付状态为已支付超时一分钟未支付就会删除订单回退。运行截图如下